add kinesis API:
- AddTagsToStream - ListTagsForStream - RemoveTagsFromStream
This commit is contained in:
parent
8c122ee793
commit
03cd1e47c1
@ -2,8 +2,8 @@ from __future__ import unicode_literals
|
|||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import boto.kinesis
|
import boto.kinesis
|
||||||
|
|
||||||
from moto.compat import OrderedDict
|
from moto.compat import OrderedDict
|
||||||
from moto.core import BaseBackend
|
from moto.core import BaseBackend
|
||||||
from .exceptions import StreamNotFoundError, ShardNotFoundError, ResourceInUseError
|
from .exceptions import StreamNotFoundError, ShardNotFoundError, ResourceInUseError
|
||||||
@ -84,6 +84,7 @@ class Stream(object):
|
|||||||
self.region = region
|
self.region = region
|
||||||
self.account_number = "123456789012"
|
self.account_number = "123456789012"
|
||||||
self.shards = {}
|
self.shards = {}
|
||||||
|
self.tags = {}
|
||||||
|
|
||||||
for index in range(shard_count):
|
for index in range(shard_count):
|
||||||
shard_id = "shardId-{0}".format(str(index).zfill(12))
|
shard_id = "shardId-{0}".format(str(index).zfill(12))
|
||||||
@ -299,6 +300,39 @@ class KinesisBackend(BaseBackend):
|
|||||||
record = stream.put_record(record_data)
|
record = stream.put_record(record_data)
|
||||||
return record
|
return record
|
||||||
|
|
||||||
|
def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None, limit=None):
|
||||||
|
stream = self.describe_stream(stream_name)
|
||||||
|
|
||||||
|
tags = []
|
||||||
|
result = {
|
||||||
|
'HasMoreTags': False,
|
||||||
|
'Tags': tags
|
||||||
|
}
|
||||||
|
for key, val in sorted(stream.tags.items(), key=lambda x:x[0]):
|
||||||
|
if limit and len(res) >= limit:
|
||||||
|
result['HasMoreTags'] = True
|
||||||
|
break
|
||||||
|
if exclusive_start_tag_key and key < exexclusive_start_tag_key:
|
||||||
|
continue
|
||||||
|
|
||||||
|
tags.append({
|
||||||
|
'Key': key,
|
||||||
|
'Value': val
|
||||||
|
})
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def add_tags_to_stream(self, stream_name, tags):
|
||||||
|
stream = self.describe_stream(stream_name)
|
||||||
|
stream.tags.update(tags)
|
||||||
|
|
||||||
|
def remove_tags_from_stream(self, stream_name, tag_keys):
|
||||||
|
stream = self.describe_stream(stream_name)
|
||||||
|
for key in tag_keys:
|
||||||
|
if key in stream.tags:
|
||||||
|
del stream.tags[key]
|
||||||
|
|
||||||
|
|
||||||
kinesis_backends = {}
|
kinesis_backends = {}
|
||||||
for region in boto.kinesis.regions():
|
for region in boto.kinesis.regions():
|
||||||
kinesis_backends[region.name] = KinesisBackend()
|
kinesis_backends[region.name] = KinesisBackend()
|
||||||
|
@ -170,3 +170,22 @@ class KinesisResponse(BaseResponse):
|
|||||||
"FailedPutCount": 0,
|
"FailedPutCount": 0,
|
||||||
"RequestResponses": request_responses,
|
"RequestResponses": request_responses,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
def add_tags_to_stream(self):
|
||||||
|
stream_name = self.parameters.get('StreamName')
|
||||||
|
tags = self.parameters.get('Tags')
|
||||||
|
self.kinesis_backend.add_tags_to_stream(stream_name, tags)
|
||||||
|
return json.dumps({})
|
||||||
|
|
||||||
|
def list_tags_for_stream(self):
|
||||||
|
stream_name = self.parameters.get('StreamName')
|
||||||
|
exclusive_start_tag_key = self.parameters.get('ExclusiveStartTagKey')
|
||||||
|
limit = self.parameters.get('Limit')
|
||||||
|
response = self.kinesis_backend.list_tags_for_stream(stream_name, exclusive_start_tag_key, limit)
|
||||||
|
return json.dumps(response)
|
||||||
|
|
||||||
|
def remove_tags_from_stream(self):
|
||||||
|
stream_name = self.parameters.get('StreamName')
|
||||||
|
tag_keys = self.parameters.get('TagKeys')
|
||||||
|
self.kinesis_backend.remove_tags_from_stream(stream_name, tag_keys)
|
||||||
|
return json.dumps({})
|
||||||
|
Loading…
Reference in New Issue
Block a user