Kinesis: calculating size of record using decoded data and partition key (#6231)
This commit is contained in:
parent
1f56b75ccf
commit
fe7b15c9f9
@ -1,4 +1,4 @@
|
|||||||
from base64 import b64encode
|
from base64 import b64encode, b64decode
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from gzip import GzipFile
|
from gzip import GzipFile
|
||||||
import datetime
|
import datetime
|
||||||
@ -693,11 +693,15 @@ class KinesisBackend(BaseBackend):
|
|||||||
|
|
||||||
if len(records) > 500:
|
if len(records) > 500:
|
||||||
raise TooManyRecords
|
raise TooManyRecords
|
||||||
data_sizes = [len(r.get("Data", "")) for r in records]
|
|
||||||
if sum(data_sizes) >= 5000000:
|
data_sizes = [
|
||||||
|
len(b64decode(r.get("Data", ""))) + len(r.get("PartitionKey", ""))
|
||||||
|
for r in records
|
||||||
|
]
|
||||||
|
if sum(data_sizes) > 5242880:
|
||||||
raise TotalRecordsSizeExceedsLimit
|
raise TotalRecordsSizeExceedsLimit
|
||||||
idx_over_limit = next(
|
idx_over_limit = next(
|
||||||
(idx for idx, x in enumerate(data_sizes) if x >= 1048576), None
|
(idx for idx, x in enumerate(data_sizes) if x > 1048576), None
|
||||||
)
|
)
|
||||||
if idx_over_limit is not None:
|
if idx_over_limit is not None:
|
||||||
raise RecordSizeExceedsLimit(position=idx_over_limit + 1)
|
raise RecordSizeExceedsLimit(position=idx_over_limit + 1)
|
||||||
|
@ -6,13 +6,16 @@ from botocore.exceptions import ClientError
|
|||||||
from moto import mock_kinesis
|
from moto import mock_kinesis
|
||||||
|
|
||||||
|
|
||||||
|
ONE_MB = 2**20
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis
|
@mock_kinesis
|
||||||
def test_record_data_exceeds_1mb():
|
def test_record_data_exceeds_1mb():
|
||||||
client = boto3.client("kinesis", region_name="us-east-1")
|
client = boto3.client("kinesis", region_name="us-east-1")
|
||||||
client.create_stream(StreamName="my_stream", ShardCount=1)
|
client.create_stream(StreamName="my_stream", ShardCount=1)
|
||||||
with pytest.raises(ClientError) as exc:
|
with pytest.raises(ClientError) as exc:
|
||||||
client.put_records(
|
client.put_records(
|
||||||
Records=[{"Data": b"a" * (2**20 + 1), "PartitionKey": "key"}],
|
Records=[{"Data": b"a" * (ONE_MB + 1), "PartitionKey": "key"}],
|
||||||
StreamName="my_stream",
|
StreamName="my_stream",
|
||||||
)
|
)
|
||||||
err = exc.value.response["Error"]
|
err = exc.value.response["Error"]
|
||||||
@ -22,6 +25,55 @@ def test_record_data_exceeds_1mb():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_record_data_and_partition_key_exceeds_1mb():
|
||||||
|
client = boto3.client("kinesis", region_name="us-east-1")
|
||||||
|
client.create_stream(StreamName="my_stream", ShardCount=1)
|
||||||
|
|
||||||
|
key = "key"
|
||||||
|
key_size = len(key)
|
||||||
|
data_size = ONE_MB - key_size + 1
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.put_records(
|
||||||
|
Records=[{"Data": b"a" * data_size, "PartitionKey": key}],
|
||||||
|
StreamName="my_stream",
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("ValidationException")
|
||||||
|
err["Message"].should.equal(
|
||||||
|
"1 validation error detected: Value at 'records.1.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_record_data_and_partition_key_exactly_1mb():
|
||||||
|
client = boto3.client("kinesis", region_name="us-east-1")
|
||||||
|
client.create_stream(StreamName="my_stream", ShardCount=1)
|
||||||
|
|
||||||
|
key = "key"
|
||||||
|
key_size = len(key)
|
||||||
|
data_size = ONE_MB - key_size
|
||||||
|
|
||||||
|
client.put_records(
|
||||||
|
Records=[{"Data": b"a" * data_size, "PartitionKey": key}],
|
||||||
|
StreamName="my_stream",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_record_data_and_partition_key_smaller_than_1mb():
|
||||||
|
client = boto3.client("kinesis", region_name="us-east-1")
|
||||||
|
client.create_stream(StreamName="my_stream", ShardCount=1)
|
||||||
|
|
||||||
|
key = "key"
|
||||||
|
key_size = len(key)
|
||||||
|
data_size = ONE_MB - key_size - 1
|
||||||
|
client.put_records(
|
||||||
|
Records=[{"Data": b"a" * data_size, "PartitionKey": key}],
|
||||||
|
StreamName="my_stream",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis
|
@mock_kinesis
|
||||||
def test_total_record_data_exceeds_5mb():
|
def test_total_record_data_exceeds_5mb():
|
||||||
client = boto3.client("kinesis", region_name="us-east-1")
|
client = boto3.client("kinesis", region_name="us-east-1")
|
||||||
@ -36,10 +88,24 @@ def test_total_record_data_exceeds_5mb():
|
|||||||
err["Message"].should.equal("Records size exceeds 5 MB limit")
|
err["Message"].should.equal("Records size exceeds 5 MB limit")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_total_record_data_exact_5mb():
|
||||||
|
client = boto3.client("kinesis", region_name="us-east-1")
|
||||||
|
client.create_stream(StreamName="my_stream", ShardCount=1)
|
||||||
|
key = "key"
|
||||||
|
key_size = len(key)
|
||||||
|
data_size = ONE_MB - key_size
|
||||||
|
client.put_records(
|
||||||
|
Records=[{"Data": b"a" * (data_size), "PartitionKey": key}] * 5,
|
||||||
|
StreamName="my_stream",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis
|
@mock_kinesis
|
||||||
def test_too_many_records():
|
def test_too_many_records():
|
||||||
client = boto3.client("kinesis", region_name="us-east-1")
|
client = boto3.client("kinesis", region_name="us-east-1")
|
||||||
client.create_stream(StreamName="my_stream", ShardCount=1)
|
client.create_stream(StreamName="my_stream", ShardCount=1)
|
||||||
|
|
||||||
with pytest.raises(ClientError) as exc:
|
with pytest.raises(ClientError) as exc:
|
||||||
client.put_records(
|
client.put_records(
|
||||||
Records=[{"Data": b"a", "PartitionKey": "key"}] * 501,
|
Records=[{"Data": b"a", "PartitionKey": "key"}] * 501,
|
||||||
|
Loading…
Reference in New Issue
Block a user