From f93b9a86e9faf7846888acf37f705d07e2bfa647 Mon Sep 17 00:00:00 2001 From: mfranke Date: Thu, 12 Nov 2015 10:05:02 +0100 Subject: [PATCH] add put_records API fix create_stream API to get right response in case of stream already exists --- moto/kinesis/exceptions.py | 9 +++++++++ moto/kinesis/models.py | 27 ++++++++++++++++++++++++++- moto/kinesis/responses.py | 12 ++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/moto/kinesis/exceptions.py b/moto/kinesis/exceptions.py index c6be76885..0fcb3652a 100644 --- a/moto/kinesis/exceptions.py +++ b/moto/kinesis/exceptions.py @@ -13,6 +13,15 @@ class ResourceNotFoundError(BadRequest): }) +class ResourceInUseError(BadRequest): + def __init__(self, message): + super(ResourceNotFoundError, self).__init__() + self.description = json.dumps({ + "message": message, + '__type': 'ResourceInUseException', + }) + + class StreamNotFoundError(ResourceNotFoundError): def __init__(self, stream_name): super(StreamNotFoundError, self).__init__( diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index d2d0d2913..aae4e918f 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -6,7 +6,7 @@ import time import boto.kinesis from moto.compat import OrderedDict from moto.core import BaseBackend -from .exceptions import StreamNotFoundError, ShardNotFoundError +from .exceptions import StreamNotFoundError, ShardNotFoundError, ResourceInUseError from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose_shard_iterator @@ -201,6 +201,8 @@ class KinesisBackend(BaseBackend): self.delivery_streams = {} def create_stream(self, stream_name, shard_count, region): + if stream_name in self.streams: + return ResourceInUseError(stream_name) stream = Stream(stream_name, shard_count, region) self.streams[stream_name] = stream return stream @@ -251,6 +253,29 @@ class KinesisBackend(BaseBackend): return sequence_number, shard_id + def put_records(self, stream_name, records): + stream = self.describe_stream(stream_name) + + response = { + "FailedRecordCount": 0, + "Records" : [] + } + + for record in records: + partition_key = record.get("PartitionKey") + explicit_hash_key = record.get("ExplicitHashKey") + data = record.get("data") + + sequence_number, shard_id = stream.put_record( + partition_key, explicit_hash_key, None, data + ) + response['Records'].append({ + "SequenceNumber": sequence_number, + "ShardId": shard_id + }) + + return response + ''' Firehose ''' def create_delivery_stream(self, stream_name, **stream_kwargs): stream = DeliveryStream(stream_name, **stream_kwargs) diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 5b8c7be06..35500e8ac 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -89,6 +89,18 @@ class KinesisResponse(BaseResponse): "ShardId": shard_id, }) + def put_records(self): + if self.is_firehose: + return self.firehose_put_record() + stream_name = self.parameters.get("StreamName") + records = self.parameters.get("Records") + + response = self.kinesis_backend.put_records( + stream_name, records + ) + + return json.dumps(response) + ''' Firehose ''' def create_delivery_stream(self): stream_name = self.parameters['DeliveryStreamName']