2014-11-26 10:55:58 -05:00
|
|
|
import json
|
|
|
|
|
|
|
|
from moto.core.responses import BaseResponse
|
|
|
|
from .models import kinesis_backends
|
|
|
|
|
|
|
|
|
|
|
|
class KinesisResponse(BaseResponse):
|
2022-08-13 09:49:43 +00:00
|
|
|
def __init__(self):
    """Set up the base response handler for the ``kinesis`` service."""
    super().__init__(service_name="kinesis")
|
|
|
|
|
2014-11-26 10:55:58 -05:00
|
|
|
@property
def parameters(self):
    """Return the request body decoded from JSON.

    The body is decoded on every access; invalid JSON makes
    ``json.loads`` raise.
    """
    raw_body = self.body
    return json.loads(raw_body)
|
2014-11-26 10:55:58 -05:00
|
|
|
|
|
|
|
@property
def kinesis_backend(self):
    """Return the backend selected by the current account and region."""
    per_account = kinesis_backends[self.current_account]
    return per_account[self.region]
|
2014-11-26 10:55:58 -05:00
|
|
|
|
|
|
|
def create_stream(self):
    """Forward a CreateStream request to the backend.

    Reads ``StreamName``, ``ShardCount`` and ``RetentionPeriodHours`` from
    the request parameters and returns an empty response body.
    """
    params = self.parameters
    self.kinesis_backend.create_stream(
        params.get("StreamName"),
        params.get("ShardCount"),
        params.get("RetentionPeriodHours"),
    )
    return ""
|
|
|
|
|
|
|
|
def describe_stream(self):
    """Return the JSON description of the stream named by ``StreamName``.

    The optional ``Limit`` parameter is passed to the stream's ``to_json``
    as ``shard_limit``.
    """
    params = self.parameters
    name = params.get("StreamName")
    shard_limit = params.get("Limit")
    described = self.kinesis_backend.describe_stream(name)
    payload = described.to_json(shard_limit=shard_limit)
    return json.dumps(payload)
|
2014-11-26 10:55:58 -05:00
|
|
|
|
2019-05-21 02:02:36 +03:00
|
|
|
def describe_stream_summary(self):
    """Return the JSON summary of the stream named by ``StreamName``."""
    name = self.parameters.get("StreamName")
    summary = self.kinesis_backend.describe_stream_summary(name).to_json_summary()
    return json.dumps(summary)
|
|
|
|
|
2014-11-26 10:55:58 -05:00
|
|
|
def list_streams(self):
    """Return one page of stream names as JSON.

    The page size comes from ``Limit`` (default 10). When
    ``ExclusiveStartStreamName`` is given, the page starts right after that
    name in the backend's listing; ``list.index`` raises ``ValueError`` if
    the name is not present. ``HasMoreStreams`` reports whether names remain
    beyond the returned page.
    """
    names = [entry.stream_name for entry in self.kinesis_backend.list_streams()]
    page_size = self._get_param("Limit", 10)

    # Prefer the JSON body; fall back to the generic parameter lookup when
    # reading the body raises ValueError.
    try:
        start_after = self.parameters.get("ExclusiveStartStreamName")
    except ValueError:
        start_after = self._get_param("ExclusiveStartStreamName")

    first = names.index(start_after) + 1 if start_after else 0
    last = first + page_size
    return json.dumps(
        {"HasMoreStreams": last < len(names), "StreamNames": names[first:last]}
    )
|
2014-11-26 10:55:58 -05:00
|
|
|
|
|
|
|
def delete_stream(self):
    """Delete the stream named by ``StreamName`` and return an empty body."""
    name = self.parameters.get("StreamName")
    backend = self.kinesis_backend
    backend.delete_stream(name)
    return ""
|
2014-11-26 20:49:21 -05:00
|
|
|
|
|
|
|
def get_shard_iterator(self):
    """Return a JSON body holding a shard iterator from the backend.

    Passes ``StreamName``, ``ShardId``, ``ShardIteratorType``,
    ``StartingSequenceNumber`` and ``Timestamp`` to the backend
    positionally, in that order.
    """
    params = self.parameters
    request_keys = (
        "StreamName",
        "ShardId",
        "ShardIteratorType",
        "StartingSequenceNumber",
        "Timestamp",
    )
    arguments = [params.get(key) for key in request_keys]
    iterator = self.kinesis_backend.get_shard_iterator(*arguments)
    return json.dumps({"ShardIterator": iterator})
|
2014-11-26 20:49:21 -05:00
|
|
|
|
|
|
|
def get_records(self):
    """Fetch records for ``ShardIterator`` and return them as JSON.

    ``Limit`` is forwarded to the backend unchanged. The response carries
    the next shard iterator, each record's ``to_json()`` output and the
    backend-reported ``MillisBehindLatest`` value.
    """
    params = self.parameters
    iterator = params.get("ShardIterator")
    limit = params.get("Limit")

    fetched = self.kinesis_backend.get_records(iterator, limit)
    next_iterator, records, lag_millis = fetched

    payload = {
        "NextShardIterator": next_iterator,
        "Records": [item.to_json() for item in records],
        "MillisBehindLatest": lag_millis,
    }
    return json.dumps(payload)
