diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index e7a389981..965f3367a 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -222,12 +222,8 @@ class DeliveryStream(BaseModel): self.redshift_role_arn = stream_kwargs.get('redshift_role_arn') self.redshift_copy_command = stream_kwargs.get('redshift_copy_command') - self.s3_role_arn = stream_kwargs.get('s3_role_arn') - self.s3_bucket_arn = stream_kwargs.get('s3_bucket_arn') - self.s3_prefix = stream_kwargs.get('s3_prefix') - self.s3_compression_format = stream_kwargs.get( - 's3_compression_format', 'UNCOMPRESSED') - self.s3_buffering_hings = stream_kwargs.get('s3_buffering_hings') + self.s3_config = stream_kwargs.get('s3_config') + self.extended_s3_config = stream_kwargs.get('extended_s3_config') self.redshift_s3_role_arn = stream_kwargs.get('redshift_s3_role_arn') self.redshift_s3_bucket_arn = stream_kwargs.get( @@ -235,8 +231,8 @@ class DeliveryStream(BaseModel): self.redshift_s3_prefix = stream_kwargs.get('redshift_s3_prefix') self.redshift_s3_compression_format = stream_kwargs.get( 'redshift_s3_compression_format', 'UNCOMPRESSED') - self.redshift_s3_buffering_hings = stream_kwargs.get( - 'redshift_s3_buffering_hings') + self.redshift_s3_buffering_hints = stream_kwargs.get( + 'redshift_s3_buffering_hints') self.records = [] self.status = 'ACTIVE' @@ -248,16 +244,15 @@ class DeliveryStream(BaseModel): return 'arn:aws:firehose:us-east-1:123456789012:deliverystream/{0}'.format(self.name) def destinations_to_dict(self): - if self.s3_role_arn: + if self.s3_config: return [{ 'DestinationId': 'string', - 'S3DestinationDescription': { - 'RoleARN': self.s3_role_arn, - 'BucketARN': self.s3_bucket_arn, - 'Prefix': self.s3_prefix, - 'BufferingHints': self.s3_buffering_hings, - 'CompressionFormat': self.s3_compression_format, - } + 'S3DestinationDescription': self.s3_config, + }] + elif self.extended_s3_config: + return [{ + 'DestinationId': 'string', + 'ExtendedS3DestinationDescription': self.extended_s3_config, }] else: return [{ @@ -268,7 +263,7 @@ class DeliveryStream(BaseModel): "RoleARN": self.redshift_role_arn, "S3DestinationDescription": { "BucketARN": self.redshift_s3_bucket_arn, - "BufferingHints": self.redshift_s3_buffering_hings, + "BufferingHints": self.redshift_s3_buffering_hints, "CompressionFormat": self.redshift_s3_compression_format, "Prefix": self.redshift_s3_prefix, "RoleARN": self.redshift_s3_role_arn diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 3a81bd9f4..aa2b8c225 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -149,6 +149,10 @@ class KinesisResponse(BaseResponse): stream_name = self.parameters['DeliveryStreamName'] redshift_config = self.parameters.get( 'RedshiftDestinationConfiguration') + s3_config = self.parameters.get( + 'S3DestinationConfiguration') + extended_s3_config = self.parameters.get( + 'ExtendedS3DestinationConfiguration') if redshift_config: redshift_s3_config = redshift_config['S3Configuration'] @@ -163,18 +167,13 @@ class KinesisResponse(BaseResponse): 'redshift_s3_bucket_arn': redshift_s3_config['BucketARN'], 'redshift_s3_prefix': redshift_s3_config['Prefix'], 'redshift_s3_compression_format': redshift_s3_config.get('CompressionFormat'), - 'redshift_s3_buffering_hings': redshift_s3_config['BufferingHints'], - } - else: - # S3 Config - s3_config = self.parameters['S3DestinationConfiguration'] - stream_kwargs = { - 's3_role_arn': s3_config['RoleARN'], - 's3_bucket_arn': s3_config['BucketARN'], - 's3_prefix': s3_config['Prefix'], - 's3_compression_format': s3_config.get('CompressionFormat'), - 's3_buffering_hings': s3_config['BufferingHints'], + 'redshift_s3_buffering_hints': redshift_s3_config['BufferingHints'], } + elif s3_config: + stream_kwargs = {'s3_config': s3_config} + elif extended_s3_config: + stream_kwargs = {'extended_s3_config': extended_s3_config} + stream = self.kinesis_backend.create_delivery_stream( stream_name, **stream_kwargs) return json.dumps({ diff --git a/tests/test_kinesis/test_firehose.py b/tests/test_kinesis/test_firehose.py index 6ab46c6f9..91c1038d3 100644 --- a/tests/test_kinesis/test_firehose.py +++ b/tests/test_kinesis/test_firehose.py @@ -9,7 +9,41 @@ import sure # noqa from moto import mock_kinesis -def create_stream(client, stream_name): +def create_s3_delivery_stream(client, stream_name): + return client.create_delivery_stream( + DeliveryStreamName=stream_name, + DeliveryStreamType="DirectPut", + ExtendedS3DestinationConfiguration={ + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'BucketARN': 'arn:aws:s3:::kinesis-test', + 'Prefix': 'myFolder/', + 'CompressionFormat': 'UNCOMPRESSED', + 'DataFormatConversionConfiguration': { + 'Enabled': True, + 'InputFormatConfiguration': { + 'Deserializer': { + 'HiveJsonSerDe': { + }, + }, + }, + 'OutputFormatConfiguration': { + 'Serializer': { + 'ParquetSerDe': { + 'Compression': 'SNAPPY', + }, + }, + }, + 'SchemaConfiguration': { + 'DatabaseName': stream_name, + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'TableName': 'outputTable', + }, + }, + }) + + + +def create_redshift_delivery_stream(client, stream_name): return client.create_delivery_stream( DeliveryStreamName=stream_name, RedshiftDestinationConfiguration={ @@ -36,10 +70,10 @@ def create_stream(client, stream_name): @mock_kinesis -def test_create_stream(): +def test_create_redshift_delivery_stream(): client = boto3.client('firehose', region_name='us-east-1') - response = create_stream(client, 'stream1') + response = create_redshift_delivery_stream(client, 'stream1') stream_arn = response['DeliveryStreamARN'] response = client.describe_delivery_stream(DeliveryStreamName='stream1') @@ -82,6 +116,60 @@ def test_create_stream(): }) +@mock_kinesis +def test_create_s3_delivery_stream(): + client = boto3.client('firehose', region_name='us-east-1') + + response = create_s3_delivery_stream(client, 'stream1') + stream_arn = response['DeliveryStreamARN'] + + response = client.describe_delivery_stream(DeliveryStreamName='stream1') + stream_description = response['DeliveryStreamDescription'] + + # Sure and Freezegun don't play nicely together + _ = stream_description.pop('CreateTimestamp') + _ = stream_description.pop('LastUpdateTimestamp') + + stream_description.should.equal({ + 'DeliveryStreamName': 'stream1', + 'DeliveryStreamARN': stream_arn, + 'DeliveryStreamStatus': 'ACTIVE', + 'VersionId': 'string', + 'Destinations': [ + { + 'DestinationId': 'string', + 'ExtendedS3DestinationDescription': { + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'BucketARN': 'arn:aws:s3:::kinesis-test', + 'Prefix': 'myFolder/', + 'CompressionFormat': 'UNCOMPRESSED', + 'DataFormatConversionConfiguration': { + 'Enabled': True, + 'InputFormatConfiguration': { + 'Deserializer': { + 'HiveJsonSerDe': { + }, + }, + }, + 'OutputFormatConfiguration': { + 'Serializer': { + 'ParquetSerDe': { + 'Compression': 'SNAPPY', + }, + }, + }, + 'SchemaConfiguration': { + 'DatabaseName': 'stream1', + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'TableName': 'outputTable', + }, + }, + }, + }, + ], + "HasMoreDestinations": False, + }) + @mock_kinesis def test_create_stream_without_redshift(): client = boto3.client('firehose', region_name='us-east-1') @@ -145,8 +233,8 @@ def test_deescribe_non_existant_stream(): def test_list_and_delete_stream(): client = boto3.client('firehose', region_name='us-east-1') - create_stream(client, 'stream1') - create_stream(client, 'stream2') + create_redshift_delivery_stream(client, 'stream1') + create_redshift_delivery_stream(client, 'stream2') set(client.list_delivery_streams()['DeliveryStreamNames']).should.equal( set(['stream1', 'stream2'])) @@ -161,7 +249,7 @@ def test_list_and_delete_stream(): def test_put_record(): client = boto3.client('firehose', region_name='us-east-1') - create_stream(client, 'stream1') + create_redshift_delivery_stream(client, 'stream1') client.put_record( DeliveryStreamName='stream1', Record={ @@ -174,7 +262,7 @@ def test_put_record(): def test_put_record_batch(): client = boto3.client('firehose', region_name='us-east-1') - create_stream(client, 'stream1') + create_redshift_delivery_stream(client, 'stream1') client.put_record_batch( DeliveryStreamName='stream1', Records=[