moto/moto/kinesis/responses.py

191 lines
6.8 KiB
Python
Raw Normal View History

2014-11-26 10:55:58 -05:00
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import kinesis_backends
class KinesisResponse(BaseResponse):
@property
def parameters(self):
2017-02-16 22:51:04 -05:00
return json.loads(self.body)
2014-11-26 10:55:58 -05:00
@property
def kinesis_backend(self):
return kinesis_backends[self.region]
def create_stream(self):
2019-10-31 08:44:26 -07:00
stream_name = self.parameters.get("StreamName")
shard_count = self.parameters.get("ShardCount")
retention_period_hours = self.parameters.get("RetentionPeriodHours")
self.kinesis_backend.create_stream(
stream_name, shard_count, retention_period_hours, self.region
)
2014-11-26 10:55:58 -05:00
return ""
def describe_stream(self):
2019-10-31 08:44:26 -07:00
stream_name = self.parameters.get("StreamName")
2014-11-26 10:55:58 -05:00
stream = self.kinesis_backend.describe_stream(stream_name)
return json.dumps(stream.to_json())
def describe_stream_summary(self):
2019-10-31 08:44:26 -07:00
stream_name = self.parameters.get("StreamName")
stream = self.kinesis_backend.describe_stream_summary(stream_name)
return json.dumps(stream.to_json_summary())
2014-11-26 10:55:58 -05:00
def list_streams(self):
streams = self.kinesis_backend.list_streams()
2017-05-10 21:58:42 -04:00
stream_names = [stream.stream_name for stream in streams]
2019-10-31 08:44:26 -07:00
max_streams = self._get_param("Limit", 10)
2017-05-10 21:58:42 -04:00
try:
2019-10-31 08:44:26 -07:00
token = self.parameters.get("ExclusiveStartStreamName")
2017-05-10 21:58:42 -04:00
except ValueError:
2019-10-31 08:44:26 -07:00
token = self._get_param("ExclusiveStartStreamName")
2017-05-10 21:58:42 -04:00
if token:
start = stream_names.index(token) + 1
else:
start = 0
2019-10-31 08:44:26 -07:00
streams_resp = stream_names[start : start + max_streams]
2017-05-10 21:58:42 -04:00
has_more_streams = False
if start + max_streams < len(stream_names):
has_more_streams = True
2014-11-26 10:55:58 -05:00
2019-10-31 08:44:26 -07:00
return json.dumps(
{"HasMoreStreams": has_more_streams, "StreamNames": streams_resp}
)
2014-11-26 10:55:58 -05:00
def delete_stream(self):
stream_name = self.parameters.get("StreamName")
self.kinesis_backend.delete_stream(stream_name)
return ""
2014-11-26 20:49:21 -05:00
def get_shard_iterator(self):
stream_name = self.parameters.get("StreamName")
shard_id = self.parameters.get("ShardId")
shard_iterator_type = self.parameters.get("ShardIteratorType")
2019-10-31 08:44:26 -07:00
starting_sequence_number = self.parameters.get("StartingSequenceNumber")
at_timestamp = self.parameters.get("Timestamp")
2014-11-26 20:49:21 -05:00
shard_iterator = self.kinesis_backend.get_shard_iterator(
2019-10-31 08:44:26 -07:00
stream_name,
shard_id,
shard_iterator_type,
starting_sequence_number,
at_timestamp,
2014-11-26 20:49:21 -05:00
)
2019-10-31 08:44:26 -07:00
return json.dumps({"ShardIterator": shard_iterator})
2014-11-26 20:49:21 -05:00
def get_records(self):
shard_iterator = self.parameters.get("ShardIterator")
limit = self.parameters.get("Limit")
2019-10-31 08:44:26 -07:00
(
next_shard_iterator,
records,
millis_behind_latest,
) = self.kinesis_backend.get_records(shard_iterator, limit)
return json.dumps(
{
"NextShardIterator": next_shard_iterator,
"Records": [record.to_json() for record in records],
"MillisBehindLatest": millis_behind_latest,
}
)
2014-11-26 20:49:21 -05:00
def put_record(self):
stream_name = self.parameters.get("StreamName")
partition_key = self.parameters.get("PartitionKey")
explicit_hash_key = self.parameters.get("ExplicitHashKey")
2019-10-31 08:44:26 -07:00
sequence_number_for_ordering = self.parameters.get("SequenceNumberForOrdering")
2014-11-26 20:49:21 -05:00
data = self.parameters.get("Data")
sequence_number, shard_id = self.kinesis_backend.put_record(
2019-10-31 08:44:26 -07:00
stream_name,
partition_key,
explicit_hash_key,
sequence_number_for_ordering,
data,
2014-11-26 20:49:21 -05:00
)
2019-10-31 08:44:26 -07:00
return json.dumps({"SequenceNumber": sequence_number, "ShardId": shard_id})
2015-10-30 09:59:57 -04:00
def put_records(self):
stream_name = self.parameters.get("StreamName")
records = self.parameters.get("Records")
2019-10-31 08:44:26 -07:00
response = self.kinesis_backend.put_records(stream_name, records)
return json.dumps(response)
def split_shard(self):
stream_name = self.parameters.get("StreamName")
shard_to_split = self.parameters.get("ShardToSplit")
new_starting_hash_key = self.parameters.get("NewStartingHashKey")
2017-02-23 21:37:43 -05:00
self.kinesis_backend.split_shard(
stream_name, shard_to_split, new_starting_hash_key
)
return ""
def merge_shards(self):
stream_name = self.parameters.get("StreamName")
shard_to_merge = self.parameters.get("ShardToMerge")
adjacent_shard_to_merge = self.parameters.get("AdjacentShardToMerge")
2017-02-23 21:37:43 -05:00
self.kinesis_backend.merge_shards(
stream_name, shard_to_merge, adjacent_shard_to_merge
)
return ""
def list_shards(self):
stream_name = self.parameters.get("StreamName")
next_token = self.parameters.get("NextToken")
start_id = self.parameters.get("ExclusiveStartShardId") # noqa
max = self.parameters.get("MaxResults", 10000)
start_timestamp = self.parameters.get("StreamCreationTimestamp") # noqa
shards, token = self.kinesis_backend.list_shards(
stream_name=stream_name, limit=max, next_token=next_token
)
res = {"Shards": shards}
if token:
res["NextToken"] = token
return json.dumps(res)
def increase_stream_retention_period(self):
stream_name = self.parameters.get("StreamName")
retention_period_hours = self.parameters.get("RetentionPeriodHours")
self.kinesis_backend.increase_stream_retention_period(
stream_name, retention_period_hours
)
return ""
def decrease_stream_retention_period(self):
stream_name = self.parameters.get("StreamName")
retention_period_hours = self.parameters.get("RetentionPeriodHours")
self.kinesis_backend.decrease_stream_retention_period(
stream_name, retention_period_hours
)
return ""
def add_tags_to_stream(self):
2019-10-31 08:44:26 -07:00
stream_name = self.parameters.get("StreamName")
tags = self.parameters.get("Tags")
self.kinesis_backend.add_tags_to_stream(stream_name, tags)
return json.dumps({})
def list_tags_for_stream(self):
2019-10-31 08:44:26 -07:00
stream_name = self.parameters.get("StreamName")
exclusive_start_tag_key = self.parameters.get("ExclusiveStartTagKey")
limit = self.parameters.get("Limit")
2017-02-23 21:37:43 -05:00
response = self.kinesis_backend.list_tags_for_stream(
2019-10-31 08:44:26 -07:00
stream_name, exclusive_start_tag_key, limit
)
return json.dumps(response)
def remove_tags_from_stream(self):
2019-10-31 08:44:26 -07:00
stream_name = self.parameters.get("StreamName")
tag_keys = self.parameters.get("TagKeys")
self.kinesis_backend.remove_tags_from_stream(stream_name, tag_keys)
return json.dumps({})