From d7dd8fb4c58f680d15408247183dbe35dc3e9c7c Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sun, 7 Nov 2021 19:24:54 -0100 Subject: [PATCH] Kinesis - Fix split_shards behaviour (#4540) --- moto/kinesis/exceptions.py | 15 +- moto/kinesis/models.py | 75 ++++++-- tests/test_kinesis/test_kinesis.py | 99 +--------- tests/test_kinesis/test_kinesis_boto3.py | 233 ++++++++++++++++++----- 4 files changed, 262 insertions(+), 160 deletions(-) diff --git a/moto/kinesis/exceptions.py b/moto/kinesis/exceptions.py index 9aa8a6f94..ec9e95f71 100644 --- a/moto/kinesis/exceptions.py +++ b/moto/kinesis/exceptions.py @@ -27,9 +27,9 @@ class StreamNotFoundError(ResourceNotFoundError): class ShardNotFoundError(ResourceNotFoundError): - def __init__(self, shard_id): + def __init__(self, shard_id, stream): super(ShardNotFoundError, self).__init__( - "Shard {0} under account {1} not found.".format(shard_id, ACCOUNT_ID) + f"Could not find shard {shard_id} in stream {stream} under account {ACCOUNT_ID}." ) @@ -39,3 +39,14 @@ class InvalidArgumentError(BadRequest): self.description = json.dumps( {"message": message, "__type": "InvalidArgumentException"} ) + + +class ValidationException(BadRequest): + def __init__(self, value, position, regex_to_match): + super(ValidationException, self).__init__() + self.description = json.dumps( + { + "message": f"1 validation error detected: Value '{value}' at '{position}' failed to satisfy constraint: Member must satisfy regular expression pattern: {regex_to_match}", + "__type": "ValidationException", + } + ) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index af9068297..99c3f1052 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -18,6 +18,7 @@ from .exceptions import ( ResourceInUseError, ResourceNotFoundError, InvalidArgumentError, + ValidationException, ) from .utils import ( compose_shard_iterator, @@ -46,12 +47,13 @@ class Record(BaseModel): class Shard(BaseModel): - def __init__(self, shard_id, starting_hash, ending_hash): + def __init__(self, shard_id, starting_hash, ending_hash, parent=None): self._shard_id = shard_id self.starting_hash = starting_hash self.ending_hash = ending_hash self.records = OrderedDict() self.is_open = True + self.parent = parent @property def shard_id(self): @@ -125,6 +127,8 @@ class Shard(BaseModel): }, "ShardId": self.shard_id, } + if self.parent: + response["ParentShardId"] = self.parent if not self.is_open: response["SequenceNumberRange"][ "EndingSequenceNumber" @@ -146,6 +150,8 @@ class Stream(CloudFormationModel): self.retention_period_hours = ( retention_period_hours if retention_period_hours else 24 ) + self.enhanced_monitoring = [{"ShardLevelMetrics": []}] + self.encryption_type = "NONE" def update_shard_count(self, shard_count): # ToDo: This was extracted from init. It's only accurate for new streams. @@ -165,7 +171,7 @@ class Stream(CloudFormationModel): @property def arn(self): - return "arn:aws:kinesis:{region}:{account_number}:{stream_name}".format( + return "arn:aws:kinesis:{region}:{account_number}:stream/{stream_name}".format( region=self.region, account_number=self.account_number, stream_name=self.stream_name, @@ -175,7 +181,7 @@ class Stream(CloudFormationModel): if shard_id in self.shards: return self.shards[shard_id] else: - raise ShardNotFoundError(shard_id) + raise ShardNotFoundError(shard_id, stream="") def get_shard_for_key(self, partition_key, explicit_hash_key): if not isinstance(partition_key, str): @@ -214,9 +220,12 @@ class Stream(CloudFormationModel): "StreamDescription": { "StreamARN": self.arn, "StreamName": self.stream_name, + "StreamCreationTimestamp": str(self.creation_datetime), "StreamStatus": self.status, "HasMoreShards": len(requested_shards) != len(all_shards), "RetentionPeriodHours": self.retention_period_hours, + "EnhancedMonitoring": self.enhanced_monitoring, + "EncryptionType": self.encryption_type, "Shards": [shard.to_json() for shard in requested_shards], } } @@ -374,7 +383,12 @@ class KinesisBackend(BaseBackend): ): # Validate params stream = self.describe_stream(stream_name) - shard = stream.get_shard(shard_id) + try: + shard = stream.get_shard(shard_id) + except ShardNotFoundError: + raise ResourceNotFoundError( + message=f"Shard {shard_id} in stream {stream_name} under account {ACCOUNT_ID} does not exist" + ) shard_iterator = compose_new_shard_iterator( stream_name, @@ -440,25 +454,58 @@ class KinesisBackend(BaseBackend): def split_shard(self, stream_name, shard_to_split, new_starting_hash_key): stream = self.describe_stream(stream_name) + if not re.match("[a-zA-Z0-9_.-]+", shard_to_split): + raise ValidationException( + value=shard_to_split, + position="shardToSplit", + regex_to_match="[a-zA-Z0-9_.-]+", + ) + if shard_to_split not in stream.shards: - raise ResourceNotFoundError(shard_to_split) + raise ShardNotFoundError(shard_id=shard_to_split, stream=stream_name) if not re.match(r"0|([1-9]\d{0,38})", new_starting_hash_key): - raise InvalidArgumentError(new_starting_hash_key) + raise ValidationException( + value=new_starting_hash_key, + position="newStartingHashKey", + regex_to_match=r"0|([1-9]\d{0,38})", + ) new_starting_hash_key = int(new_starting_hash_key) shard = stream.shards[shard_to_split] + if shard.starting_hash < new_starting_hash_key < shard.ending_hash: + pass + else: + raise InvalidArgumentError( + message=f"NewStartingHashKey {new_starting_hash_key} used in SplitShard() on shard {shard_to_split} in stream {stream_name} under account {ACCOUNT_ID} is not both greater than one plus the shard's StartingHashKey {shard.starting_hash} and less than the shard's EndingHashKey {(shard.ending_hash-1)}." + ) + + if not shard.is_open: + raise InvalidArgumentError( + message=f"Shard {shard.shard_id} in stream {stream_name} under account {ACCOUNT_ID} has already been merged or split, and thus is not eligible for merging or splitting." + ) + last_id = sorted(stream.shards.values(), key=attrgetter("_shard_id"))[ -1 ]._shard_id - if shard.starting_hash < new_starting_hash_key < shard.ending_hash: - new_shard = Shard(last_id + 1, new_starting_hash_key, shard.ending_hash) - shard.ending_hash = new_starting_hash_key - stream.shards[new_shard.shard_id] = new_shard - else: - raise InvalidArgumentError(new_starting_hash_key) + # Create two new shards + new_shard_1 = Shard( + last_id + 1, + starting_hash=shard.starting_hash, + ending_hash=new_starting_hash_key - 1, + parent=shard.shard_id, + ) + new_shard_2 = Shard( + last_id + 2, + starting_hash=new_starting_hash_key, + ending_hash=shard.ending_hash, + parent=shard.shard_id, + ) + stream.shards[new_shard_1.shard_id] = new_shard_1 + stream.shards[new_shard_2.shard_id] = new_shard_2 + shard.is_open = False records = shard.records shard.records = OrderedDict() @@ -473,10 +520,10 @@ class KinesisBackend(BaseBackend): stream = self.describe_stream(stream_name) if shard_to_merge not in stream.shards: - raise ResourceNotFoundError(shard_to_merge) + raise ShardNotFoundError(shard_to_merge, stream=stream_name) if adjacent_shard_to_merge not in stream.shards: - raise ResourceNotFoundError(adjacent_shard_to_merge) + raise ShardNotFoundError(adjacent_shard_to_merge, stream=stream_name) shard1 = stream.shards[shard_to_merge] shard2 = stream.shards[adjacent_shard_to_merge] diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index 98f2834e4..9e50ba588 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -28,7 +28,7 @@ def test_create_cluster(): stream["StreamName"].should.equal("my_stream") stream["HasMoreShards"].should.equal(False) stream["StreamARN"].should.equal( - "arn:aws:kinesis:us-west-2:{}:my_stream".format(ACCOUNT_ID) + "arn:aws:kinesis:us-west-2:{}:stream/my_stream".format(ACCOUNT_ID) ) stream["StreamStatus"].should.equal("ACTIVE") @@ -134,7 +134,7 @@ def test_describe_stream_summary(): stream["StreamName"].should.equal(stream_name) stream["OpenShardCount"].should.equal(shard_count) stream["StreamARN"].should.equal( - "arn:aws:kinesis:us-west-2:{}:{}".format(ACCOUNT_ID, stream_name) + "arn:aws:kinesis:us-west-2:{}:stream/{}".format(ACCOUNT_ID, stream_name) ) stream["StreamStatus"].should.equal("ACTIVE") @@ -204,7 +204,11 @@ def test_get_invalid_shard_iterator_boto3(): ) err = exc.value.response["Error"] err["Code"].should.equal("ResourceNotFoundException") - err["Message"].should.equal("Shard 123 under account 123456789012 not found.") + # There is some magic in AWS, that '123' is automatically converted into 'shardId-000000000123' + # AWS itself returns this normalized ID in the error message, not the given id + err["Message"].should.equal( + f"Shard 123 in stream {stream_name} under account {ACCOUNT_ID} does not exist" + ) # Has boto3 equivalent @@ -971,95 +975,6 @@ def test_remove_tags(): tags.get("tag2").should.equal(None) -# Has boto3 equivalent -@mock_kinesis_deprecated -def test_split_shard(): - conn = boto.kinesis.connect_to_region("us-west-2") - stream_name = "my_stream" - - conn.create_stream(stream_name, 2) - - # Create some data - for index in range(1, 100): - conn.put_record(stream_name, str(index), str(index)) - - stream_response = conn.describe_stream(stream_name) - - stream = stream_response["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(2) - - shard_range = shards[0]["HashKeyRange"] - new_starting_hash = ( - int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"]) - ) // 2 - conn.split_shard("my_stream", shards[0]["ShardId"], str(new_starting_hash)) - - stream_response = conn.describe_stream(stream_name) - - stream = stream_response["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(3) - - shard_range = shards[2]["HashKeyRange"] - new_starting_hash = ( - int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"]) - ) // 2 - conn.split_shard("my_stream", shards[2]["ShardId"], str(new_starting_hash)) - - stream_response = conn.describe_stream(stream_name) - - stream = stream_response["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(4) - - -@mock_kinesis -def test_split_shard_boto3(): - client = boto3.client("kinesis", region_name="eu-west-2") - stream_name = "my_stream_summary" - client.create_stream(StreamName=stream_name, ShardCount=2) - - for index in range(1, 100): - client.put_record( - StreamName=stream_name, - Data=f"data_{index}".encode("utf-8"), - PartitionKey=str(index), - ) - - stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(2) - - shard_range = shards[0]["HashKeyRange"] - new_starting_hash = ( - int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"]) - ) // 2 - client.split_shard( - StreamName=stream_name, - ShardToSplit=shards[0]["ShardId"], - NewStartingHashKey=str(new_starting_hash), - ) - - stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(3) - - shard_range = shards[2]["HashKeyRange"] - new_starting_hash = ( - int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"]) - ) // 2 - client.split_shard( - StreamName=stream_name, - ShardToSplit=shards[2]["ShardId"], - NewStartingHashKey=str(new_starting_hash), - ) - - stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(4) - - # Has boto3 equivalent @mock_kinesis_deprecated def test_merge_shards(): diff --git a/tests/test_kinesis/test_kinesis_boto3.py b/tests/test_kinesis/test_kinesis_boto3.py index 8675af5fe..fbc3b65dd 100644 --- a/tests/test_kinesis/test_kinesis_boto3.py +++ b/tests/test_kinesis/test_kinesis_boto3.py @@ -1,6 +1,9 @@ import boto3 +import pytest +from botocore.exceptions import ClientError from moto import mock_kinesis +from moto.core import ACCOUNT_ID import sure # noqa # pylint: disable=unused-import @@ -35,58 +38,6 @@ def test_describe_stream_limit_parameter(): with_filter["HasMoreShards"].should.equal(False) -@mock_kinesis -def test_split_shard(): - conn = boto3.client("kinesis", region_name="us-west-2") - stream_name = "my_stream" - - conn.create_stream(StreamName=stream_name, ShardCount=2) - - # Create some data - for index in range(1, 100): - conn.put_record( - StreamName=stream_name, Data="data:" + str(index), PartitionKey=str(index) - ) - - stream_response = conn.describe_stream(StreamName=stream_name) - - stream = stream_response["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(2) - - shard_range = shards[0]["HashKeyRange"] - new_starting_hash = ( - int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"]) - ) // 2 - conn.split_shard( - StreamName=stream_name, - ShardToSplit=shards[0]["ShardId"], - NewStartingHashKey=str(new_starting_hash), - ) - - stream_response = conn.describe_stream(StreamName=stream_name) - - stream = stream_response["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(3) - - shard_range = shards[2]["HashKeyRange"] - new_starting_hash = ( - int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"]) - ) // 2 - conn.split_shard( - StreamName=stream_name, - ShardToSplit=shards[2]["ShardId"], - NewStartingHashKey=str(new_starting_hash), - ) - - stream_response = conn.describe_stream(StreamName=stream_name) - - stream = stream_response["StreamDescription"] - shards = stream["Shards"] - shards.should.have.length_of(4) - - @mock_kinesis def test_list_shards(): conn = boto3.client("kinesis", region_name="us-west-2") @@ -168,3 +119,181 @@ def test_list_shards_paging(): ["shardId-000000000008", "shardId-000000000009"] ) resp.should_not.have.key("NextToken") + + +@mock_kinesis +def test_create_shard(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + + resp = client.describe_stream(StreamName="my-stream") + desc = resp["StreamDescription"] + desc.should.have.key("StreamName").equal("my-stream") + desc.should.have.key("StreamARN").equal( + f"arn:aws:kinesis:us-west-2:{ACCOUNT_ID}:stream/my-stream" + ) + desc.should.have.key("Shards").length_of(2) + desc.should.have.key("StreamStatus").equals("ACTIVE") + desc.should.have.key("HasMoreShards").equals(False) + desc.should.have.key("RetentionPeriodHours").equals(24) + desc.should.have.key("StreamCreationTimestamp") + desc.should.have.key("EnhancedMonitoring").should.equal([{"ShardLevelMetrics": []}]) + desc.should.have.key("EncryptionType").should.equal("NONE") + + shards = desc["Shards"] + shards[0].should.have.key("ShardId").equal("shardId-000000000000") + shards[0].should.have.key("HashKeyRange") + shards[0]["HashKeyRange"].should.have.key("StartingHashKey").equals("0") + shards[0]["HashKeyRange"].should.have.key("EndingHashKey") + shards[0].should.have.key("SequenceNumberRange") + shards[0]["SequenceNumberRange"].should.have.key("StartingSequenceNumber") + shards[0]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber") + + +@mock_kinesis +def test_split_shard_with_invalid_name(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + + with pytest.raises(ClientError) as exc: + client.split_shard( + StreamName="my-stream", + ShardToSplit="?", + NewStartingHashKey="170141183460469231731687303715884105728", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.equal( + "1 validation error detected: Value '?' at 'shardToSplit' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+" + ) + + +@mock_kinesis +def test_split_shard_with_unknown_name(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + + with pytest.raises(ClientError) as exc: + client.split_shard( + StreamName="my-stream", + ShardToSplit="unknown", + NewStartingHashKey="170141183460469231731687303715884105728", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ResourceNotFoundException") + err["Message"].should.equal( + "Could not find shard unknown in stream my-stream under account 123456789012." + ) + + +@mock_kinesis +def test_split_shard_invalid_hashkey(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + + with pytest.raises(ClientError) as exc: + client.split_shard( + StreamName="my-stream", + ShardToSplit="shardId-000000000001", + NewStartingHashKey="sth", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.equal( + "1 validation error detected: Value 'sth' at 'newStartingHashKey' failed to satisfy constraint: Member must satisfy regular expression pattern: 0|([1-9]\\d{0,38})" + ) + + +@mock_kinesis +def test_split_shard_hashkey_out_of_bounds(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + + with pytest.raises(ClientError) as exc: + client.split_shard( + StreamName="my-stream", + ShardToSplit="shardId-000000000001", + NewStartingHashKey="170141183460469231731687303715884000000", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("InvalidArgumentException") + err["Message"].should.equal( + f"NewStartingHashKey 170141183460469231731687303715884000000 used in SplitShard() on shard shardId-000000000001 in stream my-stream under account {ACCOUNT_ID} is not both greater than one plus the shard's StartingHashKey 170141183460469231731687303715884105728 and less than the shard's EndingHashKey 340282366920938463463374607431768211455." + ) + + +@mock_kinesis +def test_split_shard(): + client = boto3.client("kinesis", region_name="us-west-2") + stream_name = "my-stream" + client.create_stream(StreamName=stream_name, ShardCount=2) + + for index in range(1, 100): + client.put_record( + StreamName=stream_name, + Data=f"data_{index}".encode("utf-8"), + PartitionKey=str(index), + ) + + original_shards = client.describe_stream(StreamName=stream_name)[ + "StreamDescription" + ]["Shards"] + + client.split_shard( + StreamName=stream_name, + ShardToSplit="shardId-000000000001", + NewStartingHashKey="170141183460469231731687303715884105829", + ) + + resp = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shards = resp["Shards"] + shards.should.have.length_of(4) + shards[0].should.have.key("ShardId").equals("shardId-000000000000") + shards[0].should.have.key("HashKeyRange") + shards[0].shouldnt.have.key("ParentShardId") + + shards[1].should.have.key("ShardId").equals("shardId-000000000001") + shards[1].shouldnt.have.key("ParentShardId") + shards[1].should.have.key("HashKeyRange") + shards[1]["HashKeyRange"].should.have.key("StartingHashKey").equals( + original_shards[1]["HashKeyRange"]["StartingHashKey"] + ) + shards[1]["HashKeyRange"].should.have.key("EndingHashKey").equals( + original_shards[1]["HashKeyRange"]["EndingHashKey"] + ) + shards[1]["SequenceNumberRange"].should.have.key("StartingSequenceNumber") + shards[1]["SequenceNumberRange"].should.have.key("EndingSequenceNumber") + + shards[2].should.have.key("ShardId").equals("shardId-000000000002") + shards[2].should.have.key("ParentShardId").equals(shards[1]["ShardId"]) + shards[2]["SequenceNumberRange"].should.have.key("StartingSequenceNumber") + shards[2]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber") + + shards[3].should.have.key("ShardId").equals("shardId-000000000003") + shards[3].should.have.key("ParentShardId").equals(shards[1]["ShardId"]) + shards[3]["SequenceNumberRange"].should.have.key("StartingSequenceNumber") + shards[3]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber") + + +@mock_kinesis +def test_split_shard_that_was_split_before(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + + client.split_shard( + StreamName="my-stream", + ShardToSplit="shardId-000000000001", + NewStartingHashKey="170141183460469231731687303715884105829", + ) + + with pytest.raises(ClientError) as exc: + client.split_shard( + StreamName="my-stream", + ShardToSplit="shardId-000000000001", + NewStartingHashKey="170141183460469231731687303715884105829", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("InvalidArgumentException") + err["Message"].should.equal( + f"Shard shardId-000000000001 in stream my-stream under account {ACCOUNT_ID} has already been merged or split, and thus is not eligible for merging or splitting." + )