|
2014-11-26 20:49:21 -05:00
|
|
|
|
|
|
|
def put_record(self):
    """Store a single record and return its sequence number and shard id.

    Passes ``StreamName``, ``PartitionKey``, ``ExplicitHashKey`` and
    ``Data`` to the backend positionally, in that order.
    """
    params = self.parameters
    stored = self.kinesis_backend.put_record(
        params.get("StreamName"),
        params.get("PartitionKey"),
        params.get("ExplicitHashKey"),
        params.get("Data"),
    )
    sequence_number, shard_id = stored
    return json.dumps({"SequenceNumber": sequence_number, "ShardId": shard_id})
|
2015-10-30 09:59:57 -04:00
|
|
|
|
2015-11-12 10:05:02 +01:00
|
|
|
def put_records(self):
    """Store the ``Records`` batch on ``StreamName``; return the backend's reply as JSON."""
    params = self.parameters
    name = params.get("StreamName")
    batch = params.get("Records")
    return json.dumps(self.kinesis_backend.put_records(name, batch))
|
|
|
|
|
2015-12-05 11:13:34 +00:00
|
|
|
def split_shard(self):
    """Forward a SplitShard request to the backend and return an empty body.

    Passes ``StreamName``, ``ShardToSplit`` and ``NewStartingHashKey``
    positionally, in that order.
    """
    params = self.parameters
    self.kinesis_backend.split_shard(
        params.get("StreamName"),
        params.get("ShardToSplit"),
        params.get("NewStartingHashKey"),
    )
    return ""
|
|
|
|
|
|
|
|
def merge_shards(self):
    """Forward a MergeShards request to the backend and return an empty body.

    Passes ``StreamName``, ``ShardToMerge`` and ``AdjacentShardToMerge``
    positionally, in that order.
    """
    params = self.parameters
    self.kinesis_backend.merge_shards(
        params.get("StreamName"),
        params.get("ShardToMerge"),
        params.get("AdjacentShardToMerge"),
    )
    return ""
|
|
|
|
|
2021-10-09 20:18:13 +00:00
|
|
|
def list_shards(self):
    """Return one page of shards for ``StreamName`` as JSON.

    ``MaxResults`` defaults to 10000 and is passed to the backend as
    ``limit``. ``NextToken`` appears in the response only when the backend
    returns a truthy token.
    """
    params = self.parameters
    shards, token = self.kinesis_backend.list_shards(
        stream_name=params.get("StreamName"),
        limit=params.get("MaxResults", 10000),
        next_token=params.get("NextToken"),
    )
    body = {"Shards": shards}
    if token:
        body["NextToken"] = token
    return json.dumps(body)
|
|
|
|
|
2022-01-26 18:41:04 -01:00
|
|
|
def update_shard_count(self):
    """Ask the backend to update a stream's shard count; return the result as JSON.

    The response echoes ``StreamName`` and ``TargetShardCount`` from the
    request; ``CurrentShardCount`` is the value returned by the backend.
    """
    params = self.parameters
    name = params.get("StreamName")
    target = params.get("TargetShardCount")
    current = self.kinesis_backend.update_shard_count(
        stream_name=name, target_shard_count=target
    )
    payload = {
        "StreamName": name,
        "CurrentShardCount": current,
        "TargetShardCount": target,
    }
    return json.dumps(payload)
|
|
|
|
|
2021-07-14 22:36:30 +08:00
|
|
|
def increase_stream_retention_period(self):
    """Forward ``StreamName`` and ``RetentionPeriodHours`` to the backend's
    increase operation and return an empty body."""
    params = self.parameters
    name = params.get("StreamName")
    hours = params.get("RetentionPeriodHours")
    self.kinesis_backend.increase_stream_retention_period(name, hours)
    return ""
|
|
|
|
|
|
|
|
def decrease_stream_retention_period(self):
    """Forward ``StreamName`` and ``RetentionPeriodHours`` to the backend's
    decrease operation and return an empty body."""
    params = self.parameters
    name = params.get("StreamName")
    hours = params.get("RetentionPeriodHours")
    self.kinesis_backend.decrease_stream_retention_period(name, hours)
    return ""
|
|
|
|
|
2015-12-03 11:53:57 +00:00
|
|
|
def add_tags_to_stream(self):
    """Attach ``Tags`` to the stream named by ``StreamName``; return an empty JSON object."""
    params = self.parameters
    self.kinesis_backend.add_tags_to_stream(
        params.get("StreamName"), params.get("Tags")
    )
    empty_reply = {}
    return json.dumps(empty_reply)
|
|
|
|
|
|
|
|
def list_tags_for_stream(self):
    """Return the backend's tag listing for ``StreamName`` as JSON.

    ``ExclusiveStartTagKey`` and ``Limit`` are forwarded to the backend
    unchanged, after the stream name.
    """
    params = self.parameters
    request_keys = ("StreamName", "ExclusiveStartTagKey", "Limit")
    arguments = [params.get(key) for key in request_keys]
    listing = self.kinesis_backend.list_tags_for_stream(*arguments)
    return json.dumps(listing)
