add implemented kinesis method describe_stream_summary (#2023)
This commit is contained in:
parent
e504226386
commit
f13e4e41cd
@ -2519,14 +2519,14 @@
|
|||||||
- [ ] start_next_pending_job_execution
|
- [ ] start_next_pending_job_execution
|
||||||
- [ ] update_job_execution
|
- [ ] update_job_execution
|
||||||
|
|
||||||
## kinesis - 56% implemented
|
## kinesis - 61% implemented
|
||||||
- [X] add_tags_to_stream
|
- [X] add_tags_to_stream
|
||||||
- [X] create_stream
|
- [X] create_stream
|
||||||
- [ ] decrease_stream_retention_period
|
- [ ] decrease_stream_retention_period
|
||||||
- [X] delete_stream
|
- [X] delete_stream
|
||||||
- [ ] describe_limits
|
- [ ] describe_limits
|
||||||
- [X] describe_stream
|
- [X] describe_stream
|
||||||
- [ ] describe_stream_summary
|
- [X] describe_stream_summary
|
||||||
- [ ] disable_enhanced_monitoring
|
- [ ] disable_enhanced_monitoring
|
||||||
- [ ] enable_enhanced_monitoring
|
- [ ] enable_enhanced_monitoring
|
||||||
- [X] get_records
|
- [X] get_records
|
||||||
|
@ -116,10 +116,12 @@ class Stream(BaseModel):
|
|||||||
def __init__(self, stream_name, shard_count, region):
|
def __init__(self, stream_name, shard_count, region):
|
||||||
self.stream_name = stream_name
|
self.stream_name = stream_name
|
||||||
self.shard_count = shard_count
|
self.shard_count = shard_count
|
||||||
|
self.creation_datetime = datetime.datetime.now()
|
||||||
self.region = region
|
self.region = region
|
||||||
self.account_number = "123456789012"
|
self.account_number = "123456789012"
|
||||||
self.shards = {}
|
self.shards = {}
|
||||||
self.tags = {}
|
self.tags = {}
|
||||||
|
self.status = "ACTIVE"
|
||||||
|
|
||||||
if six.PY3:
|
if six.PY3:
|
||||||
izip_longest = itertools.zip_longest
|
izip_longest = itertools.zip_longest
|
||||||
@ -183,12 +185,23 @@ class Stream(BaseModel):
|
|||||||
"StreamDescription": {
|
"StreamDescription": {
|
||||||
"StreamARN": self.arn,
|
"StreamARN": self.arn,
|
||||||
"StreamName": self.stream_name,
|
"StreamName": self.stream_name,
|
||||||
"StreamStatus": "ACTIVE",
|
"StreamStatus": self.status,
|
||||||
"HasMoreShards": False,
|
"HasMoreShards": False,
|
||||||
"Shards": [shard.to_json() for shard in self.shards.values()],
|
"Shards": [shard.to_json() for shard in self.shards.values()],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def to_json_summary(self):
|
||||||
|
return {
|
||||||
|
"StreamDescriptionSummary": {
|
||||||
|
"StreamARN": self.arn,
|
||||||
|
"StreamName": self.stream_name,
|
||||||
|
"StreamStatus": self.status,
|
||||||
|
"StreamCreationTimestamp": six.text_type(self.creation_datetime),
|
||||||
|
"OpenShardCount": self.shard_count,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
|
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
|
||||||
properties = cloudformation_json['Properties']
|
properties = cloudformation_json['Properties']
|
||||||
@ -309,6 +322,9 @@ class KinesisBackend(BaseBackend):
|
|||||||
else:
|
else:
|
||||||
raise StreamNotFoundError(stream_name)
|
raise StreamNotFoundError(stream_name)
|
||||||
|
|
||||||
|
def describe_stream_summary(self, stream_name):
|
||||||
|
return self.describe_stream(stream_name)
|
||||||
|
|
||||||
def list_streams(self):
|
def list_streams(self):
|
||||||
return self.streams.values()
|
return self.streams.values()
|
||||||
|
|
||||||
|
@ -33,6 +33,11 @@ class KinesisResponse(BaseResponse):
|
|||||||
stream = self.kinesis_backend.describe_stream(stream_name)
|
stream = self.kinesis_backend.describe_stream(stream_name)
|
||||||
return json.dumps(stream.to_json())
|
return json.dumps(stream.to_json())
|
||||||
|
|
||||||
|
def describe_stream_summary(self):
|
||||||
|
stream_name = self.parameters.get('StreamName')
|
||||||
|
stream = self.kinesis_backend.describe_stream_summary(stream_name)
|
||||||
|
return json.dumps(stream.to_json_summary())
|
||||||
|
|
||||||
def list_streams(self):
|
def list_streams(self):
|
||||||
streams = self.kinesis_backend.list_streams()
|
streams = self.kinesis_backend.list_streams()
|
||||||
stream_names = [stream.stream_name for stream in streams]
|
stream_names = [stream.stream_name for stream in streams]
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
import boto.kinesis
|
|
||||||
from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException
|
|
||||||
import boto3
|
|
||||||
import sure # noqa
|
|
||||||
import datetime
|
import datetime
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import boto.kinesis
|
||||||
|
import boto3
|
||||||
|
from boto.kinesis.exceptions import ResourceNotFoundException, \
|
||||||
|
InvalidArgumentException
|
||||||
|
|
||||||
from moto import mock_kinesis, mock_kinesis_deprecated
|
from moto import mock_kinesis, mock_kinesis_deprecated
|
||||||
|
|
||||||
|
|
||||||
@ -73,6 +74,23 @@ def test_list_many_streams():
|
|||||||
has_more_streams.should.equal(False)
|
has_more_streams.should.equal(False)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_describe_stream_summary():
|
||||||
|
conn = boto3.client('kinesis', region_name="us-west-2")
|
||||||
|
stream_name = 'my_stream_summary'
|
||||||
|
shard_count = 5
|
||||||
|
conn.create_stream(StreamName=stream_name, ShardCount=shard_count)
|
||||||
|
|
||||||
|
resp = conn.describe_stream_summary(StreamName=stream_name)
|
||||||
|
stream = resp["StreamDescriptionSummary"]
|
||||||
|
|
||||||
|
stream["StreamName"].should.equal(stream_name)
|
||||||
|
stream["OpenShardCount"].should.equal(shard_count)
|
||||||
|
stream["StreamARN"].should.equal(
|
||||||
|
"arn:aws:kinesis:us-west-2:123456789012:{}".format(stream_name))
|
||||||
|
stream["StreamStatus"].should.equal("ACTIVE")
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_basic_shard_iterator():
|
def test_basic_shard_iterator():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -100,7 +118,8 @@ def test_get_invalid_shard_iterator():
|
|||||||
conn.create_stream(stream_name, 1)
|
conn.create_stream(stream_name, 1)
|
||||||
|
|
||||||
conn.get_shard_iterator.when.called_with(
|
conn.get_shard_iterator.when.called_with(
|
||||||
stream_name, "123", 'TRIM_HORIZON').should.throw(ResourceNotFoundException)
|
stream_name, "123", 'TRIM_HORIZON').should.throw(
|
||||||
|
ResourceNotFoundException)
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
@ -354,8 +373,8 @@ def test_get_records_timestamp_filtering():
|
|||||||
timestamp = datetime.datetime.utcnow()
|
timestamp = datetime.datetime.utcnow()
|
||||||
|
|
||||||
conn.put_record(StreamName=stream_name,
|
conn.put_record(StreamName=stream_name,
|
||||||
Data='1',
|
Data='1',
|
||||||
PartitionKey='1')
|
PartitionKey='1')
|
||||||
|
|
||||||
response = conn.describe_stream(StreamName=stream_name)
|
response = conn.describe_stream(StreamName=stream_name)
|
||||||
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
|
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
|
||||||
@ -368,7 +387,7 @@ def test_get_records_timestamp_filtering():
|
|||||||
response = conn.get_records(ShardIterator=shard_iterator)
|
response = conn.get_records(ShardIterator=shard_iterator)
|
||||||
response['Records'].should.have.length_of(1)
|
response['Records'].should.have.length_of(1)
|
||||||
response['Records'][0]['PartitionKey'].should.equal('1')
|
response['Records'][0]['PartitionKey'].should.equal('1')
|
||||||
response['Records'][0]['ApproximateArrivalTimestamp'].should.be.\
|
response['Records'][0]['ApproximateArrivalTimestamp'].should.be. \
|
||||||
greater_than(timestamp)
|
greater_than(timestamp)
|
||||||
response['MillisBehindLatest'].should.equal(0)
|
response['MillisBehindLatest'].should.equal(0)
|
||||||
|
|
||||||
@ -461,7 +480,8 @@ def test_invalid_shard_iterator_type():
|
|||||||
response = conn.describe_stream(stream_name)
|
response = conn.describe_stream(stream_name)
|
||||||
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
|
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
|
||||||
response = conn.get_shard_iterator.when.called_with(
|
response = conn.get_shard_iterator.when.called_with(
|
||||||
stream_name, shard_id, 'invalid-type').should.throw(InvalidArgumentException)
|
stream_name, shard_id, 'invalid-type').should.throw(
|
||||||
|
InvalidArgumentException)
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
@ -549,7 +569,8 @@ def test_split_shard():
|
|||||||
|
|
||||||
shard_range = shards[0]['HashKeyRange']
|
shard_range = shards[0]['HashKeyRange']
|
||||||
new_starting_hash = (
|
new_starting_hash = (
|
||||||
int(shard_range['EndingHashKey']) + int(shard_range['StartingHashKey'])) // 2
|
int(shard_range['EndingHashKey']) + int(
|
||||||
|
shard_range['StartingHashKey'])) // 2
|
||||||
conn.split_shard("my_stream", shards[0]['ShardId'], str(new_starting_hash))
|
conn.split_shard("my_stream", shards[0]['ShardId'], str(new_starting_hash))
|
||||||
|
|
||||||
stream_response = conn.describe_stream(stream_name)
|
stream_response = conn.describe_stream(stream_name)
|
||||||
@ -562,7 +583,8 @@ def test_split_shard():
|
|||||||
|
|
||||||
shard_range = shards[2]['HashKeyRange']
|
shard_range = shards[2]['HashKeyRange']
|
||||||
new_starting_hash = (
|
new_starting_hash = (
|
||||||
int(shard_range['EndingHashKey']) + int(shard_range['StartingHashKey'])) // 2
|
int(shard_range['EndingHashKey']) + int(
|
||||||
|
shard_range['StartingHashKey'])) // 2
|
||||||
conn.split_shard("my_stream", shards[2]['ShardId'], str(new_starting_hash))
|
conn.split_shard("my_stream", shards[2]['ShardId'], str(new_starting_hash))
|
||||||
|
|
||||||
stream_response = conn.describe_stream(stream_name)
|
stream_response = conn.describe_stream(stream_name)
|
||||||
@ -592,7 +614,8 @@ def test_merge_shards():
|
|||||||
shards.should.have.length_of(4)
|
shards.should.have.length_of(4)
|
||||||
|
|
||||||
conn.merge_shards.when.called_with(
|
conn.merge_shards.when.called_with(
|
||||||
stream_name, 'shardId-000000000000', 'shardId-000000000002').should.throw(InvalidArgumentException)
|
stream_name, 'shardId-000000000000',
|
||||||
|
'shardId-000000000002').should.throw(InvalidArgumentException)
|
||||||
|
|
||||||
stream_response = conn.describe_stream(stream_name)
|
stream_response = conn.describe_stream(stream_name)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user