Merge pull request #2470 from JackDanger/jackdanger/kinesis-extended-s3-config

Supporting more modern Firehose features
This commit is contained in:
Mike Grima 2019-10-12 15:38:57 -07:00 committed by GitHub
commit f14d5ca6cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 35 deletions

View File

@ -222,12 +222,8 @@ class DeliveryStream(BaseModel):
self.redshift_role_arn = stream_kwargs.get('redshift_role_arn') self.redshift_role_arn = stream_kwargs.get('redshift_role_arn')
self.redshift_copy_command = stream_kwargs.get('redshift_copy_command') self.redshift_copy_command = stream_kwargs.get('redshift_copy_command')
self.s3_role_arn = stream_kwargs.get('s3_role_arn') self.s3_config = stream_kwargs.get('s3_config')
self.s3_bucket_arn = stream_kwargs.get('s3_bucket_arn') self.extended_s3_config = stream_kwargs.get('extended_s3_config')
self.s3_prefix = stream_kwargs.get('s3_prefix')
self.s3_compression_format = stream_kwargs.get(
's3_compression_format', 'UNCOMPRESSED')
self.s3_buffering_hings = stream_kwargs.get('s3_buffering_hings')
self.redshift_s3_role_arn = stream_kwargs.get('redshift_s3_role_arn') self.redshift_s3_role_arn = stream_kwargs.get('redshift_s3_role_arn')
self.redshift_s3_bucket_arn = stream_kwargs.get( self.redshift_s3_bucket_arn = stream_kwargs.get(
@ -235,8 +231,8 @@ class DeliveryStream(BaseModel):
self.redshift_s3_prefix = stream_kwargs.get('redshift_s3_prefix') self.redshift_s3_prefix = stream_kwargs.get('redshift_s3_prefix')
self.redshift_s3_compression_format = stream_kwargs.get( self.redshift_s3_compression_format = stream_kwargs.get(
'redshift_s3_compression_format', 'UNCOMPRESSED') 'redshift_s3_compression_format', 'UNCOMPRESSED')
self.redshift_s3_buffering_hings = stream_kwargs.get( self.redshift_s3_buffering_hints = stream_kwargs.get(
'redshift_s3_buffering_hings') 'redshift_s3_buffering_hints')
self.records = [] self.records = []
self.status = 'ACTIVE' self.status = 'ACTIVE'
@ -248,16 +244,15 @@ class DeliveryStream(BaseModel):
return 'arn:aws:firehose:us-east-1:123456789012:deliverystream/{0}'.format(self.name) return 'arn:aws:firehose:us-east-1:123456789012:deliverystream/{0}'.format(self.name)
def destinations_to_dict(self): def destinations_to_dict(self):
if self.s3_role_arn: if self.s3_config:
return [{ return [{
'DestinationId': 'string', 'DestinationId': 'string',
'S3DestinationDescription': { 'S3DestinationDescription': self.s3_config,
'RoleARN': self.s3_role_arn, }]
'BucketARN': self.s3_bucket_arn, elif self.extended_s3_config:
'Prefix': self.s3_prefix, return [{
'BufferingHints': self.s3_buffering_hings, 'DestinationId': 'string',
'CompressionFormat': self.s3_compression_format, 'ExtendedS3DestinationDescription': self.extended_s3_config,
}
}] }]
else: else:
return [{ return [{
@ -268,7 +263,7 @@ class DeliveryStream(BaseModel):
"RoleARN": self.redshift_role_arn, "RoleARN": self.redshift_role_arn,
"S3DestinationDescription": { "S3DestinationDescription": {
"BucketARN": self.redshift_s3_bucket_arn, "BucketARN": self.redshift_s3_bucket_arn,
"BufferingHints": self.redshift_s3_buffering_hings, "BufferingHints": self.redshift_s3_buffering_hints,
"CompressionFormat": self.redshift_s3_compression_format, "CompressionFormat": self.redshift_s3_compression_format,
"Prefix": self.redshift_s3_prefix, "Prefix": self.redshift_s3_prefix,
"RoleARN": self.redshift_s3_role_arn "RoleARN": self.redshift_s3_role_arn

View File

@ -149,6 +149,10 @@ class KinesisResponse(BaseResponse):
stream_name = self.parameters['DeliveryStreamName'] stream_name = self.parameters['DeliveryStreamName']
redshift_config = self.parameters.get( redshift_config = self.parameters.get(
'RedshiftDestinationConfiguration') 'RedshiftDestinationConfiguration')
s3_config = self.parameters.get(
'S3DestinationConfiguration')
extended_s3_config = self.parameters.get(
'ExtendedS3DestinationConfiguration')
if redshift_config: if redshift_config:
redshift_s3_config = redshift_config['S3Configuration'] redshift_s3_config = redshift_config['S3Configuration']
@ -163,18 +167,13 @@ class KinesisResponse(BaseResponse):
'redshift_s3_bucket_arn': redshift_s3_config['BucketARN'], 'redshift_s3_bucket_arn': redshift_s3_config['BucketARN'],
'redshift_s3_prefix': redshift_s3_config['Prefix'], 'redshift_s3_prefix': redshift_s3_config['Prefix'],
'redshift_s3_compression_format': redshift_s3_config.get('CompressionFormat'), 'redshift_s3_compression_format': redshift_s3_config.get('CompressionFormat'),
'redshift_s3_buffering_hings': redshift_s3_config['BufferingHints'], 'redshift_s3_buffering_hints': redshift_s3_config['BufferingHints'],
}
else:
# S3 Config
s3_config = self.parameters['S3DestinationConfiguration']
stream_kwargs = {
's3_role_arn': s3_config['RoleARN'],
's3_bucket_arn': s3_config['BucketARN'],
's3_prefix': s3_config['Prefix'],
's3_compression_format': s3_config.get('CompressionFormat'),
's3_buffering_hings': s3_config['BufferingHints'],
} }
elif s3_config:
stream_kwargs = {'s3_config': s3_config}
elif extended_s3_config:
stream_kwargs = {'extended_s3_config': extended_s3_config}
stream = self.kinesis_backend.create_delivery_stream( stream = self.kinesis_backend.create_delivery_stream(
stream_name, **stream_kwargs) stream_name, **stream_kwargs)
return json.dumps({ return json.dumps({

View File

@ -9,7 +9,41 @@ import sure # noqa
from moto import mock_kinesis from moto import mock_kinesis
def create_stream(client, stream_name): def create_s3_delivery_stream(client, stream_name):
return client.create_delivery_stream(
DeliveryStreamName=stream_name,
DeliveryStreamType="DirectPut",
ExtendedS3DestinationConfiguration={
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'BucketARN': 'arn:aws:s3:::kinesis-test',
'Prefix': 'myFolder/',
'CompressionFormat': 'UNCOMPRESSED',
'DataFormatConversionConfiguration': {
'Enabled': True,
'InputFormatConfiguration': {
'Deserializer': {
'HiveJsonSerDe': {
},
},
},
'OutputFormatConfiguration': {
'Serializer': {
'ParquetSerDe': {
'Compression': 'SNAPPY',
},
},
},
'SchemaConfiguration': {
'DatabaseName': stream_name,
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'TableName': 'outputTable',
},
},
})
def create_redshift_delivery_stream(client, stream_name):
return client.create_delivery_stream( return client.create_delivery_stream(
DeliveryStreamName=stream_name, DeliveryStreamName=stream_name,
RedshiftDestinationConfiguration={ RedshiftDestinationConfiguration={
@ -36,10 +70,10 @@ def create_stream(client, stream_name):
@mock_kinesis @mock_kinesis
def test_create_stream(): def test_create_redshift_delivery_stream():
client = boto3.client('firehose', region_name='us-east-1') client = boto3.client('firehose', region_name='us-east-1')
response = create_stream(client, 'stream1') response = create_redshift_delivery_stream(client, 'stream1')
stream_arn = response['DeliveryStreamARN'] stream_arn = response['DeliveryStreamARN']
response = client.describe_delivery_stream(DeliveryStreamName='stream1') response = client.describe_delivery_stream(DeliveryStreamName='stream1')
@ -82,6 +116,60 @@ def test_create_stream():
}) })
@mock_kinesis
def test_create_s3_delivery_stream():
client = boto3.client('firehose', region_name='us-east-1')
response = create_s3_delivery_stream(client, 'stream1')
stream_arn = response['DeliveryStreamARN']
response = client.describe_delivery_stream(DeliveryStreamName='stream1')
stream_description = response['DeliveryStreamDescription']
# Sure and Freezegun don't play nicely together
_ = stream_description.pop('CreateTimestamp')
_ = stream_description.pop('LastUpdateTimestamp')
stream_description.should.equal({
'DeliveryStreamName': 'stream1',
'DeliveryStreamARN': stream_arn,
'DeliveryStreamStatus': 'ACTIVE',
'VersionId': 'string',
'Destinations': [
{
'DestinationId': 'string',
'ExtendedS3DestinationDescription': {
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'BucketARN': 'arn:aws:s3:::kinesis-test',
'Prefix': 'myFolder/',
'CompressionFormat': 'UNCOMPRESSED',
'DataFormatConversionConfiguration': {
'Enabled': True,
'InputFormatConfiguration': {
'Deserializer': {
'HiveJsonSerDe': {
},
},
},
'OutputFormatConfiguration': {
'Serializer': {
'ParquetSerDe': {
'Compression': 'SNAPPY',
},
},
},
'SchemaConfiguration': {
'DatabaseName': 'stream1',
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'TableName': 'outputTable',
},
},
},
},
],
"HasMoreDestinations": False,
})
@mock_kinesis @mock_kinesis
def test_create_stream_without_redshift(): def test_create_stream_without_redshift():
client = boto3.client('firehose', region_name='us-east-1') client = boto3.client('firehose', region_name='us-east-1')
@ -145,8 +233,8 @@ def test_deescribe_non_existant_stream():
def test_list_and_delete_stream(): def test_list_and_delete_stream():
client = boto3.client('firehose', region_name='us-east-1') client = boto3.client('firehose', region_name='us-east-1')
create_stream(client, 'stream1') create_redshift_delivery_stream(client, 'stream1')
create_stream(client, 'stream2') create_redshift_delivery_stream(client, 'stream2')
set(client.list_delivery_streams()['DeliveryStreamNames']).should.equal( set(client.list_delivery_streams()['DeliveryStreamNames']).should.equal(
set(['stream1', 'stream2'])) set(['stream1', 'stream2']))
@ -161,7 +249,7 @@ def test_list_and_delete_stream():
def test_put_record(): def test_put_record():
client = boto3.client('firehose', region_name='us-east-1') client = boto3.client('firehose', region_name='us-east-1')
create_stream(client, 'stream1') create_redshift_delivery_stream(client, 'stream1')
client.put_record( client.put_record(
DeliveryStreamName='stream1', DeliveryStreamName='stream1',
Record={ Record={
@ -174,7 +262,7 @@ def test_put_record():
def test_put_record_batch(): def test_put_record_batch():
client = boto3.client('firehose', region_name='us-east-1') client = boto3.client('firehose', region_name='us-east-1')
create_stream(client, 'stream1') create_redshift_delivery_stream(client, 'stream1')
client.put_record_batch( client.put_record_batch(
DeliveryStreamName='stream1', DeliveryStreamName='stream1',
Records=[ Records=[