From f61ffc81ec59d7f7d64b9853d7d517b7e69cd14b Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Thu, 8 Sep 2022 21:51:55 +0000 Subject: [PATCH] Kinesis:update_stream_mode() (#5456) --- IMPLEMENTATION_COVERAGE.md | 4 +- docs/docs/services/kinesis.rst | 2 +- moto/kinesis/exceptions.py | 33 ++++++++++++ moto/kinesis/models.py | 21 +++++++- moto/kinesis/responses.py | 6 +++ .../terraform-tests.success.txt | 6 ++- tests/test_kinesis/test_kinesis.py | 19 +++++++ .../test_kinesis_stream_limits.py | 52 +++++++++++++++++++ 8 files changed, 137 insertions(+), 6 deletions(-) create mode 100644 tests/test_kinesis/test_kinesis_stream_limits.py diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index f2375f377..9a72c712c 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -3638,7 +3638,7 @@ ## kinesis
-89% implemented +93% implemented - [X] add_tags_to_stream - [X] create_stream @@ -3668,7 +3668,7 @@ - [X] stop_stream_encryption - [ ] subscribe_to_shard - [X] update_shard_count -- [ ] update_stream_mode +- [X] update_stream_mode
## kinesis-video-archived-media diff --git a/docs/docs/services/kinesis.rst b/docs/docs/services/kinesis.rst index 4905e4169..bf9dfa600 100644 --- a/docs/docs/services/kinesis.rst +++ b/docs/docs/services/kinesis.rst @@ -57,5 +57,5 @@ kinesis - [X] stop_stream_encryption - [ ] subscribe_to_shard - [X] update_shard_count -- [ ] update_stream_mode +- [X] update_stream_mode diff --git a/moto/kinesis/exceptions.py b/moto/kinesis/exceptions.py index 93822318a..d237b4fb6 100644 --- a/moto/kinesis/exceptions.py +++ b/moto/kinesis/exceptions.py @@ -82,3 +82,36 @@ class ValidationException(BadRequest): "__type": "ValidationException", } ) + + +class RecordSizeExceedsLimit(BadRequest): + def __init__(self, position): + super().__init__() + self.description = json.dumps( + { + "message": f"1 validation error detected: Value at 'records.{position}.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576", + "__type": "ValidationException", + } + ) + + +class TotalRecordsSizeExceedsLimit(BadRequest): + def __init__(self): + super().__init__() + self.description = json.dumps( + { + "message": "Records size exceeds 5 MB limit", + "__type": "InvalidArgumentException", + } + ) + + +class TooManyRecords(BadRequest): + def __init__(self): + super().__init__() + self.description = json.dumps( + { + "message": "1 validation error detected: Value at 'records' failed to satisfy constraint: Member must have length less than or equal to 500", + "__type": "ValidationException", + } + ) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index c6e206366..f51a31267 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -21,6 +21,9 @@ from .exceptions import ( InvalidDecreaseRetention, InvalidIncreaseRetention, ValidationException, + RecordSizeExceedsLimit, + TotalRecordsSizeExceedsLimit, + TooManyRecords, ) from .utils import ( compose_shard_iterator, @@ -411,6 +414,7 @@ class Stream(CloudFormationModel): "EnhancedMonitoring": [{"ShardLevelMetrics": self.shard_level_metrics}], "OpenShardCount": self.shard_count, "EncryptionType": self.encryption_type, + "KeyId": self.key_id, } } @@ -542,7 +546,7 @@ class KinesisBackend(BaseBackend): self.streams[stream_name] = stream return stream - def describe_stream(self, stream_name): + def describe_stream(self, stream_name) -> Stream: if stream_name in self.streams: return self.streams[stream_name] else: @@ -622,6 +626,17 @@ class KinesisBackend(BaseBackend): response = {"FailedRecordCount": 0, "Records": []} + if len(records) > 500: + raise TooManyRecords + data_sizes = [len(r.get("Data", "")) for r in records] + if sum(data_sizes) >= 5000000: + raise TotalRecordsSizeExceedsLimit + idx_over_limit = next( + (idx for idx, x in enumerate(data_sizes) if x >= 1048576), None + ) + if idx_over_limit is not None: + raise RecordSizeExceedsLimit(position=idx_over_limit + 1) + for record in records: partition_key = record.get("PartitionKey") explicit_hash_key = record.get("ExplicitHashKey") @@ -821,5 +836,9 @@ class KinesisBackend(BaseBackend): stream.encryption_type = "NONE" stream.key_id = None + def update_stream_mode(self, stream_arn, stream_mode): + stream = self._find_stream_by_arn(stream_arn) + stream.stream_mode = stream_mode + kinesis_backends = BackendDict(KinesisBackend, "kinesis") diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 69f00204f..59b09c0ce 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -279,3 +279,9 @@ class KinesisResponse(BaseResponse): stream_name = self.parameters.get("StreamName") self.kinesis_backend.stop_stream_encryption(stream_name=stream_name) return json.dumps(dict()) + + def update_stream_mode(self): + stream_arn = self.parameters.get("StreamARN") + stream_mode = self.parameters.get("StreamModeDetails") + self.kinesis_backend.update_stream_mode(stream_arn, stream_mode) + return "{}" diff --git a/tests/terraformtests/terraform-tests.success.txt b/tests/terraformtests/terraform-tests.success.txt index d5875c21a..475961cda 100644 --- a/tests/terraformtests/terraform-tests.success.txt +++ b/tests/terraformtests/terraform-tests.success.txt @@ -135,8 +135,10 @@ iam: iot: - TestAccIoTEndpointDataSource kinesis: - - TestAccKinesisStream_basic - - TestAccKinesisStream_disappear + - TestAccKinesisStreamConsumerDataSource_ + - TestAccKinesisStreamConsumer_ + - TestAccKinesisStreamDataSource_ + - TestAccKinesisStream_ kms: - TestAccKMSAlias - TestAccKMSGrant_arn diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index aafd2685c..38b291636 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -36,6 +36,25 @@ def test_stream_creation_on_demand(): ) +@mock_kinesis +def test_update_stream_mode(): + client = boto3.client("kinesis", region_name="eu-west-1") + resp = client.create_stream( + StreamName="my_stream", StreamModeDetails={"StreamMode": "ON_DEMAND"} + ) + arn = client.describe_stream(StreamName="my_stream")["StreamDescription"][ + "StreamARN" + ] + + client.update_stream_mode( + StreamARN=arn, StreamModeDetails={"StreamMode": "PROVISIONED"} + ) + + resp = client.describe_stream_summary(StreamName="my_stream") + stream = resp["StreamDescriptionSummary"] + stream.should.have.key("StreamModeDetails").equals({"StreamMode": "PROVISIONED"}) + + @mock_kinesis def test_describe_non_existent_stream_boto3(): client = boto3.client("kinesis", region_name="us-west-2") diff --git a/tests/test_kinesis/test_kinesis_stream_limits.py b/tests/test_kinesis/test_kinesis_stream_limits.py new file mode 100644 index 000000000..5c5fa3a4d --- /dev/null +++ b/tests/test_kinesis/test_kinesis_stream_limits.py @@ -0,0 +1,52 @@ +import boto3 +import pytest +import sure # noqa # pylint: disable=unused-import + +from botocore.exceptions import ClientError +from moto import mock_kinesis + + +@mock_kinesis +def test_record_data_exceeds_1mb(): + client = boto3.client("kinesis", region_name="us-east-1") + client.create_stream(StreamName="my_stream", ShardCount=1) + with pytest.raises(ClientError) as exc: + client.put_records( + Records=[{"Data": b"a" * (2**20 + 1), "PartitionKey": "key"}], + StreamName="my_stream", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.equal( + "1 validation error detected: Value at 'records.1.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576" + ) + + +@mock_kinesis +def test_total_record_data_exceeds_5mb(): + client = boto3.client("kinesis", region_name="us-east-1") + client.create_stream(StreamName="my_stream", ShardCount=1) + with pytest.raises(ClientError) as exc: + client.put_records( + Records=[{"Data": b"a" * 2**20, "PartitionKey": "key"}] * 5, + StreamName="my_stream", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("InvalidArgumentException") + err["Message"].should.equal("Records size exceeds 5 MB limit") + + +@mock_kinesis +def test_too_many_records(): + client = boto3.client("kinesis", region_name="us-east-1") + client.create_stream(StreamName="my_stream", ShardCount=1) + with pytest.raises(ClientError) as exc: + client.put_records( + Records=[{"Data": b"a", "PartitionKey": "key"}] * 501, + StreamName="my_stream", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.equal( + "1 validation error detected: Value at 'records' failed to satisfy constraint: Member must have length less than or equal to 500" + )