First draft of Firehose support.

Steve Pulec 2015-10-30 09:59:57 -04:00
parent 96cf5eabc4
commit ac1bb336c8
4 changed files with 298 additions and 0 deletions

View File

@@ -1,5 +1,8 @@
from __future__ import unicode_literals
import datetime
import time
import boto.kinesis
from moto.compat import OrderedDict
from moto.core import BaseBackend
@@ -124,10 +127,78 @@ class Stream(object):
}
class FirehoseRecord(object):
def __init__(self, record_data):
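# A fixed dummy id; the real Firehose API returns an opaque unique id per record.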
self.record_id = 12345678
self.record_data = record_data
class DeliveryStream(object):
def __init__(self, stream_name, **stream_kwargs):
self.name = stream_name
self.redshift_username = stream_kwargs['redshift_username']
self.redshift_password = stream_kwargs['redshift_password']
self.redshift_jdbc_url = stream_kwargs['redshift_jdbc_url']
self.redshift_role_arn = stream_kwargs['redshift_role_arn']
self.redshift_copy_command = stream_kwargs['redshift_copy_command']
self.redshift_s3_role_arn = stream_kwargs['redshift_s3_role_arn']
self.redshift_s3_bucket_arn = stream_kwargs['redshift_s3_bucket_arn']
self.redshift_s3_prefix = stream_kwargs['redshift_s3_prefix']
self.redshift_s3_compression_format = stream_kwargs['redshift_s3_compression_format']
self.redshift_s3_buffering_hints = stream_kwargs['redshift_s3_buffering_hints']
self.records = []
self.status = 'ACTIVE'
self.created_at = datetime.datetime.utcnow()
self.last_updated = datetime.datetime.utcnow()
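# Note: the ARN below hardcodes us-east-1 and a dummy account id.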
@property
def arn(self):
return 'arn:aws:firehose:us-east-1:123456789012:deliverystream/{0}'.format(self.name)
def to_dict(self):
return {
"DeliveryStreamDescription": {
"CreateTimestamp": time.mktime(self.create_at.timetuple()),
"DeliveryStreamARN": self.arn,
"DeliveryStreamName": self.name,
"DeliveryStreamStatus": self.status,
"Destinations": [
{
"DestinationId": "string",
"RedshiftDestinationDescription": {
"ClusterJDBCURL": self.redshift_jdbc_url,
"CopyCommand": self.redshift_copy_command,
"RoleARN": self.redshift_role_arn,
"S3DestinationDescription": {
"BucketARN": self.redshift_s3_bucket_arn,
"BufferingHints": self.redshift_s3_buffering_hings,
"CompressionFormat": self.redshift_s3_compression_format,
"Prefix": self.redshift_s3_prefix,
"RoleARN": self.redshift_s3_role_arn
},
"Username": self.redshift_username,
},
}
],
"HasMoreDestinations": False,
"LastUpdateTimestamp": time.mktime(self.last_updated.timetuple()),
"VersionId": "string",
}
}
def put_record(self, record_data):
record = FirehoseRecord(record_data)
self.records.append(record)
return record
class KinesisBackend(BaseBackend):
def __init__(self):
self.streams = {}
self.delivery_streams = {}
def create_stream(self, stream_name, shard_count, region):
stream = Stream(stream_name, shard_count, region)
@@ -180,6 +251,26 @@ class KinesisBackend(BaseBackend):
return sequence_number, shard_id
''' Firehose '''
def create_delivery_stream(self, stream_name, **stream_kwargs):
stream = DeliveryStream(stream_name, **stream_kwargs)
self.delivery_streams[stream_name] = stream
return stream
def get_delivery_stream(self, stream_name):
return self.delivery_streams[stream_name]
def list_delivery_streams(self):
return self.delivery_streams.values()
def delete_delivery_stream(self, stream_name):
self.delivery_streams.pop(stream_name)
def put_firehose_record(self, stream_name, record_data):
stream = self.get_delivery_stream(stream_name)
record = stream.put_record(record_data)
return record
kinesis_backends = {}
for region in boto.kinesis.regions():
kinesis_backends[region.name] = KinesisBackend()
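
A minimal sketch (not part of this commit) showing how the new backend methods fit together; the stream settings are illustrative:

backend = KinesisBackend()
stream = backend.create_delivery_stream(
    'stream1',
    redshift_username='username',
    redshift_password='password',
    redshift_jdbc_url='jdbc:redshift://host.amazonaws.com:5439/database',
    redshift_role_arn='arn:aws:iam::123456789012:role/firehose_delivery_role',
    redshift_copy_command={'DataTableName': 'outputTable'},
    redshift_s3_role_arn='arn:aws:iam::123456789012:role/firehose_delivery_role',
    redshift_s3_bucket_arn='arn:aws:s3:::kinesis-test',
    redshift_s3_prefix='myFolder/',
    redshift_s3_compression_format='UNCOMPRESSED',
    redshift_s3_buffering_hints={'SizeInMBs': 123, 'IntervalInSeconds': 124},
)
record = backend.put_firehose_record('stream1', 'some data')
assert record.record_data == 'some data'
assert stream.arn == 'arn:aws:firehose:us-east-1:123456789012:deliverystream/stream1'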

View File

@@ -16,6 +16,10 @@ class KinesisResponse(BaseResponse):
def kinesis_backend(self):
return kinesis_backends[self.region]
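# Kinesis and Firehose share this response class (both endpoints are
# registered in urls.py), so Firehose requests are detected from the Host header.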
@property
def is_firehose(self):
return self.headers['host'].startswith('firehose')
def create_stream(self):
stream_name = self.parameters.get('StreamName')
shard_count = self.parameters.get('ShardCount')
@@ -67,6 +71,8 @@ class KinesisResponse(BaseResponse):
})
def put_record(self):
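# PutRecord is an action in both APIs; hand Firehose traffic to the Firehose handler.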
if self.is_firehose:
return self.firehose_put_record()
stream_name = self.parameters.get("StreamName")
partition_key = self.parameters.get("PartitionKey")
explicit_hash_key = self.parameters.get("ExplicitHashKey")
@@ -81,3 +87,71 @@ class KinesisResponse(BaseResponse):
"SequenceNumber": sequence_number,
"ShardId": shard_id,
})
''' Firehose '''
def create_delivery_stream(self):
stream_name = self.parameters['DeliveryStreamName']
redshift_config = self.parameters.get('RedshiftDestinationConfiguration')
stream_kwargs = {}
if redshift_config:
redshift_s3_config = redshift_config['S3Configuration']
stream_kwargs = {
'redshift_username': redshift_config['Username'],
'redshift_password': redshift_config['Password'],
'redshift_jdbc_url': redshift_config['ClusterJDBCURL'],
'redshift_role_arn': redshift_config['RoleARN'],
'redshift_copy_command': redshift_config['CopyCommand'],
'redshift_s3_role_arn': redshift_s3_config['RoleARN'],
'redshift_s3_bucket_arn': redshift_s3_config['BucketARN'],
'redshift_s3_prefix': redshift_s3_config['Prefix'],
'redshift_s3_compression_format': redshift_s3_config['CompressionFormat'],
'redshift_s3_buffering_hints': redshift_s3_config['BufferingHints'],
}
stream = self.kinesis_backend.create_delivery_stream(stream_name, **stream_kwargs)
return json.dumps({
'DeliveryStreamARN': stream.arn
})
def describe_delivery_stream(self):
stream_name = self.parameters["DeliveryStreamName"]
stream = self.kinesis_backend.get_delivery_stream(stream_name)
return json.dumps(stream.to_dict())
def list_delivery_streams(self):
streams = self.kinesis_backend.list_delivery_streams()
return json.dumps({
"DeliveryStreamNames": [
stream.name for stream in streams
],
"HasMoreDeliveryStreams": False
})
def delete_delivery_stream(self):
stream_name = self.parameters['DeliveryStreamName']
self.kinesis_backend.delete_delivery_stream(stream_name)
return json.dumps({})
def firehose_put_record(self):
stream_name = self.parameters['DeliveryStreamName']
record_data = self.parameters['Record']['Data']
record = self.kinesis_backend.put_firehose_record(stream_name, record_data)
return json.dumps({
"RecordId": record.record_id,
})
def put_record_batch(self):
stream_name = self.parameters['DeliveryStreamName']
records = self.parameters['Records']
request_responses = []
for record in records:
record_response = self.kinesis_backend.put_firehose_record(stream_name, record['Data'])
request_responses.append({
"RecordId": record_response.record_id
})
return json.dumps({
"FailedPutCount": 0,
"RequestResponses": request_responses,
})

View File

@@ -3,6 +3,7 @@ from .responses import KinesisResponse
url_bases = [
"https?://kinesis.(.+).amazonaws.com",
"https?://firehose.(.+).amazonaws.com",
]
url_paths = {

View File

@@ -0,0 +1,132 @@
from __future__ import unicode_literals
import datetime
import boto3
from freezegun import freeze_time
import sure # noqa
from moto import mock_kinesis
def create_stream(client, stream_name):
return client.create_delivery_stream(
DeliveryStreamName=stream_name,
RedshiftDestinationConfiguration={
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'ClusterJDBCURL': 'jdbc:redshift://host.amazonaws.com:5439/database',
'CopyCommand': {
'DataTableName': 'outputTable',
'CopyOptions': "CSV DELIMITER ',' NULL '\\0'"
},
'Username': 'username',
'Password': 'password',
'S3Configuration': {
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'BucketARN': 'arn:aws:s3:::kinesis-test',
'Prefix': 'myFolder/',
'BufferingHints': {
'SizeInMBs': 123,
'IntervalInSeconds': 124
},
'CompressionFormat': 'UNCOMPRESSED',
}
}
)
@mock_kinesis
@freeze_time("2015-03-01")
def test_create_stream():
client = boto3.client('firehose', region_name='us-east-1')
response = create_stream(client, 'stream1')
stream_arn = response['DeliveryStreamARN']
response = client.describe_delivery_stream(DeliveryStreamName='stream1')
stream_description = response['DeliveryStreamDescription']
# Sure and Freezegun don't play nicely together
created = stream_description.pop('CreateTimestamp')
last_updated = stream_description.pop('LastUpdateTimestamp')
from dateutil.tz import tzlocal
assert created == datetime.datetime(2015, 3, 1, tzinfo=tzlocal())
assert last_updated == datetime.datetime(2015, 3, 1, tzinfo=tzlocal())
stream_description.should.equal({
'DeliveryStreamName': 'stream1',
'DeliveryStreamARN': stream_arn,
'DeliveryStreamStatus': 'ACTIVE',
'VersionId': 'string',
'Destinations': [
{
'DestinationId': 'string',
'RedshiftDestinationDescription': {
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'ClusterJDBCURL': 'jdbc:redshift://host.amazonaws.com:5439/database',
'CopyCommand': {
'DataTableName': 'outputTable',
'CopyOptions': "CSV DELIMITER ',' NULL '\\0'"
},
'Username': 'username',
'S3DestinationDescription': {
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'BucketARN': 'arn:aws:s3:::kinesis-test',
'Prefix': 'myFolder/',
'BufferingHints': {
'SizeInMBs': 123,
'IntervalInSeconds': 124
},
'CompressionFormat': 'UNCOMPRESSED',
}
}
},
],
"HasMoreDestinations": False,
})
@mock_kinesis
@freeze_time("2015-03-01")
def test_list_and_delete_stream():
client = boto3.client('firehose', region_name='us-east-1')
create_stream(client, 'stream1')
create_stream(client, 'stream2')
set(client.list_delivery_streams()['DeliveryStreamNames']).should.equal({'stream1', 'stream2'})
client.delete_delivery_stream(DeliveryStreamName='stream1')
set(client.list_delivery_streams()['DeliveryStreamNames']).should.equal({'stream2'})
@mock_kinesis
def test_put_record():
client = boto3.client('firehose', region_name='us-east-1')
create_stream(client, 'stream1')
client.put_record(
DeliveryStreamName='stream1',
Record={
'Data': 'some data'
}
)
@mock_kinesis
def test_put_record_batch():
client = boto3.client('firehose', region_name='us-east-1')
create_stream(client, 'stream1')
client.put_record_batch(
DeliveryStreamName='stream1',
Records=[
{
'Data': 'some data1'
},
{
'Data': 'some data2'
},
]
)