diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 0b01881a4..fed8214d2 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -1,5 +1,8 @@ from __future__ import unicode_literals +import datetime +import time + import boto.kinesis from moto.compat import OrderedDict from moto.core import BaseBackend @@ -124,10 +127,78 @@ class Stream(object): } +class FirehoseRecord(object): + def __init__(self, record_data): + self.record_id = 12345678 + self.record_data = record_data + + +class DeliveryStream(object): + def __init__(self, stream_name, **stream_kwargs): + self.name = stream_name + self.redshift_username = stream_kwargs['redshift_username'] + self.redshift_password = stream_kwargs['redshift_password'] + self.redshift_jdbc_url = stream_kwargs['redshift_jdbc_url'] + self.redshift_role_arn = stream_kwargs['redshift_role_arn'] + self.redshift_copy_command = stream_kwargs['redshift_copy_command'] + + self.redshift_s3_role_arn = stream_kwargs['redshift_s3_role_arn'] + self.redshift_s3_bucket_arn = stream_kwargs['redshift_s3_bucket_arn'] + self.redshift_s3_prefix = stream_kwargs['redshift_s3_prefix'] + self.redshift_s3_compression_format = stream_kwargs['redshift_s3_compression_format'] + self.redshift_s3_buffering_hings = stream_kwargs['redshift_s3_buffering_hings'] + + self.records = [] + self.status = 'ACTIVE' + self.create_at = datetime.datetime.utcnow() + self.last_updated = datetime.datetime.utcnow() + + @property + def arn(self): + return 'arn:aws:firehose:us-east-1:123456789012:deliverystream/{0}'.format(self.name) + + def to_dict(self): + return { + "DeliveryStreamDescription": { + "CreateTimestamp": time.mktime(self.create_at.timetuple()), + "DeliveryStreamARN": self.arn, + "DeliveryStreamName": self.name, + "DeliveryStreamStatus": self.status, + "Destinations": [ + { + "DestinationId": "string", + "RedshiftDestinationDescription": { + "ClusterJDBCURL": self.redshift_jdbc_url, + "CopyCommand": self.redshift_copy_command, + "RoleARN": self.redshift_role_arn, + "S3DestinationDescription": { + "BucketARN": self.redshift_s3_bucket_arn, + "BufferingHints": self.redshift_s3_buffering_hings, + "CompressionFormat": self.redshift_s3_compression_format, + "Prefix": self.redshift_s3_prefix, + "RoleARN": self.redshift_s3_role_arn + }, + "Username": self.redshift_username, + }, + } + ], + "HasMoreDestinations": False, + "LastUpdateTimestamp": time.mktime(self.last_updated.timetuple()), + "VersionId": "string", + } + } + + def put_record(self, record_data): + record = FirehoseRecord(record_data) + self.records.append(record) + return record + + class KinesisBackend(BaseBackend): def __init__(self): self.streams = {} + self.delivery_streams = {} def create_stream(self, stream_name, shard_count, region): stream = Stream(stream_name, shard_count, region) @@ -180,6 +251,26 @@ class KinesisBackend(BaseBackend): return sequence_number, shard_id + ''' Firehose ''' + def create_delivery_stream(self, stream_name, **stream_kwargs): + stream = DeliveryStream(stream_name, **stream_kwargs) + self.delivery_streams[stream_name] = stream + return stream + + def get_delivery_stream(self, stream_name): + return self.delivery_streams[stream_name] + + def list_delivery_streams(self): + return self.delivery_streams.values() + + def delete_delivery_stream(self, stream_name): + self.delivery_streams.pop(stream_name) + + def put_firehose_record(self, stream_name, record_data): + stream = self.get_delivery_stream(stream_name) + record = stream.put_record(record_data) + return record + kinesis_backends = {} for region in boto.kinesis.regions(): kinesis_backends[region.name] = KinesisBackend() diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 4b5f13729..839bf73e1 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -16,6 +16,11 @@ class KinesisResponse(BaseResponse): def kinesis_backend(self): return kinesis_backends[self.region] + @property + def is_firehose(self): + host = self.headers.get('host') or self.headers['Host'] + return host.startswith('firehose') + def create_stream(self): stream_name = self.parameters.get('StreamName') shard_count = self.parameters.get('ShardCount') @@ -67,6 +72,8 @@ class KinesisResponse(BaseResponse): }) def put_record(self): + if self.is_firehose: + return self.firehose_put_record() stream_name = self.parameters.get("StreamName") partition_key = self.parameters.get("PartitionKey") explicit_hash_key = self.parameters.get("ExplicitHashKey") @@ -81,3 +88,71 @@ class KinesisResponse(BaseResponse): "SequenceNumber": sequence_number, "ShardId": shard_id, }) + + ''' Firehose ''' + def create_delivery_stream(self): + stream_name = self.parameters['DeliveryStreamName'] + redshift_config = self.parameters.get('RedshiftDestinationConfiguration') + + if redshift_config: + redshift_s3_config = redshift_config['S3Configuration'] + stream_kwargs = { + 'redshift_username': redshift_config['Username'], + 'redshift_password': redshift_config['Password'], + 'redshift_jdbc_url': redshift_config['ClusterJDBCURL'], + 'redshift_role_arn': redshift_config['RoleARN'], + 'redshift_copy_command': redshift_config['CopyCommand'], + + 'redshift_s3_role_arn': redshift_s3_config['RoleARN'], + 'redshift_s3_bucket_arn': redshift_s3_config['BucketARN'], + 'redshift_s3_prefix': redshift_s3_config['Prefix'], + 'redshift_s3_compression_format': redshift_s3_config['CompressionFormat'], + 'redshift_s3_buffering_hings': redshift_s3_config['BufferingHints'], + } + stream = self.kinesis_backend.create_delivery_stream(stream_name, **stream_kwargs) + return json.dumps({ + 'DeliveryStreamARN': stream.arn + }) + + def describe_delivery_stream(self): + stream_name = self.parameters["DeliveryStreamName"] + stream = self.kinesis_backend.get_delivery_stream(stream_name) + return json.dumps(stream.to_dict()) + + def list_delivery_streams(self): + streams = self.kinesis_backend.list_delivery_streams() + return json.dumps({ + "DeliveryStreamNames": [ + stream.name for stream in streams + ], + "HasMoreDeliveryStreams": False + }) + + def delete_delivery_stream(self): + stream_name = self.parameters['DeliveryStreamName'] + self.kinesis_backend.delete_delivery_stream(stream_name) + return json.dumps({}) + + def firehose_put_record(self): + stream_name = self.parameters['DeliveryStreamName'] + record_data = self.parameters['Record']['Data'] + + record = self.kinesis_backend.put_firehose_record(stream_name, record_data) + return json.dumps({ + "RecordId": record.record_id, + }) + + def put_record_batch(self): + stream_name = self.parameters['DeliveryStreamName'] + records = self.parameters['Records'] + + request_responses = [] + for record in records: + record_response = self.kinesis_backend.put_firehose_record(stream_name, record['Data']) + request_responses.append({ + "RecordId": record_response.record_id + }) + return json.dumps({ + "FailedPutCount": 0, + "RequestResponses": request_responses, + }) diff --git a/moto/kinesis/urls.py b/moto/kinesis/urls.py index 5de870c29..a8d15eecd 100644 --- a/moto/kinesis/urls.py +++ b/moto/kinesis/urls.py @@ -3,6 +3,7 @@ from .responses import KinesisResponse url_bases = [ "https?://kinesis.(.+).amazonaws.com", + "https?://firehose.(.+).amazonaws.com", ] url_paths = { diff --git a/tests/test_kinesis/test_firehose.py b/tests/test_kinesis/test_firehose.py new file mode 100644 index 000000000..c22562847 --- /dev/null +++ b/tests/test_kinesis/test_firehose.py @@ -0,0 +1,132 @@ +from __future__ import unicode_literals + +import datetime + +import boto3 +from freezegun import freeze_time +import sure # noqa + +from moto import mock_kinesis + + +def create_stream(client, stream_name): + return client.create_delivery_stream( + DeliveryStreamName=stream_name, + RedshiftDestinationConfiguration={ + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'ClusterJDBCURL': 'jdbc:redshift://host.amazonaws.com:5439/database', + 'CopyCommand': { + 'DataTableName': 'outputTable', + 'CopyOptions': "CSV DELIMITER ',' NULL '\\0'" + }, + 'Username': 'username', + 'Password': 'password', + 'S3Configuration': { + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'BucketARN': 'arn:aws:s3:::kinesis-test', + 'Prefix': 'myFolder/', + 'BufferingHints': { + 'SizeInMBs': 123, + 'IntervalInSeconds': 124 + }, + 'CompressionFormat': 'UNCOMPRESSED', + } + } + ) + + +@mock_kinesis +@freeze_time("2015-03-01") +def test_create_stream(): + client = boto3.client('firehose', region_name='us-east-1') + + response = create_stream(client, 'stream1') + stream_arn = response['DeliveryStreamARN'] + + response = client.describe_delivery_stream(DeliveryStreamName='stream1') + stream_description = response['DeliveryStreamDescription'] + + # Sure and Freezegun don't play nicely together + created = stream_description.pop('CreateTimestamp') + last_updated = stream_description.pop('LastUpdateTimestamp') + from dateutil.tz import tzlocal + assert created == datetime.datetime(2015, 3, 1, tzinfo=tzlocal()) + assert last_updated == datetime.datetime(2015, 3, 1, tzinfo=tzlocal()) + + stream_description.should.equal({ + 'DeliveryStreamName': 'stream1', + 'DeliveryStreamARN': stream_arn, + 'DeliveryStreamStatus': 'ACTIVE', + 'VersionId': 'string', + 'Destinations': [ + { + 'DestinationId': 'string', + 'RedshiftDestinationDescription': { + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'ClusterJDBCURL': 'jdbc:redshift://host.amazonaws.com:5439/database', + 'CopyCommand': { + 'DataTableName': 'outputTable', + 'CopyOptions': "CSV DELIMITER ',' NULL '\\0'" + }, + 'Username': 'username', + 'S3DestinationDescription': { + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'BucketARN': 'arn:aws:s3:::kinesis-test', + 'Prefix': 'myFolder/', + 'BufferingHints': { + 'SizeInMBs': 123, + 'IntervalInSeconds': 124 + }, + 'CompressionFormat': 'UNCOMPRESSED', + } + } + }, + ], + "HasMoreDestinations": False, + }) + + +@mock_kinesis +@freeze_time("2015-03-01") +def test_list_and_delete_stream(): + client = boto3.client('firehose', region_name='us-east-1') + + create_stream(client, 'stream1') + create_stream(client, 'stream2') + + set(client.list_delivery_streams()['DeliveryStreamNames']).should.equal(set(['stream1', 'stream2'])) + + client.delete_delivery_stream(DeliveryStreamName='stream1') + + set(client.list_delivery_streams()['DeliveryStreamNames']).should.equal(set(['stream2'])) + + +@mock_kinesis +def test_put_record(): + client = boto3.client('firehose', region_name='us-east-1') + + create_stream(client, 'stream1') + client.put_record( + DeliveryStreamName='stream1', + Record={ + 'Data': 'some data' + } + ) + + +@mock_kinesis +def test_put_record_batch(): + client = boto3.client('firehose', region_name='us-east-1') + + create_stream(client, 'stream1') + client.put_record_batch( + DeliveryStreamName='stream1', + Records=[ + { + 'Data': 'some data1' + }, + { + 'Data': 'some data2' + }, + ] + )