2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								from __future__ import unicode_literals
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								import datetime
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import time
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-05-21 02:02:36 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								import boto.kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import boto3
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException
							 | 
						
					
						
							
								
									
										
										
										
											2019-05-21 02:02:36 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-05-10 21:58:42 -04:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from moto import mock_kinesis, mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2019-12-16 21:05:29 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from moto.core import ACCOUNT_ID
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2020-08-03 11:04:05 -04:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								import sure  # noqa
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_create_cluster():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-03-08 16:27:24 +01:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream("my_stream", 3)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_response = conn.describe_stream("my_stream")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = stream_response["StreamDescription"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream["StreamName"].should.equal("my_stream")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream["HasMoreShards"].should.equal(False)
							 | 
						
					
						
							
								
									
										
										
										
											2019-12-16 21:25:20 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream["StreamARN"].should.equal(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        "arn:aws:kinesis:us-west-2:{}:my_stream".format(ACCOUNT_ID)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream["StreamStatus"].should.equal("ACTIVE")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards = stream["Shards"]
							 | 
						
					
						
							
								
									
										
										
										
											2019-03-08 16:27:24 +01:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards.should.have.length_of(3)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2020-01-20 15:21:11 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_describe_non_existent_stream():
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-east-1")
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.describe_stream.when.called_with("not-a-stream").should.throw(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ResourceNotFoundException
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_list_and_delete_stream():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream("stream1", 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream("stream2", 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.list_streams()["StreamNames"].should.have.length_of(2)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.delete_stream("stream2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.list_streams()["StreamNames"].should.have.length_of(1)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 10:55:58 -05:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Delete invalid id
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.delete_stream.when.called_with("not-a-stream").should.throw(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ResourceNotFoundException
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-05-10 21:58:42 -04:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_list_many_streams():
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto3.client("kinesis", region_name="us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2017-05-10 21:58:42 -04:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for i in range(11):
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        conn.create_stream(StreamName="stream%d" % i, ShardCount=1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    resp = conn.list_streams()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_names = resp["StreamNames"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    has_more_streams = resp["HasMoreStreams"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_names.should.have.length_of(10)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    has_more_streams.should.be(True)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    resp2 = conn.list_streams(ExclusiveStartStreamName=stream_names[-1])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_names = resp2["StreamNames"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    has_more_streams = resp2["HasMoreStreams"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_names.should.have.length_of(1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    has_more_streams.should.equal(False)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-05-21 02:02:36 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_describe_stream_summary():
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto3.client("kinesis", region_name="us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream_summary"
							 | 
						
					
						
							
								
									
										
										
										
											2019-05-21 02:02:36 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_count = 5
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(StreamName=stream_name, ShardCount=shard_count)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    resp = conn.describe_stream_summary(StreamName=stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = resp["StreamDescriptionSummary"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream["StreamName"].should.equal(stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream["OpenShardCount"].should.equal(shard_count)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream["StreamARN"].should.equal(
							 | 
						
					
						
							
								
									
										
										
										
											2019-12-15 19:22:26 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        "arn:aws:kinesis:us-west-2:{}:{}".format(ACCOUNT_ID, stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							
								
									
										
										
										
											2019-05-21 02:02:36 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream["StreamStatus"].should.equal("ACTIVE")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_basic_shard_iterator():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["NextShardIterator"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.equal([])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.equal(0)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_get_invalid_shard_iterator():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.get_shard_iterator.when.called_with(
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        stream_name, "123", "TRIM_HORIZON"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    ).should.throw(ResourceNotFoundException)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_put_records():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    data = "hello world"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    partition_key = "1234"
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.put_record.when.called_with(stream_name, data, 1234).should.throw(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        InvalidArgumentException
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.put_record(stream_name, data, partition_key)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["NextShardIterator"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    record = response["Records"][0]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    record["Data"].should.equal("hello world")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    record["PartitionKey"].should.equal("1234")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    record["SequenceNumber"].should.equal("1")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_get_records_limit():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    data = "hello world"
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    for index in range(5):
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(stream_name, data, str(index))
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get a shard iterator
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Retrieve only 3 records
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator, limit=3)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(3)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Then get the rest of the results
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    next_shard_iterator = response["NextShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(next_shard_iterator)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(2)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_get_records_at_sequence_number():
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    # AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # a specific sequence number.
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for index in range(1, 5):
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(stream_name, str(index), str(index))
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get a shard iterator
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-04 09:15:19 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    # Get the second record
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator, limit=2)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    second_sequence_id = response["Records"][1]["SequenceNumber"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Then get a new iterator starting at that id
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        stream_name, shard_id, "AT_SEQUENCE_NUMBER", second_sequence_id
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # And the first result returned should be the second item
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"][0]["SequenceNumber"].should.equal(second_sequence_id)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["Records"][0]["Data"].should.equal("2")
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_get_records_after_sequence_number():
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    # AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # by a specific sequence number.
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for index in range(1, 5):
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(stream_name, str(index), str(index))
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get a shard iterator
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get the second record
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator, limit=2)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    second_sequence_id = response["Records"][1]["SequenceNumber"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Then get a new iterator starting after that id
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        stream_name, shard_id, "AFTER_SEQUENCE_NUMBER", second_sequence_id
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # And the first result returned should be the third item
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"][0]["Data"].should.equal("3")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.equal(0)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_get_records_latest():
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    # LATEST - Start reading just after the most recent record in the shard,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # so that you always read the most recent data in the shard.
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for index in range(1, 5):
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(stream_name, str(index), str(index))
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get a shard iterator
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get the second record
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator, limit=2)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    second_sequence_id = response["Records"][1]["SequenceNumber"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Then get a new iterator starting after that id
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        stream_name, shard_id, "LATEST", second_sequence_id
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Write some more data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.put_record(stream_name, "last_record", "last_record")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(shard_iterator)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # And the only result returned should be the new item
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["Records"][0]["PartitionKey"].should.equal("last_record")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["Records"][0]["Data"].should.equal("last_record")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.equal(0)
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_get_records_at_timestamp():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # AT_TIMESTAMP - Read the first record at or after the specified timestamp
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto3.client("kinesis", region_name="us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(StreamName=stream_name, ShardCount=1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for index in range(1, 5):
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            StreamName=stream_name, Data=str(index), PartitionKey=str(index)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        )
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # When boto3 floors the timestamp that we pass to get_shard_iterator to
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # second precision even though AWS supports ms precision:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # To test around this limitation we wait until we well into the next second
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # before capturing the time and storing the records we expect to retrieve.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    time.sleep(1.0)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    timestamp = datetime.datetime.utcnow()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    keys = [str(i) for i in range(5, 10)]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for k in keys:
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get a shard iterator
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(StreamName=stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        StreamName=stream_name,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardId=shard_id,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardIteratorType="AT_TIMESTAMP",
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        Timestamp=timestamp,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(ShardIterator=shard_iterator)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(len(keys))
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    partition_keys = [r["PartitionKey"] for r in response["Records"]]
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    partition_keys.should.equal(keys)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.equal(0)
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_get_records_at_very_old_timestamp():
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto3.client("kinesis", region_name="us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(StreamName=stream_name, ShardCount=1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    keys = [str(i) for i in range(1, 5)]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for k in keys:
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get a shard iterator
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(StreamName=stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        StreamName=stream_name,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardId=shard_id,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardIteratorType="AT_TIMESTAMP",
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        Timestamp=1,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(ShardIterator=shard_iterator)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(len(keys))
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    partition_keys = [r["PartitionKey"] for r in response["Records"]]
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    partition_keys.should.equal(keys)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.equal(0)
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_get_records_timestamp_filtering():
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto3.client("kinesis", region_name="us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(StreamName=stream_name, ShardCount=1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.put_record(StreamName=stream_name, Data="0", PartitionKey="0")
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    time.sleep(1.0)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    timestamp = datetime.datetime.utcnow()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.put_record(StreamName=stream_name, Data="1", PartitionKey="1")
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(StreamName=stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        StreamName=stream_name,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardId=shard_id,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardIteratorType="AT_TIMESTAMP",
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        Timestamp=timestamp,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(ShardIterator=shard_iterator)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["Records"][0]["PartitionKey"].should.equal("1")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["Records"][0]["ApproximateArrivalTimestamp"].should.be.greater_than(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        timestamp
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.equal(0)
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_get_records_millis_behind_latest():
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto3.client("kinesis", region_name="us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(StreamName=stream_name, ShardCount=1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.put_record(StreamName=stream_name, Data="0", PartitionKey="0")
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    time.sleep(1.0)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.put_record(StreamName=stream_name, Data="1", PartitionKey="1")
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(StreamName=stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2018-07-13 12:06:28 +03:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(ShardIterator=shard_iterator, Limit=1)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.be.greater_than(0)
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_get_records_at_very_new_timestamp():
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto3.client("kinesis", region_name="us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(StreamName=stream_name, ShardCount=1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    keys = [str(i) for i in range(1, 5)]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for k in keys:
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get a shard iterator
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(StreamName=stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        StreamName=stream_name,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardId=shard_id,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardIteratorType="AT_TIMESTAMP",
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        Timestamp=timestamp,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(ShardIterator=shard_iterator)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(0)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.equal(0)
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def test_get_records_from_empty_stream_at_timestamp():
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn = boto3.client("kinesis", region_name="us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(StreamName=stream_name, ShardCount=1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    timestamp = datetime.datetime.utcnow()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Get a shard iterator
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(StreamName=stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        StreamName=stream_name,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardId=shard_id,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ShardIteratorType="AT_TIMESTAMP",
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        Timestamp=timestamp,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    shard_iterator = response["ShardIterator"]
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_records(ShardIterator=shard_iterator)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response["Records"].should.have.length_of(0)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response["MillisBehindLatest"].should.equal(0)
							 | 
						
					
						
							
								
									
										
										
										
											2017-12-08 02:57:05 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_invalid_shard_iterator_type():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    response = conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-26 20:49:21 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    response = conn.get_shard_iterator.when.called_with(
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        stream_name, shard_id, "invalid-type"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    ).should.throw(InvalidArgumentException)
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-04 09:13:08 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-04 09:13:08 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_add_tags():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag1": "val1"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag2": "val2"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag1": "val3"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag2": "val4"})
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-04 09:13:08 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-04 09:13:08 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_list_tags():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag1": "val1"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags = dict(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            (tag["Key"], tag["Value"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            for tag in conn.list_tags_for_stream(stream_name)["Tags"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags.get("tag1").should.equal("val1")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag2": "val2"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags = dict(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            (tag["Key"], tag["Value"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            for tag in conn.list_tags_for_stream(stream_name)["Tags"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags.get("tag2").should.equal("val2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag1": "val3"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags = dict(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            (tag["Key"], tag["Value"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            for tag in conn.list_tags_for_stream(stream_name)["Tags"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags.get("tag1").should.equal("val3")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag2": "val4"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags = dict(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            (tag["Key"], tag["Value"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            for tag in conn.list_tags_for_stream(stream_name)["Tags"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags.get("tag2").should.equal("val4")
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-04 09:13:08 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-04 09:13:08 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_remove_tags():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.describe_stream(stream_name)
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag1": "val1"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags = dict(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            (tag["Key"], tag["Value"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            for tag in conn.list_tags_for_stream(stream_name)["Tags"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags.get("tag1").should.equal("val1")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.remove_tags_from_stream(stream_name, ["tag1"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags = dict(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            (tag["Key"], tag["Value"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            for tag in conn.list_tags_for_stream(stream_name)["Tags"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags.get("tag1").should.equal(None)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.add_tags_to_stream(stream_name, {"tag2": "val2"})
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags = dict(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            (tag["Key"], tag["Value"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            for tag in conn.list_tags_for_stream(stream_name)["Tags"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags.get("tag2").should.equal("val2")
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.remove_tags_from_stream(stream_name, ["tag2"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags = dict(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            (tag["Key"], tag["Value"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            for tag in conn.list_tags_for_stream(stream_name)["Tags"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    )
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    tags.get("tag2").should.equal(None)
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_split_shard():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 2)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for index in range(1, 100):
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(stream_name, str(index), str(index))
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_response = conn.describe_stream(stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = stream_response["StreamDescription"]
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards = stream["Shards"]
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards.should.have.length_of(2)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_range = shards[0]["HashKeyRange"]
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    new_starting_hash = (
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    ) // 2
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.split_shard("my_stream", shards[0]["ShardId"], str(new_starting_hash))
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_response = conn.describe_stream(stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = stream_response["StreamDescription"]
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards = stream["Shards"]
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards.should.have.length_of(3)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shard_range = shards[2]["HashKeyRange"]
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    new_starting_hash = (
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    ) // 2
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.split_shard("my_stream", shards[2]["ShardId"], str(new_starting_hash))
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_response = conn.describe_stream(stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = stream_response["StreamDescription"]
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards = stream["Shards"]
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards.should.have.length_of(4)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-15 22:35:45 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								@mock_kinesis_deprecated
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def test_merge_shards():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn = boto.kinesis.connect_to_region("us-west-2")
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    stream_name = "my_stream"
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    conn.create_stream(stream_name, 4)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # Create some data
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    for index in range(1, 100):
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        conn.put_record(stream_name, str(index), str(index))
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_response = conn.describe_stream(stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = stream_response["StreamDescription"]
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards = stream["Shards"]
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards.should.have.length_of(4)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.merge_shards.when.called_with(
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        stream_name, "shardId-000000000000", "shardId-000000000002"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    ).should.throw(InvalidArgumentException)
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_response = conn.describe_stream(stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = stream_response["StreamDescription"]
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards = stream["Shards"]
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards.should.have.length_of(4)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.merge_shards(stream_name, "shardId-000000000000", "shardId-000000000001")
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_response = conn.describe_stream(stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = stream_response["StreamDescription"]
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards = stream["Shards"]
							 | 
						
					
						
							
								
									
										
										
										
											2020-08-03 11:04:05 -04:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    active_shards = [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        shard
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        for shard in shards
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    active_shards.should.have.length_of(3)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    conn.merge_shards(stream_name, "shardId-000000000002", "shardId-000000000000")
							 | 
						
					
						
							
								
									
										
										
										
											2015-12-05 11:13:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream_response = conn.describe_stream(stream_name)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    stream = stream_response["StreamDescription"]
							 | 
						
					
						
							
								
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    shards = stream["Shards"]
							 | 
						
					
						
							
								
									
										
										
										
											2020-08-03 11:04:05 -04:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    active_shards = [
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        shard
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        for shard in shards
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    ]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    active_shards.should.have.length_of(2)
							 |