From 03cd1e47c15a743c148361bea2202c34d04243a4 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 3 Dec 2015 11:53:57 +0000 Subject: [PATCH] add kinesis API: - AddTagsToStream - ListTagsForStream - RemoveTagsFromStream --- moto/kinesis/models.py | 36 +++++++++++++++++++++++++++++++++++- moto/kinesis/responses.py | 19 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 3d77c1677..d24afd2b7 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -2,8 +2,8 @@ from __future__ import unicode_literals import datetime import time - import boto.kinesis + from moto.compat import OrderedDict from moto.core import BaseBackend from .exceptions import StreamNotFoundError, ShardNotFoundError, ResourceInUseError @@ -84,6 +84,7 @@ class Stream(object): self.region = region self.account_number = "123456789012" self.shards = {} + self.tags = {} for index in range(shard_count): shard_id = "shardId-{0}".format(str(index).zfill(12)) @@ -299,6 +300,39 @@ class KinesisBackend(BaseBackend): record = stream.put_record(record_data) return record + def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None, limit=None): + stream = self.describe_stream(stream_name) + + tags = [] + result = { + 'HasMoreTags': False, + 'Tags': tags + } + for key, val in sorted(stream.tags.items(), key=lambda x:x[0]): + if limit and len(res) >= limit: + result['HasMoreTags'] = True + break + if exclusive_start_tag_key and key < exexclusive_start_tag_key: + continue + + tags.append({ + 'Key': key, + 'Value': val + }) + + return result + + def add_tags_to_stream(self, stream_name, tags): + stream = self.describe_stream(stream_name) + stream.tags.update(tags) + + def remove_tags_from_stream(self, stream_name, tag_keys): + stream = self.describe_stream(stream_name) + for key in tag_keys: + if key in stream.tags: + del stream.tags[key] + + kinesis_backends = {} for region in boto.kinesis.regions(): kinesis_backends[region.name] = KinesisBackend() diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index cd2aba4ef..61a4a8199 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -170,3 +170,22 @@ class KinesisResponse(BaseResponse): "FailedPutCount": 0, "RequestResponses": request_responses, }) + + def add_tags_to_stream(self): + stream_name = self.parameters.get('StreamName') + tags = self.parameters.get('Tags') + self.kinesis_backend.add_tags_to_stream(stream_name, tags) + return json.dumps({}) + + def list_tags_for_stream(self): + stream_name = self.parameters.get('StreamName') + exclusive_start_tag_key = self.parameters.get('ExclusiveStartTagKey') + limit = self.parameters.get('Limit') + response = self.kinesis_backend.list_tags_for_stream(stream_name, exclusive_start_tag_key, limit) + return json.dumps(response) + + def remove_tags_from_stream(self): + stream_name = self.parameters.get('StreamName') + tag_keys = self.parameters.get('TagKeys') + self.kinesis_backend.remove_tags_from_stream(stream_name, tag_keys) + return json.dumps({})