diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index c38d9c3a4..fe34258ee 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -2519,14 +2519,14 @@ - [ ] start_next_pending_job_execution - [ ] update_job_execution -## kinesis - 56% implemented +## kinesis - 61% implemented - [X] add_tags_to_stream - [X] create_stream - [ ] decrease_stream_retention_period - [X] delete_stream - [ ] describe_limits - [X] describe_stream -- [ ] describe_stream_summary +- [X] describe_stream_summary - [ ] disable_enhanced_monitoring - [ ] enable_enhanced_monitoring - [X] get_records diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index d9a47ea87..886a3a61f 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -116,10 +116,12 @@ class Stream(BaseModel): def __init__(self, stream_name, shard_count, region): self.stream_name = stream_name self.shard_count = shard_count + self.creation_datetime = datetime.datetime.now() self.region = region self.account_number = "123456789012" self.shards = {} self.tags = {} + self.status = "ACTIVE" if six.PY3: izip_longest = itertools.zip_longest @@ -183,12 +185,23 @@ class Stream(BaseModel): "StreamDescription": { "StreamARN": self.arn, "StreamName": self.stream_name, - "StreamStatus": "ACTIVE", + "StreamStatus": self.status, "HasMoreShards": False, "Shards": [shard.to_json() for shard in self.shards.values()], } } + def to_json_summary(self): + return { + "StreamDescriptionSummary": { + "StreamARN": self.arn, + "StreamName": self.stream_name, + "StreamStatus": self.status, + "StreamCreationTimestamp": six.text_type(self.creation_datetime), + "OpenShardCount": self.shard_count, + } + } + @classmethod def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): properties = cloudformation_json['Properties'] @@ -309,6 +322,9 @@ class KinesisBackend(BaseBackend): else: raise StreamNotFoundError(stream_name) + def describe_stream_summary(self, stream_name): + return self.describe_stream(stream_name) + def list_streams(self): return self.streams.values() diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 72b2af4ce..3a81bd9f4 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -33,6 +33,11 @@ class KinesisResponse(BaseResponse): stream = self.kinesis_backend.describe_stream(stream_name) return json.dumps(stream.to_json()) + def describe_stream_summary(self): + stream_name = self.parameters.get('StreamName') + stream = self.kinesis_backend.describe_stream_summary(stream_name) + return json.dumps(stream.to_json_summary()) + def list_streams(self): streams = self.kinesis_backend.list_streams() stream_names = [stream.stream_name for stream in streams] diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index c70236978..6986f79fc 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -1,12 +1,13 @@ from __future__ import unicode_literals -import boto.kinesis -from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException -import boto3 -import sure # noqa import datetime import time +import boto.kinesis +import boto3 +from boto.kinesis.exceptions import ResourceNotFoundException, \ + InvalidArgumentException + from moto import mock_kinesis, mock_kinesis_deprecated @@ -73,6 +74,23 @@ def test_list_many_streams(): has_more_streams.should.equal(False) +@mock_kinesis +def test_describe_stream_summary(): + conn = boto3.client('kinesis', region_name="us-west-2") + stream_name = 'my_stream_summary' + shard_count = 5 + conn.create_stream(StreamName=stream_name, ShardCount=shard_count) + + resp = conn.describe_stream_summary(StreamName=stream_name) + stream = resp["StreamDescriptionSummary"] + + stream["StreamName"].should.equal(stream_name) + stream["OpenShardCount"].should.equal(shard_count) + stream["StreamARN"].should.equal( + "arn:aws:kinesis:us-west-2:123456789012:{}".format(stream_name)) + stream["StreamStatus"].should.equal("ACTIVE") + + @mock_kinesis_deprecated def test_basic_shard_iterator(): conn = boto.kinesis.connect_to_region("us-west-2") @@ -100,7 +118,8 @@ def test_get_invalid_shard_iterator(): conn.create_stream(stream_name, 1) conn.get_shard_iterator.when.called_with( - stream_name, "123", 'TRIM_HORIZON').should.throw(ResourceNotFoundException) + stream_name, "123", 'TRIM_HORIZON').should.throw( + ResourceNotFoundException) @mock_kinesis_deprecated @@ -354,8 +373,8 @@ def test_get_records_timestamp_filtering(): timestamp = datetime.datetime.utcnow() conn.put_record(StreamName=stream_name, - Data='1', - PartitionKey='1') + Data='1', + PartitionKey='1') response = conn.describe_stream(StreamName=stream_name) shard_id = response['StreamDescription']['Shards'][0]['ShardId'] @@ -368,7 +387,7 @@ def test_get_records_timestamp_filtering(): response = conn.get_records(ShardIterator=shard_iterator) response['Records'].should.have.length_of(1) response['Records'][0]['PartitionKey'].should.equal('1') - response['Records'][0]['ApproximateArrivalTimestamp'].should.be.\ + response['Records'][0]['ApproximateArrivalTimestamp'].should.be. \ greater_than(timestamp) response['MillisBehindLatest'].should.equal(0) @@ -461,7 +480,8 @@ def test_invalid_shard_iterator_type(): response = conn.describe_stream(stream_name) shard_id = response['StreamDescription']['Shards'][0]['ShardId'] response = conn.get_shard_iterator.when.called_with( - stream_name, shard_id, 'invalid-type').should.throw(InvalidArgumentException) + stream_name, shard_id, 'invalid-type').should.throw( + InvalidArgumentException) @mock_kinesis_deprecated @@ -549,7 +569,8 @@ def test_split_shard(): shard_range = shards[0]['HashKeyRange'] new_starting_hash = ( - int(shard_range['EndingHashKey']) + int(shard_range['StartingHashKey'])) // 2 + int(shard_range['EndingHashKey']) + int( + shard_range['StartingHashKey'])) // 2 conn.split_shard("my_stream", shards[0]['ShardId'], str(new_starting_hash)) stream_response = conn.describe_stream(stream_name) @@ -562,7 +583,8 @@ def test_split_shard(): shard_range = shards[2]['HashKeyRange'] new_starting_hash = ( - int(shard_range['EndingHashKey']) + int(shard_range['StartingHashKey'])) // 2 + int(shard_range['EndingHashKey']) + int( + shard_range['StartingHashKey'])) // 2 conn.split_shard("my_stream", shards[2]['ShardId'], str(new_starting_hash)) stream_response = conn.describe_stream(stream_name) @@ -592,7 +614,8 @@ def test_merge_shards(): shards.should.have.length_of(4) conn.merge_shards.when.called_with( - stream_name, 'shardId-000000000000', 'shardId-000000000002').should.throw(InvalidArgumentException) + stream_name, 'shardId-000000000000', + 'shardId-000000000002').should.throw(InvalidArgumentException) stream_response = conn.describe_stream(stream_name)