from __future__ import unicode_literals

import json

from moto.core.responses import BaseResponse

from .models import kinesis_backends


class KinesisResponse(BaseResponse):
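    """Maps mocked Kinesis API requests onto the in-memory backend.

    Firehose requests arrive through this same handler; each request is
    told apart via the `is_firehose` property below and routed to the
    Firehose-specific methods in the second half of the class.
    """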

    @property
    def parameters(self):
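        """The request parameters, parsed from the JSON request body."""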
        return json.loads(self.body)

    @property
    def kinesis_backend(self):
        return kinesis_backends[self.region]

    @property
    def is_firehose(self):
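        """Whether this request targets Firehose rather than Kinesis: true
        if the request was sent to a firehose endpoint, or if the SigV4
        Authorization header names the firehose service."""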
        host = self.headers.get('host') or self.headers['Host']
        return host.startswith('firehose') or 'firehose' in self.headers.get('Authorization', '')

    def create_stream(self):
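        """CreateStream. Illustrative request body (field names are the
        Kinesis API's):

            {"StreamName": "my-stream", "ShardCount": 2}
        """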
        stream_name = self.parameters.get('StreamName')
        shard_count = self.parameters.get('ShardCount')
        self.kinesis_backend.create_stream(
            stream_name, shard_count, self.region)
        return ""

    def describe_stream(self):
        stream_name = self.parameters.get('StreamName')
        stream = self.kinesis_backend.describe_stream(stream_name)
        return json.dumps(stream.to_json())

    def list_streams(self):
        streams = self.kinesis_backend.list_streams()

        return json.dumps({
            # The mock does not paginate; every stream name comes back at once.
            "HasMoreStreams": False,
            "StreamNames": [stream.stream_name for stream in streams],
        })

    def delete_stream(self):
        stream_name = self.parameters.get("StreamName")
        self.kinesis_backend.delete_stream(stream_name)
        return ""

    def get_shard_iterator(self):
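        """GetShardIterator. ShardIteratorType is one of 'AT_SEQUENCE_NUMBER',
        'AFTER_SEQUENCE_NUMBER', 'TRIM_HORIZON' or 'LATEST'; the two
        sequence-number types also require StartingSequenceNumber."""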
        stream_name = self.parameters.get("StreamName")
        shard_id = self.parameters.get("ShardId")
        shard_iterator_type = self.parameters.get("ShardIteratorType")
        starting_sequence_number = self.parameters.get(
            "StartingSequenceNumber")

        shard_iterator = self.kinesis_backend.get_shard_iterator(
            stream_name, shard_id, shard_iterator_type, starting_sequence_number,
        )

        return json.dumps({
            "ShardIterator": shard_iterator
        })

    def get_records(self):
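        """GetRecords: reads from a shard iterator and returns the records
        together with the iterator to use for the next read."""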
        shard_iterator = self.parameters.get("ShardIterator")
        limit = self.parameters.get("Limit")

        next_shard_iterator, records = self.kinesis_backend.get_records(
            shard_iterator, limit)

        return json.dumps({
            "NextShardIterator": next_shard_iterator,
            "Records": [record.to_json() for record in records]
        })

    def put_record(self):
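        """PutRecord. Kinesis and Firehose share this action name, so
        Firehose traffic is re-dispatched first. Illustrative Kinesis body:

            {"StreamName": "my-stream", "PartitionKey": "key", "Data": "..."}
        """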
        if self.is_firehose:
            return self.firehose_put_record()
        stream_name = self.parameters.get("StreamName")
        partition_key = self.parameters.get("PartitionKey")
        explicit_hash_key = self.parameters.get("ExplicitHashKey")
        sequence_number_for_ordering = self.parameters.get(
            "SequenceNumberForOrdering")
        data = self.parameters.get("Data")

        sequence_number, shard_id = self.kinesis_backend.put_record(
            stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data
        )

        return json.dumps({
            "SequenceNumber": sequence_number,
            "ShardId": shard_id,
        })

    def put_records(self):
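        """PutRecords (batch). The matching Firehose action is
        PutRecordBatch, so Firehose traffic is re-dispatched there."""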
        if self.is_firehose:
            return self.put_record_batch()
        stream_name = self.parameters.get("StreamName")
        records = self.parameters.get("Records")

        response = self.kinesis_backend.put_records(
            stream_name, records
        )

        return json.dumps(response)

    def split_shard(self):
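        """SplitShard: splits the parent shard's hash key range at
        NewStartingHashKey, which becomes the starting hash key of one of
        the two child shards."""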
        stream_name = self.parameters.get("StreamName")
        shard_to_split = self.parameters.get("ShardToSplit")
        new_starting_hash_key = self.parameters.get("NewStartingHashKey")
        self.kinesis_backend.split_shard(
            stream_name, shard_to_split, new_starting_hash_key
        )
        return ""

    def merge_shards(self):
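        """MergeShards: merges two shards whose hash key ranges are
        adjacent into a single shard."""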
        stream_name = self.parameters.get("StreamName")
        shard_to_merge = self.parameters.get("ShardToMerge")
        adjacent_shard_to_merge = self.parameters.get("AdjacentShardToMerge")
        self.kinesis_backend.merge_shards(
            stream_name, shard_to_merge, adjacent_shard_to_merge
        )
        return ""

    ''' Firehose '''

    def create_delivery_stream(self):
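        """CreateDeliveryStream: accepts either a Redshift or a plain S3
        destination configuration and flattens it into the keyword
        arguments the backend expects."""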
        stream_name = self.parameters['DeliveryStreamName']
        redshift_config = self.parameters.get(
            'RedshiftDestinationConfiguration')

        if redshift_config:
            redshift_s3_config = redshift_config['S3Configuration']
            stream_kwargs = {
                'redshift_username': redshift_config['Username'],
                'redshift_password': redshift_config['Password'],
                'redshift_jdbc_url': redshift_config['ClusterJDBCURL'],
                'redshift_role_arn': redshift_config['RoleARN'],
                'redshift_copy_command': redshift_config['CopyCommand'],

                'redshift_s3_role_arn': redshift_s3_config['RoleARN'],
                'redshift_s3_bucket_arn': redshift_s3_config['BucketARN'],
                'redshift_s3_prefix': redshift_s3_config['Prefix'],
                'redshift_s3_compression_format': redshift_s3_config.get('CompressionFormat'),
                # 'hings' (sic) mirrors the keyword argument name the backend
                # model expects.
                'redshift_s3_buffering_hings': redshift_s3_config['BufferingHints'],
            }
        else:
            # S3 Config
            s3_config = self.parameters['S3DestinationConfiguration']
            stream_kwargs = {
                's3_role_arn': s3_config['RoleARN'],
                's3_bucket_arn': s3_config['BucketARN'],
                's3_prefix': s3_config['Prefix'],
                's3_compression_format': s3_config.get('CompressionFormat'),
                's3_buffering_hings': s3_config['BufferingHints'],
            }
        stream = self.kinesis_backend.create_delivery_stream(
            stream_name, **stream_kwargs)
        return json.dumps({
            'DeliveryStreamARN': stream.arn
        })

    def describe_delivery_stream(self):
        stream_name = self.parameters["DeliveryStreamName"]
        stream = self.kinesis_backend.get_delivery_stream(stream_name)
        return json.dumps(stream.to_dict())

    def list_delivery_streams(self):
        streams = self.kinesis_backend.list_delivery_streams()
        return json.dumps({
            "DeliveryStreamNames": [
                stream.name for stream in streams
            ],
            "HasMoreDeliveryStreams": False
        })

    def delete_delivery_stream(self):
        stream_name = self.parameters['DeliveryStreamName']
        self.kinesis_backend.delete_delivery_stream(stream_name)
        return json.dumps({})

    def firehose_put_record(self):
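        """Firehose PutRecord: stores one record and returns its RecordId."""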
        stream_name = self.parameters['DeliveryStreamName']
        record_data = self.parameters['Record']['Data']

        record = self.kinesis_backend.put_firehose_record(
            stream_name, record_data)
        return json.dumps({
            "RecordId": record.record_id,
        })

    def put_record_batch(self):
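        """Firehose PutRecordBatch: stores each record in turn. The mock
        never rejects a record, so FailedPutCount is always 0."""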
        stream_name = self.parameters['DeliveryStreamName']
        records = self.parameters['Records']

        request_responses = []
        for record in records:
            record_response = self.kinesis_backend.put_firehose_record(
                stream_name, record['Data'])
            request_responses.append({
                "RecordId": record_response.record_id
            })
        return json.dumps({
            "FailedPutCount": 0,
            "RequestResponses": request_responses,
        })

    def add_tags_to_stream(self):
        stream_name = self.parameters.get('StreamName')
        tags = self.parameters.get('Tags')
        self.kinesis_backend.add_tags_to_stream(stream_name, tags)
        return json.dumps({})

    def list_tags_for_stream(self):
        stream_name = self.parameters.get('StreamName')
        exclusive_start_tag_key = self.parameters.get('ExclusiveStartTagKey')
        limit = self.parameters.get('Limit')
        response = self.kinesis_backend.list_tags_for_stream(
            stream_name, exclusive_start_tag_key, limit)
        return json.dumps(response)

    def remove_tags_from_stream(self):
        stream_name = self.parameters.get('StreamName')
        tag_keys = self.parameters.get('TagKeys')
        self.kinesis_backend.remove_tags_from_stream(stream_name, tag_keys)
        return json.dumps({})
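

# Illustrative usage (a sketch, not part of the handler): once moto's
# Kinesis mock is active, boto3 calls are answered by the methods above.
# Assumes boto3 and moto's `mock_kinesis` decorator, as used in moto's
# own test suite:
#
#     import boto3
#     from moto import mock_kinesis
#
#     @mock_kinesis
#     def test_create_and_describe():
#         client = boto3.client('kinesis', region_name='us-east-1')
#         client.create_stream(StreamName='my-stream', ShardCount=1)
#         desc = client.describe_stream(StreamName='my-stream')
#         assert desc['StreamDescription']['StreamName'] == 'my-stream'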