|
|
|
|
|
|
|
|
def remove_tags_from_stream(self):
    """Remove ``TagKeys`` from the stream named by ``StreamName``; return an empty JSON object."""
    params = self.parameters
    self.kinesis_backend.remove_tags_from_stream(
        params.get("StreamName"), params.get("TagKeys")
    )
    empty_reply = {}
    return json.dumps(empty_reply)
|
2022-01-26 18:41:04 -01:00
|
|
|
|
|
|
|
def enable_enhanced_monitoring(self):
    """Enable the requested ``ShardLevelMetrics`` on ``StreamName``.

    Returns JSON with the stream name and the two values returned by the
    backend, reported as ``CurrentShardLevelMetrics`` and
    ``DesiredShardLevelMetrics`` respectively.
    """
    params = self.parameters
    name = params.get("StreamName")
    metrics = params.get("ShardLevelMetrics")
    outcome = self.kinesis_backend.enable_enhanced_monitoring(
        stream_name=name, shard_level_metrics=metrics
    )
    current, desired = outcome
    payload = {
        "StreamName": name,
        "CurrentShardLevelMetrics": current,
        "DesiredShardLevelMetrics": desired,
    }
    return json.dumps(payload)
|
|
|
|
|
|
|
|
def disable_enhanced_monitoring(self):
    """Disable the requested ``ShardLevelMetrics`` on ``StreamName``.

    The metrics are passed to the backend as ``to_be_disabled``. Returns
    JSON with the stream name and the two values returned by the backend,
    reported as ``CurrentShardLevelMetrics`` and
    ``DesiredShardLevelMetrics`` respectively.
    """
    params = self.parameters
    name = params.get("StreamName")
    metrics = params.get("ShardLevelMetrics")
    outcome = self.kinesis_backend.disable_enhanced_monitoring(
        stream_name=name, to_be_disabled=metrics
    )
    current, desired = outcome
    payload = {
        "StreamName": name,
        "CurrentShardLevelMetrics": current,
        "DesiredShardLevelMetrics": desired,
    }
    return json.dumps(payload)
|
|
|
|
|
|
|
|
def list_stream_consumers(self):
    """Return the consumers registered on ``StreamARN`` as JSON."""
    arn = self.parameters.get("StreamARN")
    registered = self.kinesis_backend.list_stream_consumers(stream_arn=arn)
    payload = {"Consumers": [entry.to_json() for entry in registered]}
    return json.dumps(payload)
|
|
|
|
|
|
|
|
def register_stream_consumer(self):
    """Register ``ConsumerName`` on ``StreamARN``; return the new consumer as JSON."""
    params = self.parameters
    registered = self.kinesis_backend.register_stream_consumer(
        stream_arn=params.get("StreamARN"),
        consumer_name=params.get("ConsumerName"),
    )
    payload = {"Consumer": registered.to_json()}
    return json.dumps(payload)
|
|
|
|
|
|
|
|
def describe_stream_consumer(self):
    """Return a consumer's description as JSON.

    ``StreamARN``, ``ConsumerName`` and ``ConsumerARN`` are forwarded to
    the backend as keyword arguments; the consumer is serialised with
    ``include_stream_arn=True``.
    """
    params = self.parameters
    lookup = {
        "stream_arn": params.get("StreamARN"),
        "consumer_name": params.get("ConsumerName"),
        "consumer_arn": params.get("ConsumerARN"),
    }
    found = self.kinesis_backend.describe_stream_consumer(**lookup)
    description = found.to_json(include_stream_arn=True)
    return json.dumps({"ConsumerDescription": description})
|
|
|
|
|
|
|
|
def deregister_stream_consumer(self):
    """Deregister a consumer and return an empty JSON object.

    ``StreamARN``, ``ConsumerName`` and ``ConsumerARN`` are forwarded to
    the backend as keyword arguments.
    """
    params = self.parameters
    target = {
        "stream_arn": params.get("StreamARN"),
        "consumer_name": params.get("ConsumerName"),
        "consumer_arn": params.get("ConsumerARN"),
    }
    self.kinesis_backend.deregister_stream_consumer(**target)
    return json.dumps({})
|
|
|
|
|
|
|
|
def start_stream_encryption(self):
    """Start encryption on ``StreamName`` and return an empty JSON object.

    ``EncryptionType`` and ``KeyId`` are forwarded to the backend as
    ``encryption_type`` and ``key_id``.
    """
    params = self.parameters
    settings = {
        "stream_name": params.get("StreamName"),
        "encryption_type": params.get("EncryptionType"),
        "key_id": params.get("KeyId"),
    }
    self.kinesis_backend.start_stream_encryption(**settings)
    return json.dumps({})
|
|
|
|
|
|
|
|
def stop_stream_encryption(self):
    """Stop encryption on ``StreamName`` and return an empty JSON object."""
    name = self.parameters.get("StreamName")
    backend = self.kinesis_backend
    backend.stop_stream_encryption(stream_name=name)
    return json.dumps({})
|