diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index cd04afd56..2a8c0fc6d 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -17,6 +17,7 @@ from moto.core import ACCOUNT_ID import sure # noqa +# Has boto3 equivalent @mock_kinesis_deprecated def test_create_cluster(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -37,6 +38,7 @@ def test_create_cluster(): shards.should.have.length_of(3) +# Has boto3 equivalent @mock_kinesis_deprecated def test_describe_non_existent_stream(): conn = boto.kinesis.connect_to_region("us-east-1") @@ -45,6 +47,19 @@ def test_describe_non_existent_stream(): ) +@mock_kinesis +def test_describe_non_existent_stream_boto3(): + client = boto3.client("kinesis", region_name="us-west-2") + with pytest.raises(ClientError) as exc: + client.describe_stream_summary(StreamName="not-a-stream") + err = exc.value.response["Error"] + err["Code"].should.equal("ResourceNotFoundException") + err["Message"].should.equal( + "Stream not-a-stream under account 123456789012 not found." + ) + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_list_and_delete_stream(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -64,6 +79,31 @@ def test_list_and_delete_stream(): ) +@mock_kinesis +def test_list_and_delete_stream_boto3(): + client = boto3.client("kinesis", region_name="us-west-2") + client.list_streams()["StreamNames"].should.have.length_of(0) + + client.create_stream(StreamName="stream1", ShardCount=1) + client.create_stream(StreamName="stream2", ShardCount=1) + client.list_streams()["StreamNames"].should.have.length_of(2) + + client.delete_stream(StreamName="stream1") + client.list_streams()["StreamNames"].should.have.length_of(1) + + +@mock_kinesis +def test_delete_unknown_stream(): + client = boto3.client("kinesis", region_name="us-west-2") + with pytest.raises(ClientError) as exc: + client.delete_stream(StreamName="not-a-stream") + err = exc.value.response["Error"] + err["Code"].should.equal("ResourceNotFoundException") + err["Message"].should.equal( + "Stream not-a-stream under account 123456789012 not found." + ) + + @mock_kinesis def test_list_many_streams(): conn = boto3.client("kinesis", region_name="us-west-2") @@ -101,6 +141,7 @@ def test_describe_stream_summary(): stream["StreamStatus"].should.equal("ACTIVE") +# Has boto3 equivalent @mock_kinesis_deprecated def test_basic_shard_iterator(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -120,6 +161,26 @@ def test_basic_shard_iterator(): response["MillisBehindLatest"].should.equal(0) +@mock_kinesis +def test_basic_shard_iterator_boto3(): + client = boto3.client("kinesis", region_name="us-west-1") + + stream_name = "mystream" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + resp = client.get_shard_iterator( + StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON" + ) + shard_iterator = resp["ShardIterator"] + + resp = client.get_records(ShardIterator=shard_iterator) + resp.should.have.key("Records").length_of(0) + resp.should.have.key("MillisBehindLatest").equal(0) + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_get_invalid_shard_iterator(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -132,6 +193,23 @@ def test_get_invalid_shard_iterator(): ).should.throw(ResourceNotFoundException) +@mock_kinesis +def test_get_invalid_shard_iterator_boto3(): + client = boto3.client("kinesis", region_name="us-west-1") + + stream_name = "mystream" + client.create_stream(StreamName=stream_name, ShardCount=1) + + with pytest.raises(ClientError) as exc: + client.get_shard_iterator( + StreamName=stream_name, ShardId="123", ShardIteratorType="TRIM_HORIZON" + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ResourceNotFoundException") + err["Message"].should.equal("Shard 123 under account 123456789012 not found.") + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_put_records(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -164,6 +242,35 @@ def test_put_records(): record["SequenceNumber"].should.equal("1") +@mock_kinesis +def test_put_records_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + data = b"hello world" + partition_key = "1234" + + client.put_record(StreamName=stream_name, Data=data, PartitionKey=partition_key) + + resp = client.get_shard_iterator( + StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON" + ) + shard_iterator = resp["ShardIterator"] + + resp = client.get_records(ShardIterator=shard_iterator) + resp["Records"].should.have.length_of(1) + record = resp["Records"][0] + + record["Data"].should.equal(b"hello world") + record["PartitionKey"].should.equal("1234") + record["SequenceNumber"].should.equal("1") + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_get_records_limit(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -193,6 +300,36 @@ def test_get_records_limit(): response["Records"].should.have.length_of(2) +@mock_kinesis +def test_get_records_limit_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + data = b"hello world" + + for index in range(5): + client.put_record(StreamName=stream_name, Data=data, PartitionKey=str(index)) + + resp = client.get_shard_iterator( + StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON" + ) + shard_iterator = resp["ShardIterator"] + + # Retrieve only 3 records + resp = client.get_records(ShardIterator=shard_iterator, Limit=3) + resp["Records"].should.have.length_of(3) + + # Then get the rest of the results + next_shard_iterator = resp["NextShardIterator"] + response = client.get_records(ShardIterator=next_shard_iterator) + response["Records"].should.have.length_of(2) + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_get_records_at_sequence_number(): # AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by @@ -227,6 +364,46 @@ def test_get_records_at_sequence_number(): response["Records"][0]["Data"].should.equal("2") +@mock_kinesis +def test_get_records_at_sequence_number_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + for index in range(1, 5): + client.put_record( + StreamName=stream_name, + Data=f"data_{index}".encode("utf-8"), + PartitionKey=str(index), + ) + + resp = client.get_shard_iterator( + StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON" + ) + shard_iterator = resp["ShardIterator"] + + # Retrieve only 2 records + resp = client.get_records(ShardIterator=shard_iterator, Limit=2) + sequence_nr = resp["Records"][1]["SequenceNumber"] + + # Then get a new iterator starting at that id + resp = client.get_shard_iterator( + StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType="AT_SEQUENCE_NUMBER", + StartingSequenceNumber=sequence_nr, + ) + shard_iterator = resp["ShardIterator"] + + resp = client.get_records(ShardIterator=shard_iterator) + # And the first result returned should be the second item + resp["Records"][0]["SequenceNumber"].should.equal(sequence_nr) + resp["Records"][0]["Data"].should.equal(b"data_2") + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_get_records_after_sequence_number(): # AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted @@ -261,6 +438,47 @@ def test_get_records_after_sequence_number(): response["MillisBehindLatest"].should.equal(0) +@mock_kinesis +def test_get_records_after_sequence_number_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + for index in range(1, 5): + client.put_record( + StreamName=stream_name, + Data=f"data_{index}".encode("utf-8"), + PartitionKey=str(index), + ) + + resp = client.get_shard_iterator( + StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON" + ) + shard_iterator = resp["ShardIterator"] + + # Retrieve only 2 records + resp = client.get_records(ShardIterator=shard_iterator, Limit=2) + sequence_nr = resp["Records"][1]["SequenceNumber"] + + # Then get a new iterator starting at that id + resp = client.get_shard_iterator( + StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType="AFTER_SEQUENCE_NUMBER", + StartingSequenceNumber=sequence_nr, + ) + shard_iterator = resp["ShardIterator"] + + resp = client.get_records(ShardIterator=shard_iterator) + # And the first result returned should be the second item + resp["Records"][0]["SequenceNumber"].should.equal("3") + resp["Records"][0]["Data"].should.equal(b"data_3") + resp["MillisBehindLatest"].should.equal(0) + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_get_records_latest(): # LATEST - Start reading just after the most recent record in the shard, @@ -300,6 +518,52 @@ def test_get_records_latest(): response["MillisBehindLatest"].should.equal(0) +@mock_kinesis +def test_get_records_latest_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + for index in range(1, 5): + client.put_record( + StreamName=stream_name, + Data=f"data_{index}".encode("utf-8"), + PartitionKey=str(index), + ) + + resp = client.get_shard_iterator( + StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON" + ) + shard_iterator = resp["ShardIterator"] + + # Retrieve only 2 records + resp = client.get_records(ShardIterator=shard_iterator, Limit=2) + sequence_nr = resp["Records"][1]["SequenceNumber"] + + # Then get a new iterator starting at that id + resp = client.get_shard_iterator( + StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType="LATEST", + StartingSequenceNumber=sequence_nr, + ) + shard_iterator = resp["ShardIterator"] + + client.put_record( + StreamName=stream_name, Data=b"last_record", PartitionKey="last_record" + ) + + resp = client.get_records(ShardIterator=shard_iterator) + # And the first result returned should be the second item + resp["Records"].should.have.length_of(1) + resp["Records"][0]["SequenceNumber"].should.equal("5") + resp["Records"][0]["PartitionKey"].should.equal("last_record") + resp["Records"][0]["Data"].should.equal(b"last_record") + resp["MillisBehindLatest"].should.equal(0) + + @mock_kinesis def test_get_records_at_timestamp(): # AT_TIMESTAMP - Read the first record at or after the specified timestamp @@ -547,6 +811,7 @@ def test_invalid_decrease_stream_retention_period(): ex.value.response["Error"]["Message"].should.equal(20) +# Has boto3 equivalent @mock_kinesis_deprecated def test_invalid_shard_iterator_type(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -560,6 +825,24 @@ def test_invalid_shard_iterator_type(): ).should.throw(InvalidArgumentException) +@mock_kinesis +def test_invalid_shard_iterator_type_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + with pytest.raises(ClientError) as exc: + client.get_shard_iterator( + StreamName=stream_name, ShardId=shard_id, ShardIteratorType="invalid-type" + ) + err = exc.value.response["Error"] + err["Code"].should.equal("InvalidArgumentException") + err["Message"].should.equal("Invalid ShardIteratorType: invalid-type") + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_add_tags(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -573,6 +856,39 @@ def test_add_tags(): conn.add_tags_to_stream(stream_name, {"tag2": "val4"}) +@mock_kinesis +def test_add_list_remove_tags_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=1) + client.add_tags_to_stream( + StreamName=stream_name, + Tags={"tag1": "val1", "tag2": "val2", "tag3": "val3", "tag4": "val4",}, + ) + + tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"] + tags.should.have.length_of(4) + tags.should.contain({"Key": "tag1", "Value": "val1"}) + tags.should.contain({"Key": "tag2", "Value": "val2"}) + tags.should.contain({"Key": "tag3", "Value": "val3"}) + tags.should.contain({"Key": "tag4", "Value": "val4"}) + + client.add_tags_to_stream(StreamName=stream_name, Tags={"tag5": "val5"}) + + tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"] + tags.should.have.length_of(5) + tags.should.contain({"Key": "tag5", "Value": "val5"}) + + client.remove_tags_from_stream(StreamName=stream_name, TagKeys=["tag2", "tag3"]) + + tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"] + tags.should.have.length_of(3) + tags.should.contain({"Key": "tag1", "Value": "val1"}) + tags.should.contain({"Key": "tag4", "Value": "val4"}) + tags.should.contain({"Key": "tag5", "Value": "val5"}) + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_list_tags(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -614,6 +930,7 @@ def test_list_tags(): tags.get("tag2").should.equal("val4") +# Has boto3 equivalent @mock_kinesis_deprecated def test_remove_tags(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -656,6 +973,7 @@ def test_remove_tags(): tags.get("tag2").should.equal(None) +# Has boto3 equivalent @mock_kinesis_deprecated def test_split_shard(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -698,6 +1016,53 @@ def test_split_shard(): shards.should.have.length_of(4) +@mock_kinesis +def test_split_shard_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=2) + + for index in range(1, 100): + client.put_record( + StreamName=stream_name, + Data=f"data_{index}".encode("utf-8"), + PartitionKey=str(index), + ) + + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shards = stream["Shards"] + shards.should.have.length_of(2) + + shard_range = shards[0]["HashKeyRange"] + new_starting_hash = ( + int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"]) + ) // 2 + client.split_shard( + StreamName=stream_name, + ShardToSplit=shards[0]["ShardId"], + NewStartingHashKey=str(new_starting_hash), + ) + + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shards = stream["Shards"] + shards.should.have.length_of(3) + + shard_range = shards[2]["HashKeyRange"] + new_starting_hash = ( + int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"]) + ) // 2 + client.split_shard( + StreamName=stream_name, + ShardToSplit=shards[2]["ShardId"], + NewStartingHashKey=str(new_starting_hash), + ) + + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shards = stream["Shards"] + shards.should.have.length_of(4) + + +# Has boto3 equivalent @mock_kinesis_deprecated def test_merge_shards(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -751,3 +1116,68 @@ def test_merge_shards(): ] active_shards.should.have.length_of(2) + + +@mock_kinesis +def test_merge_shards_boto3(): + client = boto3.client("kinesis", region_name="eu-west-2") + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=4) + + for index in range(1, 100): + client.put_record( + StreamName=stream_name, + Data=f"data_{index}".encode("utf-8"), + PartitionKey=str(index), + ) + + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shards = stream["Shards"] + shards.should.have.length_of(4) + + client.merge_shards( + StreamName=stream_name, + ShardToMerge="shardId-000000000000", + AdjacentShardToMerge="shardId-000000000001", + ) + + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shards = stream["Shards"] + active_shards = [ + shard + for shard in shards + if "EndingSequenceNumber" not in shard["SequenceNumberRange"] + ] + active_shards.should.have.length_of(3) + + client.merge_shards( + StreamName=stream_name, + ShardToMerge="shardId-000000000002", + AdjacentShardToMerge="shardId-000000000000", + ) + + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shards = stream["Shards"] + active_shards = [ + shard + for shard in shards + if "EndingSequenceNumber" not in shard["SequenceNumberRange"] + ] + active_shards.should.have.length_of(2) + + +@mock_kinesis +def test_merge_shards_invalid_arg(): + client = boto3.client("kinesis", region_name="eu-west-2") + stream_name = "my_stream_summary" + client.create_stream(StreamName=stream_name, ShardCount=4) + + with pytest.raises(ClientError) as exc: + client.merge_shards( + StreamName=stream_name, + ShardToMerge="shardId-000000000000", + AdjacentShardToMerge="shardId-000000000002", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("InvalidArgumentException") + err["Message"].should.equal("shardId-000000000002")