Kinesis:update_stream_mode() (#5456)
parent dbef197de8
commit f61ffc81ec
@@ -3638,7 +3638,7 @@
 
 ## kinesis
 <details>
-<summary>89% implemented</summary>
+<summary>93% implemented</summary>
 
 - [X] add_tags_to_stream
 - [X] create_stream
@@ -3668,7 +3668,7 @@
 - [X] stop_stream_encryption
 - [ ] subscribe_to_shard
 - [X] update_shard_count
-- [ ] update_stream_mode
+- [X] update_stream_mode
 </details>
 
 ## kinesis-video-archived-media
@@ -57,5 +57,5 @@ kinesis
 - [X] stop_stream_encryption
 - [ ] subscribe_to_shard
 - [X] update_shard_count
-- [ ] update_stream_mode
+- [X] update_stream_mode
 
@@ -82,3 +82,36 @@ class ValidationException(BadRequest):
                 "__type": "ValidationException",
             }
         )
+
+
+class RecordSizeExceedsLimit(BadRequest):
+    def __init__(self, position):
+        super().__init__()
+        self.description = json.dumps(
+            {
+                "message": f"1 validation error detected: Value at 'records.{position}.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576",
+                "__type": "ValidationException",
+            }
+        )
+
+
+class TotalRecordsSizeExceedsLimit(BadRequest):
+    def __init__(self):
+        super().__init__()
+        self.description = json.dumps(
+            {
+                "message": "Records size exceeds 5 MB limit",
+                "__type": "InvalidArgumentException",
+            }
+        )
+
+
+class TooManyRecords(BadRequest):
+    def __init__(self):
+        super().__init__()
+        self.description = json.dumps(
+            {
+                "message": "1 validation error detected: Value at 'records' failed to satisfy constraint: Member must have length less than or equal to 500",
+                "__type": "ValidationException",
+            }
+        )
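These three exceptions encode the PutRecords limits that the backend now enforces: at most 500 records per request, at most 1,048,576 bytes of data per record, and roughly 5 MB per request in total (the check uses 5,000,000 bytes). A minimal sketch, not part of this commit, of how they surface to a caller: moto returns the "description" JSON above as the error body, and botocore maps its "__type" field to the ClientError code that the new tests assert on.

import boto3
from botocore.exceptions import ClientError
from moto import mock_kinesis


@mock_kinesis
def put_one_oversized_record():
    # Hypothetical helper: a single record over the 1 MiB data limit should
    # trigger RecordSizeExceedsLimit, which reaches the client as a
    # ClientError with code "ValidationException".
    client = boto3.client("kinesis", region_name="us-east-1")
    client.create_stream(StreamName="demo", ShardCount=1)
    try:
        client.put_records(
            StreamName="demo",
            Records=[{"Data": b"x" * (2**20 + 1), "PartitionKey": "pk"}],
        )
    except ClientError as err:
        print(err.response["Error"]["Code"])     # ValidationException
        print(err.response["Error"]["Message"])  # "...records.1.member.data..."


put_one_oversized_record()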
@@ -21,6 +21,9 @@ from .exceptions import (
     InvalidDecreaseRetention,
     InvalidIncreaseRetention,
     ValidationException,
+    RecordSizeExceedsLimit,
+    TotalRecordsSizeExceedsLimit,
+    TooManyRecords,
 )
 from .utils import (
     compose_shard_iterator,
@@ -411,6 +414,7 @@ class Stream(CloudFormationModel):
                 "EnhancedMonitoring": [{"ShardLevelMetrics": self.shard_level_metrics}],
                 "OpenShardCount": self.shard_count,
                 "EncryptionType": self.encryption_type,
+                "KeyId": self.key_id,
             }
         }
 
@@ -542,7 +546,7 @@ class KinesisBackend(BaseBackend):
         self.streams[stream_name] = stream
         return stream
 
-    def describe_stream(self, stream_name):
+    def describe_stream(self, stream_name) -> Stream:
         if stream_name in self.streams:
             return self.streams[stream_name]
         else:
@@ -622,6 +626,17 @@ class KinesisBackend(BaseBackend):
 
         response = {"FailedRecordCount": 0, "Records": []}
 
+        if len(records) > 500:
+            raise TooManyRecords
+        data_sizes = [len(r.get("Data", "")) for r in records]
+        if sum(data_sizes) >= 5000000:
+            raise TotalRecordsSizeExceedsLimit
+        idx_over_limit = next(
+            (idx for idx, x in enumerate(data_sizes) if x >= 1048576), None
+        )
+        if idx_over_limit is not None:
+            raise RecordSizeExceedsLimit(position=idx_over_limit + 1)
+
         for record in records:
             partition_key = record.get("PartitionKey")
             explicit_hash_key = record.get("ExplicitHashKey")
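The three checks above run before any record is processed, in a fixed order: record count first, then aggregate size, then per-record size, and the per-record error reports a 1-based position. A small standalone illustration of that position arithmetic (hypothetical data, not taken from the commit):

# The second record is over the limit, so the error would name 'records.2.member.data'.
records = [
    {"Data": b"ok", "PartitionKey": "a"},
    {"Data": b"x" * (2**20 + 1), "PartitionKey": "b"},
]
data_sizes = [len(r.get("Data", "")) for r in records]
idx_over_limit = next((idx for idx, size in enumerate(data_sizes) if size >= 1048576), None)
assert idx_over_limit == 1  # zero-based index; RecordSizeExceedsLimit gets position=2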
@@ -821,5 +836,9 @@ class KinesisBackend(BaseBackend):
         stream.encryption_type = "NONE"
         stream.key_id = None
 
+    def update_stream_mode(self, stream_arn, stream_mode):
+        stream = self._find_stream_by_arn(stream_arn)
+        stream.stream_mode = stream_mode
+
 
 kinesis_backends = BackendDict(KinesisBackend, "kinesis")
@@ -279,3 +279,9 @@ class KinesisResponse(BaseResponse):
         stream_name = self.parameters.get("StreamName")
         self.kinesis_backend.stop_stream_encryption(stream_name=stream_name)
         return json.dumps(dict())
+
+    def update_stream_mode(self):
+        stream_arn = self.parameters.get("StreamARN")
+        stream_mode = self.parameters.get("StreamModeDetails")
+        self.kinesis_backend.update_stream_mode(stream_arn, stream_mode)
+        return "{}"
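For context, Kinesis is a JSON-protocol API, so boto3's update_stream_mode call arrives as a JSON body and self.parameters is expected to look roughly like the dict below (the ARN value is illustrative only). The backend stores the StreamModeDetails dict unchanged on the stream, and describe_stream_summary echoes it back, which is what the test_update_stream_mode test added below asserts.

# Illustrative payload shape handled by the new update_stream_mode response method
parameters = {
    "StreamARN": "arn:aws:kinesis:eu-west-1:123456789012:stream/my_stream",  # example ARN
    "StreamModeDetails": {"StreamMode": "PROVISIONED"},
}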
@@ -135,8 +135,10 @@ iam:
 iot:
   - TestAccIoTEndpointDataSource
 kinesis:
-  - TestAccKinesisStream_basic
-  - TestAccKinesisStream_disappear
+  - TestAccKinesisStreamConsumerDataSource_
+  - TestAccKinesisStreamConsumer_
+  - TestAccKinesisStreamDataSource_
+  - TestAccKinesisStream_
 kms:
   - TestAccKMSAlias
   - TestAccKMSGrant_arn
@@ -36,6 +36,25 @@ def test_stream_creation_on_demand():
     )
 
 
+@mock_kinesis
+def test_update_stream_mode():
+    client = boto3.client("kinesis", region_name="eu-west-1")
+    resp = client.create_stream(
+        StreamName="my_stream", StreamModeDetails={"StreamMode": "ON_DEMAND"}
+    )
+    arn = client.describe_stream(StreamName="my_stream")["StreamDescription"][
+        "StreamARN"
+    ]
+
+    client.update_stream_mode(
+        StreamARN=arn, StreamModeDetails={"StreamMode": "PROVISIONED"}
+    )
+
+    resp = client.describe_stream_summary(StreamName="my_stream")
+    stream = resp["StreamDescriptionSummary"]
+    stream.should.have.key("StreamModeDetails").equals({"StreamMode": "PROVISIONED"})
+
+
 @mock_kinesis
 def test_describe_non_existent_stream_boto3():
     client = boto3.client("kinesis", region_name="us-west-2")
tests/test_kinesis/test_kinesis_stream_limits.py (new file, 52 lines)
@@ -0,0 +1,52 @@
+import boto3
+import pytest
+import sure  # noqa # pylint: disable=unused-import
+
+from botocore.exceptions import ClientError
+from moto import mock_kinesis
+
+
+@mock_kinesis
+def test_record_data_exceeds_1mb():
+    client = boto3.client("kinesis", region_name="us-east-1")
+    client.create_stream(StreamName="my_stream", ShardCount=1)
+    with pytest.raises(ClientError) as exc:
+        client.put_records(
+            Records=[{"Data": b"a" * (2**20 + 1), "PartitionKey": "key"}],
+            StreamName="my_stream",
+        )
+    err = exc.value.response["Error"]
+    err["Code"].should.equal("ValidationException")
+    err["Message"].should.equal(
+        "1 validation error detected: Value at 'records.1.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576"
+    )
+
+
+@mock_kinesis
+def test_total_record_data_exceeds_5mb():
+    client = boto3.client("kinesis", region_name="us-east-1")
+    client.create_stream(StreamName="my_stream", ShardCount=1)
+    with pytest.raises(ClientError) as exc:
+        client.put_records(
+            Records=[{"Data": b"a" * 2**20, "PartitionKey": "key"}] * 5,
+            StreamName="my_stream",
+        )
+    err = exc.value.response["Error"]
+    err["Code"].should.equal("InvalidArgumentException")
+    err["Message"].should.equal("Records size exceeds 5 MB limit")
+
+
+@mock_kinesis
+def test_too_many_records():
+    client = boto3.client("kinesis", region_name="us-east-1")
+    client.create_stream(StreamName="my_stream", ShardCount=1)
+    with pytest.raises(ClientError) as exc:
+        client.put_records(
+            Records=[{"Data": b"a", "PartitionKey": "key"}] * 501,
+            StreamName="my_stream",
+        )
+    err = exc.value.response["Error"]
+    err["Code"].should.equal("ValidationException")
+    err["Message"].should.equal(
+        "1 validation error detected: Value at 'records' failed to satisfy constraint: Member must have length less than or equal to 500"
+    )
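One detail worth noting about test_total_record_data_exceeds_5mb: each of its five 1 MiB records would also trip the per-record check, but the backend validates the aggregate size before the individual sizes, which is why the test sees the InvalidArgumentException for the 5 MB limit rather than a ValidationException. Illustrative arithmetic only:

# Five 1 MiB payloads already exceed the 5,000,000-byte aggregate threshold,
# so TotalRecordsSizeExceedsLimit fires before the per-record check runs.
assert 5 * 2**20 == 5_242_880
assert 5 * 2**20 >= 5_000_000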