UnboundLocalError is thrown when creating a Firehose delivery stream. (#4144)
This commit is contained in:
parent
6fedb25a2a
commit
6ae37046f0
@ -350,6 +350,8 @@ class DeliveryStream(BaseModel):
|
||||
"redshift_s3_buffering_hints"
|
||||
)
|
||||
|
||||
self.elasticsearch_config = stream_kwargs.get("elasticsearch_config")
|
||||
|
||||
self.records = []
|
||||
self.status = "ACTIVE"
|
||||
self.created_at = datetime.datetime.utcnow()
|
||||
@ -373,6 +375,31 @@ class DeliveryStream(BaseModel):
|
||||
"ExtendedS3DestinationDescription": self.extended_s3_config,
|
||||
}
|
||||
]
|
||||
elif self.elasticsearch_config:
|
||||
return [
|
||||
{
|
||||
"DestinationId": "string",
|
||||
"ElasticsearchDestinationDescription": {
|
||||
"RoleARN": self.elasticsearch_config.get("RoleARN"),
|
||||
"DomainARN": self.elasticsearch_config.get("DomainARN"),
|
||||
"ClusterEndpoint": self.elasticsearch_config.get(
|
||||
"ClusterEndpoint"
|
||||
),
|
||||
"IndexName": self.elasticsearch_config.get("IndexName"),
|
||||
"TypeName": self.elasticsearch_config.get("TypeName"),
|
||||
"IndexRotationPeriod": self.elasticsearch_config.get(
|
||||
"IndexRotationPeriod"
|
||||
),
|
||||
"BufferingHints": self.elasticsearch_config.get(
|
||||
"BufferingHints"
|
||||
),
|
||||
"RetryOptions": self.elasticsearch_config.get("RetryOptions"),
|
||||
"S3DestinationDescription": self.elasticsearch_config.get(
|
||||
"S3Configuration"
|
||||
),
|
||||
},
|
||||
}
|
||||
]
|
||||
else:
|
||||
return [
|
||||
{
|
||||
|
@ -172,6 +172,9 @@ class KinesisResponse(BaseResponse):
|
||||
redshift_config = self.parameters.get("RedshiftDestinationConfiguration")
|
||||
s3_config = self.parameters.get("S3DestinationConfiguration")
|
||||
extended_s3_config = self.parameters.get("ExtendedS3DestinationConfiguration")
|
||||
elasticsearch_config = self.parameters.get(
|
||||
"ElasticsearchDestinationConfiguration"
|
||||
)
|
||||
|
||||
if redshift_config:
|
||||
redshift_s3_config = redshift_config["S3Configuration"]
|
||||
@ -193,6 +196,10 @@ class KinesisResponse(BaseResponse):
|
||||
stream_kwargs = {"s3_config": s3_config}
|
||||
elif extended_s3_config:
|
||||
stream_kwargs = {"extended_s3_config": extended_s3_config}
|
||||
elif elasticsearch_config:
|
||||
stream_kwargs = {"elasticsearch_config": elasticsearch_config}
|
||||
else:
|
||||
stream_kwargs = {}
|
||||
|
||||
stream = self.kinesis_backend.create_delivery_stream(
|
||||
stream_name, **stream_kwargs
|
||||
|
@ -62,6 +62,31 @@ def create_redshift_delivery_stream(client, stream_name):
|
||||
)
|
||||
|
||||
|
||||
def create_elasticsearch_delivery_stream(client, stream_name):
|
||||
return client.create_delivery_stream(
|
||||
DeliveryStreamName=stream_name,
|
||||
DeliveryStreamType="DirectPut",
|
||||
ElasticsearchDestinationConfiguration={
|
||||
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(ACCOUNT_ID),
|
||||
"DomainARN": "arn:aws:es:::domain/kinesis-test",
|
||||
"IndexName": "myIndex",
|
||||
"TypeName": "UNCOMPRESSED",
|
||||
"IndexRotationPeriod": "NoRotation",
|
||||
"BufferingHints": {"IntervalInSeconds": 123, "SizeInMBs": 123},
|
||||
"RetryOptions": {"DurationInSeconds": 123},
|
||||
"S3Configuration": {
|
||||
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
|
||||
ACCOUNT_ID
|
||||
),
|
||||
"BucketARN": "arn:aws:s3:::kinesis-test",
|
||||
"Prefix": "myFolder/",
|
||||
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
|
||||
"CompressionFormat": "UNCOMPRESSED",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
def test_create_redshift_delivery_stream():
|
||||
client = boto3.client("firehose", region_name="us-east-1")
|
||||
@ -171,6 +196,59 @@ def test_create_s3_delivery_stream():
|
||||
)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
def test_create_elasticsearch_delivery_stream():
|
||||
client = boto3.client("firehose", region_name="us-east-1")
|
||||
|
||||
response = create_elasticsearch_delivery_stream(client, "stream1")
|
||||
stream_arn = response["DeliveryStreamARN"]
|
||||
|
||||
response = client.describe_delivery_stream(DeliveryStreamName="stream1")
|
||||
stream_description = response["DeliveryStreamDescription"]
|
||||
|
||||
# Sure and Freezegun don't play nicely together
|
||||
_ = stream_description.pop("CreateTimestamp")
|
||||
_ = stream_description.pop("LastUpdateTimestamp")
|
||||
|
||||
stream_description.should.equal(
|
||||
{
|
||||
"DeliveryStreamName": "stream1",
|
||||
"DeliveryStreamARN": stream_arn,
|
||||
"DeliveryStreamStatus": "ACTIVE",
|
||||
"VersionId": "string",
|
||||
"Destinations": [
|
||||
{
|
||||
"DestinationId": "string",
|
||||
"ElasticsearchDestinationDescription": {
|
||||
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
|
||||
ACCOUNT_ID
|
||||
),
|
||||
"DomainARN": "arn:aws:es:::domain/kinesis-test",
|
||||
"IndexName": "myIndex",
|
||||
"TypeName": "UNCOMPRESSED",
|
||||
"IndexRotationPeriod": "NoRotation",
|
||||
"BufferingHints": {"IntervalInSeconds": 123, "SizeInMBs": 123},
|
||||
"RetryOptions": {"DurationInSeconds": 123},
|
||||
"S3DestinationDescription": {
|
||||
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
|
||||
ACCOUNT_ID
|
||||
),
|
||||
"BucketARN": "arn:aws:s3:::kinesis-test",
|
||||
"Prefix": "myFolder/",
|
||||
"BufferingHints": {
|
||||
"SizeInMBs": 123,
|
||||
"IntervalInSeconds": 124,
|
||||
},
|
||||
"CompressionFormat": "UNCOMPRESSED",
|
||||
},
|
||||
},
|
||||
}
|
||||
],
|
||||
"HasMoreDestinations": False,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
def test_create_stream_without_redshift():
|
||||
client = boto3.client("firehose", region_name="us-east-1")
|
||||
|
Loading…
Reference in New Issue
Block a user