Kinesis - DescribeStream(): Implement Filter-param (#4391)

This commit is contained in:
Bert Blommers 2021-10-11 20:33:32 +00:00 committed by GitHub
parent d9830c0766
commit 03083ede42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 4 deletions

View File

@ -209,15 +209,17 @@ class Stream(CloudFormationModel):
sequence_number = shard.put_record(partition_key, data, explicit_hash_key) sequence_number = shard.put_record(partition_key, data, explicit_hash_key)
return sequence_number, shard.shard_id return sequence_number, shard.shard_id
def to_json(self): def to_json(self, shard_limit=None):
all_shards = list(self.shards.values())
requested_shards = all_shards[0 : shard_limit or len(all_shards)]
return { return {
"StreamDescription": { "StreamDescription": {
"StreamARN": self.arn, "StreamARN": self.arn,
"StreamName": self.stream_name, "StreamName": self.stream_name,
"StreamStatus": self.status, "StreamStatus": self.status,
"HasMoreShards": False, "HasMoreShards": len(requested_shards) != len(all_shards),
"RetentionPeriodHours": self.retention_period_hours, "RetentionPeriodHours": self.retention_period_hours,
"Shards": [shard.to_json() for shard in self.shards.values()], "Shards": [shard.to_json() for shard in requested_shards],
} }
} }

View File

@ -26,8 +26,9 @@ class KinesisResponse(BaseResponse):
def describe_stream(self): def describe_stream(self):
stream_name = self.parameters.get("StreamName") stream_name = self.parameters.get("StreamName")
limit = self.parameters.get("Limit")
stream = self.kinesis_backend.describe_stream(stream_name) stream = self.kinesis_backend.describe_stream(stream_name)
return json.dumps(stream.to_json()) return json.dumps(stream.to_json(shard_limit=limit))
def describe_stream_summary(self): def describe_stream_summary(self):
stream_name = self.parameters.get("StreamName") stream_name = self.parameters.get("StreamName")

View File

@ -5,6 +5,36 @@ from moto import mock_kinesis
import sure # noqa import sure # noqa
@mock_kinesis
def test_describe_stream_limit_parameter():
client = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
client.create_stream(StreamName=stream_name, ShardCount=5)
without_filter = client.describe_stream(StreamName=stream_name)["StreamDescription"]
without_filter["Shards"].should.have.length_of(5)
without_filter["HasMoreShards"].should.equal(False)
with_filter = client.describe_stream(StreamName=stream_name, Limit=2)[
"StreamDescription"
]
with_filter["Shards"].should.have.length_of(2)
with_filter["HasMoreShards"].should.equal(True)
with_filter = client.describe_stream(StreamName=stream_name, Limit=5)[
"StreamDescription"
]
with_filter["Shards"].should.have.length_of(5)
with_filter["HasMoreShards"].should.equal(False)
with_filter = client.describe_stream(StreamName=stream_name, Limit=6)[
"StreamDescription"
]
with_filter["Shards"].should.have.length_of(5)
with_filter["HasMoreShards"].should.equal(False)
@mock_kinesis @mock_kinesis
def test_split_shard(): def test_split_shard():
conn = boto3.client("kinesis", region_name="us-west-2") conn = boto3.client("kinesis", region_name="us-west-2")