From fe7b15c9f9c84fc2af3bc61c8e3cb81fe6507aa5 Mon Sep 17 00:00:00 2001 From: Lucas Silva Chaves Date: Mon, 24 Apr 2023 16:11:36 -0300 Subject: [PATCH] Kinesis: calculating size of record using decoded data and partition key (#6231) --- moto/kinesis/models.py | 12 ++-- .../test_kinesis_stream_limits.py | 68 ++++++++++++++++++- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 2a060e6b1..0d6e7463b 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -1,4 +1,4 @@ -from base64 import b64encode +from base64 import b64encode, b64decode from collections import OrderedDict from gzip import GzipFile import datetime @@ -693,11 +693,15 @@ class KinesisBackend(BaseBackend): if len(records) > 500: raise TooManyRecords - data_sizes = [len(r.get("Data", "")) for r in records] - if sum(data_sizes) >= 5000000: + + data_sizes = [ + len(b64decode(r.get("Data", ""))) + len(r.get("PartitionKey", "")) + for r in records + ] + if sum(data_sizes) > 5242880: raise TotalRecordsSizeExceedsLimit idx_over_limit = next( - (idx for idx, x in enumerate(data_sizes) if x >= 1048576), None + (idx for idx, x in enumerate(data_sizes) if x > 1048576), None ) if idx_over_limit is not None: raise RecordSizeExceedsLimit(position=idx_over_limit + 1) diff --git a/tests/test_kinesis/test_kinesis_stream_limits.py b/tests/test_kinesis/test_kinesis_stream_limits.py index 5c5fa3a4d..3779be7f4 100644 --- a/tests/test_kinesis/test_kinesis_stream_limits.py +++ b/tests/test_kinesis/test_kinesis_stream_limits.py @@ -6,13 +6,16 @@ from botocore.exceptions import ClientError from moto import mock_kinesis +ONE_MB = 2**20 + + @mock_kinesis def test_record_data_exceeds_1mb(): client = boto3.client("kinesis", region_name="us-east-1") client.create_stream(StreamName="my_stream", ShardCount=1) with pytest.raises(ClientError) as exc: client.put_records( - Records=[{"Data": b"a" * (2**20 + 1), "PartitionKey": "key"}], + Records=[{"Data": b"a" * (ONE_MB + 1), "PartitionKey": "key"}], StreamName="my_stream", ) err = exc.value.response["Error"] @@ -22,6 +25,55 @@ def test_record_data_exceeds_1mb(): ) +@mock_kinesis +def test_record_data_and_partition_key_exceeds_1mb(): + client = boto3.client("kinesis", region_name="us-east-1") + client.create_stream(StreamName="my_stream", ShardCount=1) + + key = "key" + key_size = len(key) + data_size = ONE_MB - key_size + 1 + with pytest.raises(ClientError) as exc: + client.put_records( + Records=[{"Data": b"a" * data_size, "PartitionKey": key}], + StreamName="my_stream", + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.equal( + "1 validation error detected: Value at 'records.1.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576" + ) + + +@mock_kinesis +def test_record_data_and_partition_key_exactly_1mb(): + client = boto3.client("kinesis", region_name="us-east-1") + client.create_stream(StreamName="my_stream", ShardCount=1) + + key = "key" + key_size = len(key) + data_size = ONE_MB - key_size + + client.put_records( + Records=[{"Data": b"a" * data_size, "PartitionKey": key}], + StreamName="my_stream", + ) + + +@mock_kinesis +def test_record_data_and_partition_key_smaller_than_1mb(): + client = boto3.client("kinesis", region_name="us-east-1") + client.create_stream(StreamName="my_stream", ShardCount=1) + + key = "key" + key_size = len(key) + data_size = ONE_MB - key_size - 1 + client.put_records( + Records=[{"Data": b"a" * data_size, "PartitionKey": key}], + StreamName="my_stream", + ) + + @mock_kinesis def test_total_record_data_exceeds_5mb(): client = boto3.client("kinesis", region_name="us-east-1") @@ -36,10 +88,24 @@ def test_total_record_data_exceeds_5mb(): err["Message"].should.equal("Records size exceeds 5 MB limit") +@mock_kinesis +def test_total_record_data_exact_5mb(): + client = boto3.client("kinesis", region_name="us-east-1") + client.create_stream(StreamName="my_stream", ShardCount=1) + key = "key" + key_size = len(key) + data_size = ONE_MB - key_size + client.put_records( + Records=[{"Data": b"a" * (data_size), "PartitionKey": key}] * 5, + StreamName="my_stream", + ) + + @mock_kinesis def test_too_many_records(): client = boto3.client("kinesis", region_name="us-east-1") client.create_stream(StreamName="my_stream", ShardCount=1) + with pytest.raises(ClientError) as exc: client.put_records( Records=[{"Data": b"a", "PartitionKey": "key"}] * 501,