Fix firehose to work without Redshift config.

This commit is contained in:
Steve Pulec 2016-10-09 20:23:56 -04:00
parent 768a58671a
commit ddf2f5a754
3 changed files with 112 additions and 27 deletions

View File

@ -176,17 +176,23 @@ class FirehoseRecord(object):
class DeliveryStream(object):
def __init__(self, stream_name, **stream_kwargs):
self.name = stream_name
self.redshift_username = stream_kwargs['redshift_username']
self.redshift_password = stream_kwargs['redshift_password']
self.redshift_jdbc_url = stream_kwargs['redshift_jdbc_url']
self.redshift_role_arn = stream_kwargs['redshift_role_arn']
self.redshift_copy_command = stream_kwargs['redshift_copy_command']
self.redshift_username = stream_kwargs.get('redshift_username')
self.redshift_password = stream_kwargs.get('redshift_password')
self.redshift_jdbc_url = stream_kwargs.get('redshift_jdbc_url')
self.redshift_role_arn = stream_kwargs.get('redshift_role_arn')
self.redshift_copy_command = stream_kwargs.get('redshift_copy_command')
self.redshift_s3_role_arn = stream_kwargs['redshift_s3_role_arn']
self.redshift_s3_bucket_arn = stream_kwargs['redshift_s3_bucket_arn']
self.redshift_s3_prefix = stream_kwargs['redshift_s3_prefix']
self.s3_role_arn = stream_kwargs.get('s3_role_arn')
self.s3_bucket_arn = stream_kwargs.get('s3_bucket_arn')
self.s3_prefix = stream_kwargs.get('s3_prefix')
self.s3_compression_format = stream_kwargs.get('s3_compression_format', 'UNCOMPRESSED')
self.s3_buffering_hings = stream_kwargs.get('s3_buffering_hings')
self.redshift_s3_role_arn = stream_kwargs.get('redshift_s3_role_arn')
self.redshift_s3_bucket_arn = stream_kwargs.get('redshift_s3_bucket_arn')
self.redshift_s3_prefix = stream_kwargs.get('redshift_s3_prefix')
self.redshift_s3_compression_format = stream_kwargs.get('redshift_s3_compression_format', 'UNCOMPRESSED')
self.redshift_s3_buffering_hings = stream_kwargs['redshift_s3_buffering_hings']
self.redshift_s3_buffering_hings = stream_kwargs.get('redshift_s3_buffering_hings')
self.records = []
self.status = 'ACTIVE'
@ -197,6 +203,38 @@ class DeliveryStream(object):
def arn(self):
return 'arn:aws:firehose:us-east-1:123456789012:deliverystream/{0}'.format(self.name)
def destinations_to_dict(self):
if self.s3_role_arn:
return [{
'DestinationId': 'string',
'S3DestinationDescription': {
'RoleARN': self.s3_role_arn,
'BucketARN': self.s3_bucket_arn,
'Prefix': self.s3_prefix,
'BufferingHints': self.s3_buffering_hings,
'CompressionFormat': self.s3_compression_format,
}
}]
else:
return [{
"DestinationId": "string",
"RedshiftDestinationDescription": {
"ClusterJDBCURL": self.redshift_jdbc_url,
"CopyCommand": self.redshift_copy_command,
"RoleARN": self.redshift_role_arn,
"S3DestinationDescription": {
"BucketARN": self.redshift_s3_bucket_arn,
"BufferingHints": self.redshift_s3_buffering_hings,
"CompressionFormat": self.redshift_s3_compression_format,
"Prefix": self.redshift_s3_prefix,
"RoleARN": self.redshift_s3_role_arn
},
"Username": self.redshift_username,
},
}
]
def to_dict(self):
return {
"DeliveryStreamDescription": {
@ -204,24 +242,7 @@ class DeliveryStream(object):
"DeliveryStreamARN": self.arn,
"DeliveryStreamName": self.name,
"DeliveryStreamStatus": self.status,
"Destinations": [
{
"DestinationId": "string",
"RedshiftDestinationDescription": {
"ClusterJDBCURL": self.redshift_jdbc_url,
"CopyCommand": self.redshift_copy_command,
"RoleARN": self.redshift_role_arn,
"S3DestinationDescription": {
"BucketARN": self.redshift_s3_bucket_arn,
"BufferingHints": self.redshift_s3_buffering_hings,
"CompressionFormat": self.redshift_s3_compression_format,
"Prefix": self.redshift_s3_prefix,
"RoleARN": self.redshift_s3_role_arn
},
"Username": self.redshift_username,
},
}
],
"Destinations": self.destinations_to_dict(),
"HasMoreDestinations": False,
"LastUpdateTimestamp": time.mktime(self.last_updated.timetuple()),
"VersionId": "string",

View File

@ -139,6 +139,16 @@ class KinesisResponse(BaseResponse):
'redshift_s3_compression_format': redshift_s3_config.get('CompressionFormat'),
'redshift_s3_buffering_hings': redshift_s3_config['BufferingHints'],
}
else:
# S3 Config
s3_config = self.parameters['S3DestinationConfiguration']
stream_kwargs = {
's3_role_arn': s3_config['RoleARN'],
's3_bucket_arn': s3_config['BucketARN'],
's3_prefix': s3_config['Prefix'],
's3_compression_format': s3_config.get('CompressionFormat'),
's3_buffering_hings': s3_config['BufferingHints'],
}
stream = self.kinesis_backend.create_delivery_stream(stream_name, **stream_kwargs)
return json.dumps({
'DeliveryStreamARN': stream.arn

View File

@ -87,6 +87,60 @@ def test_create_stream():
})
@mock_kinesis
@freeze_time("2015-03-01")
def test_create_stream_without_redshift():
client = boto3.client('firehose', region_name='us-east-1')
response = client.create_delivery_stream(
DeliveryStreamName="stream1",
S3DestinationConfiguration={
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'BucketARN': 'arn:aws:s3:::kinesis-test',
'Prefix': 'myFolder/',
'BufferingHints': {
'SizeInMBs': 123,
'IntervalInSeconds': 124
},
'CompressionFormat': 'UNCOMPRESSED',
}
)
stream_arn = response['DeliveryStreamARN']
response = client.describe_delivery_stream(DeliveryStreamName='stream1')
stream_description = response['DeliveryStreamDescription']
# Sure and Freezegun don't play nicely together
created = stream_description.pop('CreateTimestamp')
last_updated = stream_description.pop('LastUpdateTimestamp')
from dateutil.tz import tzlocal
assert created == datetime.datetime(2015, 3, 1, tzinfo=tzlocal())
assert last_updated == datetime.datetime(2015, 3, 1, tzinfo=tzlocal())
stream_description.should.equal({
'DeliveryStreamName': 'stream1',
'DeliveryStreamARN': stream_arn,
'DeliveryStreamStatus': 'ACTIVE',
'VersionId': 'string',
'Destinations': [
{
'DestinationId': 'string',
'S3DestinationDescription': {
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'BucketARN': 'arn:aws:s3:::kinesis-test',
'Prefix': 'myFolder/',
'BufferingHints': {
'SizeInMBs': 123,
'IntervalInSeconds': 124
},
'CompressionFormat': 'UNCOMPRESSED',
}
},
],
"HasMoreDestinations": False,
})
@mock_kinesis
@freeze_time("2015-03-01")
def test_deescribe_non_existant_stream():