Add some basic Firehose service APIs (#4246)

kbalk 2021-09-14 05:39:39 -04:00 committed by GitHub
parent 9859d66ff8
commit b89b0039e4
22 changed files with 1793 additions and 353 deletions


@ -4525,18 +4525,22 @@
<details>
<summary>0% implemented</summary>
- [ ] create_delivery_stream
- [ ] delete_delivery_stream
- [ ] describe_delivery_stream
- [ ] list_delivery_streams
- [ ] list_tags_for_delivery_stream
- [ ] put_record
- [ ] put_record_batch
- [ ] can_paginate
- [X] create_delivery_stream
- [X] delete_delivery_stream
- [X] describe_delivery_stream
- [ ] generate_presigned_url
- [ ] get_paginator
- [ ] get_waiter
- [X] list_delivery_streams
- [X] list_tags_for_delivery_stream
- [X] put_record
- [X] put_record_batch
- [ ] start_delivery_stream_encryption
- [ ] stop_delivery_stream_encryption
- [ ] tag_delivery_stream
- [ ] untag_delivery_stream
- [ ] update_destination
- [X] tag_delivery_stream
- [X] untag_delivery_stream
- [X] update_destination
</details>
## fis


@ -62,6 +62,8 @@ Currently implemented Services:
+---------------------------+-----------------------+------------------------------------+
| EMR | @mock_emr | core endpoints done |
+---------------------------+-----------------------+------------------------------------+
| Firehose | @mock_firehose | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+
| Forecast | @mock_forecast | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+
| Glacier | @mock_glacier | core endpoints done |


@ -75,6 +75,7 @@ mock_elbv2 = lazy_load(".elbv2", "mock_elbv2")
mock_emr = lazy_load(".emr", "mock_emr")
mock_emr_deprecated = lazy_load(".emr", "mock_emr_deprecated")
mock_events = lazy_load(".events", "mock_events")
mock_firehose = lazy_load(".firehose", "mock_firehose")
mock_forecast = lazy_load(".forecast", "mock_forecast")
mock_glacier = lazy_load(".glacier", "mock_glacier")
mock_glacier_deprecated = lazy_load(".glacier", "mock_glacier_deprecated")


@ -47,6 +47,7 @@ backend_url_patterns = [
("emr", re.compile("https?://(.+).elasticmapreduce.amazonaws.com")),
("emr", re.compile("https?://elasticmapreduce.(.+).amazonaws.com")),
("events", re.compile("https?://events\\.(.+)\\.amazonaws\\.com")),
("firehose", re.compile("https?://firehose\\.(.+)\\.amazonaws\\.com")),
("forecast", re.compile("https?://forecast\\.(.+)\\.amazonaws\\.com")),
("glacier", re.compile("https?://glacier.(.+).amazonaws.com")),
("glue", re.compile("https?://glue\\.(.+)\\.amazonaws\\.com")),
@ -54,7 +55,6 @@ backend_url_patterns = [
("iot", re.compile("https?://iot\\.(.+)\\.amazonaws\\.com")),
("iot-data", re.compile("https?://data.iot.(.+).amazonaws.com")),
("kinesis", re.compile("https?://kinesis\\.(.+)\\.amazonaws\\.com")),
("kinesis", re.compile("https?://firehose\\.(.+)\\.amazonaws\\.com")),
("kinesisvideo", re.compile("https?://kinesisvideo.(.+).amazonaws.com")),
(
"kinesis-video-archived-media",


@ -0,0 +1,5 @@
"""Firehose module initialization; sets value for base decorator."""
from .models import firehose_backends
from ..core.models import base_decorator
mock_firehose = base_decorator(firehose_backends)
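The decorator registered here behaves like the other moto mocks. A minimal usage sketch of the newly supported calls (the stream, role, and bucket names below are made up for illustration):

import boto3
from moto import mock_firehose

@mock_firehose
def exercise_basic_firehose_apis():
    client = boto3.client("firehose", region_name="us-east-1")
    client.create_delivery_stream(
        DeliveryStreamName="example-stream",
        S3DestinationConfiguration={
            "RoleARN": "arn:aws:iam::123456789012:role/firehose-test-role",
            "BucketARN": "arn:aws:s3:::example-bucket",
        },
    )
    description = client.describe_delivery_stream(
        DeliveryStreamName="example-stream"
    )["DeliveryStreamDescription"]
    assert description["DeliveryStreamStatus"] == "ACTIVE"
    assert client.list_delivery_streams()["DeliveryStreamNames"] == ["example-stream"]
    client.delete_delivery_stream(DeliveryStreamName="example-stream")

exercise_basic_firehose_apis()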


@ -0,0 +1,56 @@
"""Exceptions raised by the Firehose service."""
from moto.core.exceptions import JsonRESTError
class ConcurrentModificationException(JsonRESTError):
"""Existing config has a version ID that does not match given ID."""
code = 400
def __init__(self, message):
super().__init__("ConcurrentModificationException", message)
class InvalidArgumentException(JsonRESTError):
"""The specified input parameter has a value that is not valid."""
code = 400
def __init__(self, message):
super().__init__("InvalidArgumentException", message)
class LimitExceededException(JsonRESTError):
"""You have already reached the limit for a requested resource."""
code = 400
def __init__(self, message):
super().__init__("LimitExceededException", message)
class ResourceInUseException(JsonRESTError):
"""The resource is already in use and not available for this operation."""
code = 400
def __init__(self, message):
super().__init__("ResourceInUseException", message)
class ResourceNotFoundException(JsonRESTError):
"""The specified resource could not be found."""
code = 400
def __init__(self, message):
super().__init__("ResourceNotFoundException", message)
class ValidationException(JsonRESTError):
"""The tag key or tag value is not valid."""
code = 400
def __init__(self, message):
super().__init__("ValidationException", message)

moto/firehose/models.py (new file, 635 lines)

@ -0,0 +1,635 @@
"""FirehoseBackend class with methods for supported APIs.
Incomplete list of unfinished items:
- The create_delivery_stream() argument
DeliveryStreamEncryptionConfigurationInput is not supported.
- Better validation of delivery destination parameters, e.g.,
validation of the URL for an HTTP endpoint (boto3 does this).
- Better handling of the put_record_batch() API. Not only is
the existing logic bare bones, but for the Elasticsearch and
Redshift destinations, the data is just ignored.
- put_record_batch() handling of errors is minimal and no errors
are reported back to the user. Instead, an exception is raised.
- put_record(), put_record_batch() always set "Encrypted" to False.
"""
from base64 import b64decode
from datetime import datetime
from time import time
from uuid import uuid4
import warnings
import requests
from boto3 import Session
from moto.core import BaseBackend, BaseModel
from moto.core import ACCOUNT_ID
from moto.firehose.exceptions import (
ConcurrentModificationException,
InvalidArgumentException,
LimitExceededException,
ResourceInUseException,
ResourceNotFoundException,
ValidationException,
)
from moto.core.utils import get_random_hex
from moto.s3 import s3_backend
from moto.utilities.tagging_service import TaggingService
MAX_TAGS_PER_DELIVERY_STREAM = 50
DESTINATION_TYPES_TO_NAMES = {
"s3": "S3",
"extended_s3": "ExtendedS3",
"http_endpoint": "HttpEndpoint",
"elasticsearch": "Elasticsearch",
"redshift": "Redshift",
"splunk": "Splunk", # Unimplemented
}
def find_destination_config_in_args(api_args):
"""Return (config_arg, config_name) tuple for destination config.
Determines which destination config(s) have been specified. The
alternative is to use a bunch of 'if' statements to check each
destination configuration. If more than one destination config is
specified, an exception is raised.
A logical name for the destination type is returned along with the
destination config, as it's a useful way to compare current and replacement
destinations.
"""
destination_names = DESTINATION_TYPES_TO_NAMES.keys()
configs = []
for arg_name, arg_value in api_args.items():
# Ignore arguments that are not destination configs.
if "_destination" not in arg_name:
continue
# If the destination config value is non-null, save it.
name = arg_name.split("_destination")[0]
if name in destination_names and arg_value:
configs.append((DESTINATION_TYPES_TO_NAMES[name], arg_value))
# Only a single destination configuration is allowed.
if len(configs) > 1:
raise InvalidArgumentException(
"Exactly one destination configuration is supported for a Firehose"
)
return configs[0]
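# A hedged usage sketch for the helper above (illustrative values only). The
# dict mimics the locals() of create_delivery_stream(): exactly one non-null
# "*_destination*" entry is present, so its logical name and config are
# returned.
_example_args = {
    "delivery_stream_name": "example-stream",
    "s3_destination_configuration": {"RoleARN": "role-arn", "BucketARN": "bucket-arn"},
    "extended_s3_destination_configuration": None,
}
assert find_destination_config_in_args(_example_args) == (
    "S3",
    {"RoleARN": "role-arn", "BucketARN": "bucket-arn"},
)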
def create_s3_destination_config(extended_s3_destination_config):
"""Return dict with selected fields copied from ExtendedS3 config.
When an ExtendedS3 config is chosen, AWS tacks on an S3 config as
well. When the same field names exist for both S3 and ExtendedS3,
the ExtendedS3 fields are copied to the added S3 destination.
"""
fields_not_needed = [
"S3BackupMode",
"S3Description",
"DataFormatconversionConfiguration",
"DynamicPartitionConfiguration",
]
destination = {}
for field, value in extended_s3_destination_config.items():
if field in fields_not_needed:
continue
destination[field] = value
return destination
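# A similar illustrative sketch for create_s3_destination_config(): the
# ExtendedS3-only fields are dropped, while the shared fields carry over to
# the implicit S3 destination.
_example_extended_config = {
    "RoleARN": "role-arn",
    "BucketARN": "bucket-arn",
    "S3BackupMode": "Disabled",
}
assert create_s3_destination_config(_example_extended_config) == {
    "RoleARN": "role-arn",
    "BucketARN": "bucket-arn",
}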
class DeliveryStream(
BaseModel
): # pylint: disable=too-few-public-methods,too-many-instance-attributes
"""Represents a delivery stream, its source and destination configs."""
STATES = {"CREATING", "ACTIVE", "CREATING_FAILED"}
MAX_STREAMS_PER_REGION = 50
def __init__(
self,
region,
delivery_stream_name,
delivery_stream_type,
kinesis_stream_source_configuration,
destination_name,
destination_config,
): # pylint: disable=too-many-arguments
self.delivery_stream_status = "CREATING"
self.delivery_stream_name = delivery_stream_name
self.delivery_stream_type = (
delivery_stream_type if delivery_stream_type else "DirectPut"
)
self.source = kinesis_stream_source_configuration
self.destinations = [
{
"destination_id": "destinationId-000000000001",
destination_name: destination_config,
}
]
if destination_name == "ExtendedS3":
# Add an S3 destination as well, minus a few ExtendedS3 fields.
self.destinations[0]["S3"] = create_s3_destination_config(
destination_config
)
elif "S3Configuration" in destination_config:
# S3Configuration becomes S3DestinationDescription for the
# other destinations.
self.destinations[0][destination_name][
"S3DestinationDescription"
] = destination_config["S3Configuration"]
del self.destinations[0][destination_name]["S3Configuration"]
self.delivery_stream_status = "ACTIVE"
self.delivery_stream_arn = f"arn:aws:firehose:{region}:{ACCOUNT_ID}:/delivery_stream/{delivery_stream_name}"
self.create_timestamp = str(datetime.utcnow())
self.version_id = "1" # Used to track updates of destination configs
# I believe boto3 only adds this field after an update ...
self.last_update_timestamp = str(datetime.utcnow())
class FirehoseBackend(BaseBackend):
"""Implementation of Firehose APIs."""
def __init__(self, region_name=None):
self.region_name = region_name
self.delivery_streams = {}
self.tagger = TaggingService()
def lookup_name_from_arn(self, arn):
"""Given an ARN, return the associated delivery stream name."""
# TODO - need to test
return self.delivery_streams.get(arn.split("/")[-1])
def reset(self):
"""Re-initializes all attributes for this instance."""
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def create_delivery_stream(
self,
region,
delivery_stream_name,
delivery_stream_type,
kinesis_stream_source_configuration,
delivery_stream_encryption_configuration_input,
s3_destination_configuration,
extended_s3_destination_configuration,
redshift_destination_configuration,
elasticsearch_destination_configuration,
splunk_destination_configuration,
http_endpoint_destination_configuration,
tags,
): # pylint: disable=too-many-arguments,too-many-locals,unused-argument
"""Create a Kinesis Data Firehose delivery stream."""
(destination_name, destination_config) = find_destination_config_in_args(
locals()
)
if delivery_stream_name in self.delivery_streams:
raise ResourceInUseException(
f"Firehose {delivery_stream_name} under accountId {ACCOUNT_ID} "
f"already exists"
)
if len(self.delivery_streams) == DeliveryStream.MAX_STREAMS_PER_REGION:
raise LimitExceededException(
f"You have already consumed your firehose quota of "
f"{DeliveryStream.MAX_STREAMS_PER_REGION} hoses. Firehose "
f"names: {list(self.delivery_streams.keys())}"
)
# Rule out situations that are not yet implemented.
if delivery_stream_encryption_configuration_input:
warnings.warn(
"A delivery stream with server-side encryption enabled is not "
"yet implemented"
)
if destination_name == "Splunk":
warnings.warn("A Splunk destination delivery stream is not yet implemented")
if (
kinesis_stream_source_configuration
and delivery_stream_type != "KinesisStreamAsSource"
):
raise InvalidArgumentException(
"KinesisSourceStreamConfig is only applicable for "
"KinesisStreamAsSource stream type"
)
# Validate the tags before proceeding.
errmsg = self.tagger.validate_tags(tags or [])
if errmsg:
raise ValidationException(errmsg)
# Create a DeliveryStream instance that will be stored and indexed
# by delivery stream name. This instance will update the state and
# create the ARN.
delivery_stream = DeliveryStream(
region,
delivery_stream_name,
delivery_stream_type,
kinesis_stream_source_configuration,
destination_name,
destination_config,
)
self.tagger.tag_resource(delivery_stream.delivery_stream_arn, tags or [])
self.delivery_streams[delivery_stream_name] = delivery_stream
return self.delivery_streams[delivery_stream_name].delivery_stream_arn
def delete_delivery_stream(
self, delivery_stream_name, allow_force_delete=False
): # pylint: disable=unused-argument
"""Delete a delivery stream and its data.
AllowForceDelete option is ignored as we only superficially
apply state.
"""
delivery_stream = self.delivery_streams.get(delivery_stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} "
f"not found."
)
self.tagger.delete_all_tags_for_resource(delivery_stream.delivery_stream_arn)
delivery_stream.delivery_stream_status = "DELETING"
self.delivery_streams.pop(delivery_stream_name)
def describe_delivery_stream(
self, delivery_stream_name, limit, exclusive_start_destination_id,
): # pylint: disable=unused-argument
"""Return description of specified delivery stream and its status.
Note: the 'limit' and 'exclusive_start_destination_id' parameters
are not currently processed/implemented.
"""
delivery_stream = self.delivery_streams.get(delivery_stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} "
f"not found."
)
result = {"DeliveryStreamDescription": {"HasMoreDestinations": False}}
for attribute, attribute_value in vars(delivery_stream).items():
if not attribute_value:
continue
# Convert from attribute's snake case to camel case for outgoing
# JSON.
name = "".join([x.capitalize() for x in attribute.split("_")])
# Fooey ... always an exception to the rule:
if name == "DeliveryStreamArn":
name = "DeliveryStreamARN"
if name != "Destinations":
if name == "Source":
result["DeliveryStreamDescription"][name] = {
"KinesisStreamSourceDescription": attribute_value
}
else:
result["DeliveryStreamDescription"][name] = attribute_value
continue
result["DeliveryStreamDescription"]["Destinations"] = []
for destination in attribute_value:
description = {}
for key, value in destination.items():
if key == "destination_id":
description["DestinationId"] = value
else:
description[f"{key}DestinationDescription"] = value
result["DeliveryStreamDescription"]["Destinations"].append(description)
return result
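# Hedged illustration of the snake_case -> CamelCase conversion applied in
# describe_delivery_stream() above, including the ARN special case:
def _to_camel_example(attribute):
    name = "".join(part.capitalize() for part in attribute.split("_"))
    return "DeliveryStreamARN" if name == "DeliveryStreamArn" else name

assert _to_camel_example("delivery_stream_status") == "DeliveryStreamStatus"
assert _to_camel_example("delivery_stream_arn") == "DeliveryStreamARN"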
def list_delivery_streams(
self, limit, delivery_stream_type, exclusive_start_delivery_stream_name
):
"""Return list of delivery streams in alphabetic order of names."""
result = {"DeliveryStreamNames": [], "HasMoreDeliveryStreams": False}
if not self.delivery_streams:
return result
# If delivery_stream_type is specified, filter out any stream that's
# not of that type.
stream_list = self.delivery_streams.keys()
if delivery_stream_type:
stream_list = [
x
for x in stream_list
if self.delivery_streams[x].delivery_stream_type == delivery_stream_type
]
# The list is sorted alphabetically, not alphanumerically.
sorted_list = sorted(stream_list)
# Determine the limit or number of names to return in the list.
limit = limit or DeliveryStream.MAX_STREAMS_PER_REGION
# If a starting delivery stream name is given, find the index into
# the sorted list, then add one to get the name following it. If the
# exclusive_start_delivery_stream_name doesn't exist, it's ignored.
start = 0
if exclusive_start_delivery_stream_name:
if self.delivery_streams.get(exclusive_start_delivery_stream_name):
start = sorted_list.index(exclusive_start_delivery_stream_name) + 1
result["DeliveryStreamNames"] = sorted_list[start : start + limit]
if len(sorted_list) > (start + limit):
result["HasMoreDeliveryStreams"] = True
return result
def list_tags_for_delivery_stream(
self, delivery_stream_name, exclusive_start_tag_key, limit,
):
"""Return list of tags."""
result = {"Tags": [], "HasMoreTags": False}
delivery_stream = self.delivery_streams.get(delivery_stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} "
f"not found."
)
tags = self.tagger.list_tags_for_resource(delivery_stream.delivery_stream_arn)[
"Tags"
]
keys = self.tagger.extract_tag_names(tags)
# If a starting tag is given and can be found, find the index into
# tags, then add one to get the tag following it.
start = 0
if exclusive_start_tag_key:
if exclusive_start_tag_key in keys:
start = keys.index(exclusive_start_tag_key) + 1
limit = limit or MAX_TAGS_PER_DELIVERY_STREAM
result["Tags"] = tags[start : start + limit]
if len(tags) > (start + limit):
result["HasMoreTags"] = True
return result
def put_record(self, delivery_stream_name, record):
"""Write a single data record into a Kinesis Data firehose stream."""
result = self.put_record_batch(delivery_stream_name, [record])
return {
"RecordId": result["RequestResponses"][0]["RecordId"],
"Encrypted": False,
}
@staticmethod
def put_http_records(http_destination, records):
"""Put records to a HTTP destination."""
# Mostly copied from localstack
url = http_destination["EndpointConfiguration"]["Url"]
headers = {"Content-Type": "application/json"}
record_to_send = {
"requestId": str(uuid4()),
"timestamp": int(time()),
"records": [{"data": record["Data"]} for record in records],
}
try:
requests.post(url, json=record_to_send, headers=headers)
except Exception as exc:
# This could be better ...
raise RuntimeError(
"Firehose PutRecord(Batch) to HTTP destination failed"
) from exc
return [{"RecordId": str(uuid4())} for _ in range(len(records))]
@staticmethod
def _format_s3_object_path(delivery_stream_name, version_id, prefix):
"""Return a S3 object path in the expected format."""
# Taken from LocalStack's firehose logic, with minor changes.
# See https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#s3-object-name
# Path prefix pattern: myApp/YYYY/MM/DD/HH/
# Object name pattern:
# DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString
prefix = f"{prefix}{'' if prefix.endswith('/') else '/'}"
now = datetime.utcnow()
return (
f"{prefix}{now.strftime('%Y/%m/%d/%H')}/"
f"{delivery_stream_name}-{version_id}-"
f"{now.strftime('%Y-%m-%d-%H-%M-%S')}-{get_random_hex()}"
)
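# For reference, a generated key looks roughly like this (the timestamps and
# the random hex suffix will differ on every call):
#   myFolder/2021/09/14/09/example-stream-1-2021-09-14-09-39-39-1a2b3c4d5e6f7a8b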
def put_s3_records(self, delivery_stream_name, version_id, s3_destination, records):
"""Put records to a ExtendedS3 or S3 destination."""
# Taken from LocalStack's firehose logic, with minor changes.
bucket_name = s3_destination["BucketARN"].split(":")[-1]
prefix = s3_destination.get("Prefix", "")
object_path = self._format_s3_object_path(
delivery_stream_name, version_id, prefix
)
batched_data = b"".join([b64decode(r["Data"]) for r in records])
try:
s3_backend.put_object(bucket_name, object_path, batched_data)
except Exception as exc:
# This could be better ...
raise RuntimeError(
"Firehose PutRecord(Batch to S3 destination failed"
) from exc
return [{"RecordId": str(uuid4())} for _ in range(len(records))]
def put_record_batch(self, delivery_stream_name, records):
"""Write multiple data records into a Kinesis Data firehose stream."""
delivery_stream = self.delivery_streams.get(delivery_stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} "
f"not found."
)
request_responses = []
for destination in delivery_stream.destinations:
if "ExtendedS3" in destination:
# ExtendedS3 will be handled like S3, but in the future
# this will probably need to be revisited. This destination
# must be listed before S3 otherwise both destinations will
# be processed instead of just ExtendedS3.
request_responses = self.put_s3_records(
delivery_stream_name,
delivery_stream.version_id,
destination["ExtendedS3"],
records,
)
elif "S3" in destination:
request_responses = self.put_s3_records(
delivery_stream_name,
delivery_stream.version_id,
destination["S3"],
records,
)
elif "HttpEndpoint" in destination:
request_responses = self.put_http_records(
destination["HttpEndpoint"], records
)
elif "Elasticsearch" in destination or "Redshift" in destination:
# This isn't implemented as these services aren't implemented,
# so ignore the data, but return a "proper" response.
request_responses = [
{"RecordId": str(uuid4())} for _ in range(len(records))
]
return {
"FailedPutCount": 0,
"Encrypted": False,
"RequestResponses": request_responses,
}
def tag_delivery_stream(self, delivery_stream_name, tags):
"""Add/update tags for specified delivery stream."""
delivery_stream = self.delivery_streams.get(delivery_stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} "
f"not found."
)
if len(tags) >= MAX_TAGS_PER_DELIVERY_STREAM:
raise ValidationException(
f"1 validation error detected: Value '{tags}' at 'tags' "
f"failed to satisify contstraint: Member must have length "
f"less than or equal to {MAX_TAGS_PER_DELIVERY_STREAM}"
)
errmsg = self.tagger.validate_tags(tags)
if errmsg:
raise ValidationException(errmsg)
self.tagger.tag_resource(delivery_stream.delivery_stream_arn, tags)
def untag_delivery_stream(self, delivery_stream_name, tag_keys):
"""Removes tags from specified delivery stream."""
delivery_stream = self.delivery_streams.get(delivery_stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} "
f"not found."
)
# If a tag key doesn't exist for the stream, boto3 ignores it.
self.tagger.untag_resource_using_names(
delivery_stream.delivery_stream_arn, tag_keys
)
def update_destination(
self,
delivery_stream_name,
current_delivery_stream_version_id,
destination_id,
s3_destination_update,
extended_s3_destination_update,
s3_backup_mode,
redshift_destination_update,
elasticsearch_destination_update,
splunk_destination_update,
http_endpoint_destination_update,
): # pylint: disable=unused-argument,too-many-arguments,too-many-locals
"""Updates specified destination of specified delivery stream."""
(destination_name, destination_config) = find_destination_config_in_args(
locals()
)
delivery_stream = self.delivery_streams.get(delivery_stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {delivery_stream_name} under accountId "
f"{ACCOUNT_ID} not found."
)
if destination_name == "Splunk":
warnings.warn("A Splunk destination delivery stream is not yet implemented")
if delivery_stream.version_id != current_delivery_stream_version_id:
raise ConcurrentModificationException(
f"Cannot update firehose: {delivery_stream_name} since the "
f"current version id: {delivery_stream.version_id} and "
f"specified version id: {current_delivery_stream_version_id} "
f"do not match"
)
destination = {}
destination_idx = 0
for destination in delivery_stream.destinations:
if destination["destination_id"] == destination_id:
break
destination_idx += 1
else:
raise InvalidArgumentException("Destination Id {destination_id} not found")
# Switching between Amazon ES and other services is not supported.
# For an Amazon ES destination, you can only update to another Amazon
# ES destination. Same with HTTP. Didn't test Splunk.
if (
destination_name == "Elasticsearch" and "Elasticsearch" not in destination
) or (destination_name == "HttpEndpoint" and "HttpEndpoint" not in destination):
raise InvalidArgumentException(
f"Changing the destination type to or from {destination_name} "
f"is not supported at this time."
)
# If this is a different type of destination configuration,
# the existing configuration is reset first.
if destination_name in destination:
delivery_stream.destinations[destination_idx][destination_name].update(
destination_config
)
else:
delivery_stream.destinations[destination_idx] = {
"destination_id": destination_id,
destination_name: destination_config,
}
# Once S3 is updated to an ExtendedS3 destination, both remain in
# the destination. That means when one is updated, the other needs
# to be updated as well. The problem is that they don't have the
# same fields.
if destination_name == "ExtendedS3":
delivery_stream.destinations[destination_idx][
"S3"
] = create_s3_destination_config(destination_config)
elif destination_name == "S3" and "ExtendedS3" in destination:
destination["ExtendedS3"] = {
k: v
for k, v in destination["S3"].items()
if k in destination["ExtendedS3"]
}
# Increment version number and update the timestamp.
delivery_stream.version_id = str(int(current_delivery_stream_version_id) + 1)
delivery_stream.last_update_timestamp = str(datetime.utcnow())
# Unimplemented: processing of the "S3BackupMode" parameter. Per the
# documentation: "You can update a delivery stream to enable Amazon
# S3 backup if it is disabled. If backup is enabled, you can't update
# the delivery stream to disable it."
firehose_backends = {}
for available_region in Session().get_available_regions("firehose"):
firehose_backends[available_region] = FirehoseBackend()
for available_region in Session().get_available_regions(
"firehose", partition_name="aws-us-gov"
):
firehose_backends[available_region] = FirehoseBackend()
for available_region in Session().get_available_regions(
"firehose", partition_name="aws-cn"
):
firehose_backends[available_region] = FirehoseBackend()
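Because put_s3_records() writes through to the mocked S3 backend, delivery can be checked end to end from a test. A hedged sketch, with illustrative bucket, role, and stream names:

import boto3
from moto import mock_firehose, mock_s3

@mock_s3
@mock_firehose
def check_put_record_delivery():
    s3_client = boto3.client("s3", region_name="us-east-1")
    s3_client.create_bucket(Bucket="example-bucket")
    firehose = boto3.client("firehose", region_name="us-east-1")
    firehose.create_delivery_stream(
        DeliveryStreamName="example-stream",
        S3DestinationConfiguration={
            "RoleARN": "arn:aws:iam::123456789012:role/firehose-test-role",
            "BucketARN": "arn:aws:s3:::example-bucket",
        },
    )
    firehose.put_record(
        DeliveryStreamName="example-stream", Record={"Data": b"one,two,three\n"}
    )
    # Each PutRecord/PutRecordBatch call lands as one concatenated S3 object.
    contents = s3_client.list_objects_v2(Bucket="example-bucket")["Contents"]
    assert len(contents) == 1

check_put_record_delivery()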

moto/firehose/responses.py (new file, 110 lines)

@ -0,0 +1,110 @@
"""Handles Firehose API requests, invokes method and returns response."""
import json
from moto.core.responses import BaseResponse
from .models import firehose_backends
class FirehoseResponse(BaseResponse):
"""Handler for Firehose requests and responses."""
@property
def firehose_backend(self):
"""Return backend instance specific to this region."""
return firehose_backends[self.region]
def create_delivery_stream(self):
"""Prepare arguments and respond to CreateDeliveryStream request."""
delivery_stream_arn = self.firehose_backend.create_delivery_stream(
self.region,
self._get_param("DeliveryStreamName"),
self._get_param("DeliveryStreamType"),
self._get_param("KinesisStreamSourceConfiguration"),
self._get_param("DeliveryStreamEncryptionConfigurationInput"),
self._get_param("S3DestinationConfiguration"),
self._get_param("ExtendedS3DestinationConfiguration"),
self._get_param("RedshiftDestinationConfiguration"),
self._get_param("ElasticsearchDestinationConfiguration"),
self._get_param("SplunkDestinationConfiguration"),
self._get_param("HttpEndpointDestinationConfiguration"),
self._get_param("Tags"),
)
return json.dumps({"DeliveryStreamARN": delivery_stream_arn})
def delete_delivery_stream(self):
"""Prepare arguments and respond to DeleteDeliveryStream request."""
self.firehose_backend.delete_delivery_stream(
self._get_param("DeliveryStreamName"), self._get_param("AllowForceDelete"),
)
return json.dumps({})
def describe_delivery_stream(self):
"""Prepare arguments and respond to DescribeDeliveryStream request."""
result = self.firehose_backend.describe_delivery_stream(
self._get_param("DeliveryStreamName"),
self._get_param("Limit"),
self._get_param("ExclusiveStartDestinationId"),
)
return json.dumps(result)
def list_delivery_streams(self):
"""Prepare arguments and respond to ListDeliveryStreams request."""
stream_list = self.firehose_backend.list_delivery_streams(
self._get_param("Limit"),
self._get_param("DeliveryStreamType"),
self._get_param("ExclusiveStartDeliveryStreamName"),
)
return json.dumps(stream_list)
def list_tags_for_delivery_stream(self):
"""Prepare arguments and respond to ListTagsForDeliveryStream()."""
result = self.firehose_backend.list_tags_for_delivery_stream(
self._get_param("DeliveryStreamName"),
self._get_param("ExclusiveStartTagKey"),
self._get_param("Limit"),
)
return json.dumps(result)
def put_record(self):
"""Prepare arguments and response to PutRecord()."""
result = self.firehose_backend.put_record(
self._get_param("DeliveryStreamName"), self._get_param("Record")
)
return json.dumps(result)
def put_record_batch(self):
"""Prepare arguments and response to PutRecordBatch()."""
result = self.firehose_backend.put_record_batch(
self._get_param("DeliveryStreamName"), self._get_param("Records")
)
return json.dumps(result)
def tag_delivery_stream(self):
"""Prepare arguments and respond to TagDeliveryStream request."""
self.firehose_backend.tag_delivery_stream(
self._get_param("DeliveryStreamName"), self._get_param("Tags"),
)
return json.dumps({})
def untag_delivery_stream(self):
"""Prepare arguments and respond to UntagDeliveryStream()."""
self.firehose_backend.untag_delivery_stream(
self._get_param("DeliveryStreamName"), self._get_param("TagKeys"),
)
return json.dumps({})
def update_destination(self):
"""Prepare arguments and respond to UpdateDestination()."""
self.firehose_backend.update_destination(
self._get_param("DeliveryStreamName"),
self._get_param("CurrentDeliveryStreamVersionId"),
self._get_param("DestinationId"),
self._get_param("S3DestinationUpdate"),
self._get_param("ExtendedS3DestinationUpdate"),
self._get_param("S3BackupMode"),
self._get_param("RedshiftDestinationUpdate"),
self._get_param("ElasticsearchDestinationUpdate"),
self._get_param("SplunkDestinationUpdate"),
self._get_param("HttpEndpointDestinationUpdate"),
)
return json.dumps({})

moto/firehose/urls.py (new file, 6 lines)

@ -0,0 +1,6 @@
"""Firehose base URL and path."""
from .responses import FirehoseResponse
url_bases = [r"https?://firehose\.(.+)\.amazonaws\.com"]
url_paths = {"{0}/$": FirehoseResponse.dispatch}


@ -1,7 +1,7 @@
from __future__ import unicode_literals
from collections import OrderedDict
import datetime
import time
import re
import itertools
@ -10,7 +10,6 @@ from hashlib import md5
from boto3 import Session
from collections import OrderedDict
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import unix_time
from moto.core import ACCOUNT_ID
@ -322,128 +321,9 @@ class Stream(CloudFormationModel):
return self.stream_name
class FirehoseRecord(BaseModel):
def __init__(self, record_data):
self.record_id = 12345678
self.record_data = record_data
class DeliveryStream(BaseModel):
def __init__(self, stream_name, **stream_kwargs):
self.name = stream_name
self.redshift_username = stream_kwargs.get("redshift_username")
self.redshift_password = stream_kwargs.get("redshift_password")
self.redshift_jdbc_url = stream_kwargs.get("redshift_jdbc_url")
self.redshift_role_arn = stream_kwargs.get("redshift_role_arn")
self.redshift_copy_command = stream_kwargs.get("redshift_copy_command")
self.s3_config = stream_kwargs.get("s3_config")
self.extended_s3_config = stream_kwargs.get("extended_s3_config")
self.redshift_s3_role_arn = stream_kwargs.get("redshift_s3_role_arn")
self.redshift_s3_bucket_arn = stream_kwargs.get("redshift_s3_bucket_arn")
self.redshift_s3_prefix = stream_kwargs.get("redshift_s3_prefix")
self.redshift_s3_compression_format = stream_kwargs.get(
"redshift_s3_compression_format", "UNCOMPRESSED"
)
self.redshift_s3_buffering_hints = stream_kwargs.get(
"redshift_s3_buffering_hints"
)
self.elasticsearch_config = stream_kwargs.get("elasticsearch_config")
self.records = []
self.status = "ACTIVE"
self.created_at = datetime.datetime.utcnow()
self.last_updated = datetime.datetime.utcnow()
@property
def arn(self):
return "arn:aws:firehose:us-east-1:{1}:deliverystream/{0}".format(
self.name, ACCOUNT_ID
)
def destinations_to_dict(self):
if self.s3_config:
return [
{"DestinationId": "string", "S3DestinationDescription": self.s3_config}
]
elif self.extended_s3_config:
return [
{
"DestinationId": "string",
"ExtendedS3DestinationDescription": self.extended_s3_config,
}
]
elif self.elasticsearch_config:
return [
{
"DestinationId": "string",
"ElasticsearchDestinationDescription": {
"RoleARN": self.elasticsearch_config.get("RoleARN"),
"DomainARN": self.elasticsearch_config.get("DomainARN"),
"ClusterEndpoint": self.elasticsearch_config.get(
"ClusterEndpoint"
),
"IndexName": self.elasticsearch_config.get("IndexName"),
"TypeName": self.elasticsearch_config.get("TypeName"),
"IndexRotationPeriod": self.elasticsearch_config.get(
"IndexRotationPeriod"
),
"BufferingHints": self.elasticsearch_config.get(
"BufferingHints"
),
"RetryOptions": self.elasticsearch_config.get("RetryOptions"),
"S3DestinationDescription": self.elasticsearch_config.get(
"S3Configuration"
),
},
}
]
else:
return [
{
"DestinationId": "string",
"RedshiftDestinationDescription": {
"ClusterJDBCURL": self.redshift_jdbc_url,
"CopyCommand": self.redshift_copy_command,
"RoleARN": self.redshift_role_arn,
"S3DestinationDescription": {
"BucketARN": self.redshift_s3_bucket_arn,
"BufferingHints": self.redshift_s3_buffering_hints,
"CompressionFormat": self.redshift_s3_compression_format,
"Prefix": self.redshift_s3_prefix,
"RoleARN": self.redshift_s3_role_arn,
},
"Username": self.redshift_username,
},
}
]
def to_dict(self):
return {
"DeliveryStreamDescription": {
"CreateTimestamp": time.mktime(self.created_at.timetuple()),
"DeliveryStreamARN": self.arn,
"DeliveryStreamName": self.name,
"DeliveryStreamStatus": self.status,
"Destinations": self.destinations_to_dict(),
"HasMoreDestinations": False,
"LastUpdateTimestamp": time.mktime(self.last_updated.timetuple()),
"VersionId": "string",
}
}
def put_record(self, record_data):
record = FirehoseRecord(record_data)
self.records.append(record)
return record
class KinesisBackend(BaseBackend):
def __init__(self):
self.streams = OrderedDict()
self.delivery_streams = {}
def create_stream(
self, stream_name, shard_count, retention_period_hours, region_name
@ -622,30 +502,6 @@ class KinesisBackend(BaseBackend):
raise InvalidArgumentError(retention_period_hours)
stream.retention_period_hours = retention_period_hours
""" Firehose """
def create_delivery_stream(self, stream_name, **stream_kwargs):
stream = DeliveryStream(stream_name, **stream_kwargs)
self.delivery_streams[stream_name] = stream
return stream
def get_delivery_stream(self, stream_name):
if stream_name in self.delivery_streams:
return self.delivery_streams[stream_name]
else:
raise StreamNotFoundError(stream_name)
def list_delivery_streams(self):
return self.delivery_streams.values()
def delete_delivery_stream(self, stream_name):
self.delivery_streams.pop(stream_name)
def put_firehose_record(self, stream_name, record_data):
stream = self.get_delivery_stream(stream_name)
record = stream.put_record(record_data)
return record
def list_tags_for_stream(
self, stream_name, exclusive_start_tag_key=None, limit=None
):


@ -15,13 +15,6 @@ class KinesisResponse(BaseResponse):
def kinesis_backend(self):
return kinesis_backends[self.region]
@property
def is_firehose(self):
host = self.headers.get("host") or self.headers["Host"]
return host.startswith("firehose") or "firehose" in self.headers.get(
"Authorization", ""
)
def create_stream(self):
stream_name = self.parameters.get("StreamName")
shard_count = self.parameters.get("ShardCount")
@ -103,8 +96,6 @@ class KinesisResponse(BaseResponse):
)
def put_record(self):
if self.is_firehose:
return self.firehose_put_record()
stream_name = self.parameters.get("StreamName")
partition_key = self.parameters.get("PartitionKey")
explicit_hash_key = self.parameters.get("ExplicitHashKey")
@ -122,8 +113,6 @@ class KinesisResponse(BaseResponse):
return json.dumps({"SequenceNumber": sequence_number, "ShardId": shard_id})
def put_records(self):
if self.is_firehose:
return self.put_record_batch()
stream_name = self.parameters.get("StreamName")
records = self.parameters.get("Records")
@ -165,85 +154,6 @@ class KinesisResponse(BaseResponse):
)
return ""
""" Firehose """
def create_delivery_stream(self):
stream_name = self.parameters["DeliveryStreamName"]
redshift_config = self.parameters.get("RedshiftDestinationConfiguration")
s3_config = self.parameters.get("S3DestinationConfiguration")
extended_s3_config = self.parameters.get("ExtendedS3DestinationConfiguration")
elasticsearch_config = self.parameters.get(
"ElasticsearchDestinationConfiguration"
)
if redshift_config:
redshift_s3_config = redshift_config["S3Configuration"]
stream_kwargs = {
"redshift_username": redshift_config["Username"],
"redshift_password": redshift_config["Password"],
"redshift_jdbc_url": redshift_config["ClusterJDBCURL"],
"redshift_role_arn": redshift_config["RoleARN"],
"redshift_copy_command": redshift_config["CopyCommand"],
"redshift_s3_role_arn": redshift_s3_config["RoleARN"],
"redshift_s3_bucket_arn": redshift_s3_config["BucketARN"],
"redshift_s3_prefix": redshift_s3_config["Prefix"],
"redshift_s3_compression_format": redshift_s3_config.get(
"CompressionFormat"
),
"redshift_s3_buffering_hints": redshift_s3_config["BufferingHints"],
}
elif s3_config:
stream_kwargs = {"s3_config": s3_config}
elif extended_s3_config:
stream_kwargs = {"extended_s3_config": extended_s3_config}
elif elasticsearch_config:
stream_kwargs = {"elasticsearch_config": elasticsearch_config}
else:
stream_kwargs = {}
stream = self.kinesis_backend.create_delivery_stream(
stream_name, **stream_kwargs
)
return json.dumps({"DeliveryStreamARN": stream.arn})
def describe_delivery_stream(self):
stream_name = self.parameters["DeliveryStreamName"]
stream = self.kinesis_backend.get_delivery_stream(stream_name)
return json.dumps(stream.to_dict())
def list_delivery_streams(self):
streams = self.kinesis_backend.list_delivery_streams()
return json.dumps(
{
"DeliveryStreamNames": [stream.name for stream in streams],
"HasMoreDeliveryStreams": False,
}
)
def delete_delivery_stream(self):
stream_name = self.parameters["DeliveryStreamName"]
self.kinesis_backend.delete_delivery_stream(stream_name)
return json.dumps({})
def firehose_put_record(self):
stream_name = self.parameters["DeliveryStreamName"]
record_data = self.parameters["Record"]["Data"]
record = self.kinesis_backend.put_firehose_record(stream_name, record_data)
return json.dumps({"RecordId": record.record_id})
def put_record_batch(self):
stream_name = self.parameters["DeliveryStreamName"]
records = self.parameters["Records"]
request_responses = []
for record in records:
record_response = self.kinesis_backend.put_firehose_record(
stream_name, record["Data"]
)
request_responses.append({"RecordId": record_response.record_id})
return json.dumps({"FailedPutCount": 0, "RequestResponses": request_responses})
def add_tags_to_stream(self):
stream_name = self.parameters.get("StreamName")
tags = self.parameters.get("Tags")


@ -4,7 +4,6 @@ from .responses import KinesisResponse
url_bases = [
# Need to avoid conflicting with kinesisvideo
r"https?://kinesis\.(.+)\.amazonaws\.com",
r"https?://firehose\.(.+)\.amazonaws\.com",
]
url_paths = {"{0}/$": KinesisResponse.dispatch}


@ -828,21 +828,31 @@ class LogsBackend(BaseBackend):
def put_subscription_filter(
self, log_group_name, filter_name, filter_pattern, destination_arn, role_arn
):
# TODO: support other destinations like Kinesis stream
from moto.awslambda import lambda_backends # due to circular dependency
log_group = self.groups.get(log_group_name)
if not log_group:
raise ResourceNotFoundException()
lambda_func = lambda_backends[self.region_name].get_function(destination_arn)
service = destination_arn.split(":")[2]
if service == "lambda":
from moto.awslambda import ( # pylint: disable=import-outside-toplevel
lambda_backends,
)
# no specific permission check implemented
if not lambda_func:
lambda_func = lambda_backends[self.region_name].get_function(
destination_arn
)
# no specific permission check implemented
if not lambda_func:
raise InvalidParameterException(
"Could not execute the lambda function. Make sure you "
"have given CloudWatch Logs permission to execute your "
"function."
)
else:
raise InvalidParameterException(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
f"Service '{service}' has not implemented for "
f"put_subscription_filter()"
)
log_group.put_subscription_filter(
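# Illustration of the routing above: the service name is the third
# colon-separated field of the destination ARN (a made-up ARN here), e.g.
#   "arn:aws:lambda:us-east-1:123456789012:function:example".split(":")[2]
#   -> "lambda"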


@ -261,11 +261,11 @@ def get_function_in_responses(service, operation, protocol):
for input_name, input_type in inputs.items():
type_name = input_type.type_name
if type_name == "integer":
arg_line_tmpl = ' {} = self._get_int_param("{}")\n'
arg_line_tmpl = ' {}=self._get_int_param("{}")\n'
elif type_name == "list":
arg_line_tmpl = ' {} = self._get_list_prefix("{}.member")\n'
arg_line_tmpl = ' {}=self._get_list_prefix("{}.member")\n'
else:
arg_line_tmpl = ' {} = self._get_param("{}")\n'
arg_line_tmpl = ' {}=self._get_param("{}")\n'
body += arg_line_tmpl.format(to_snake_case(input_name), input_name)
if output_names:
body += " {} = self.{}_backend.{}(\n".format(


@ -5,7 +5,6 @@ from moto.core import BaseBackend, BaseModel
class {{ service_class }}Backend(BaseBackend):
"""Implementation of {{ service_class }} APIs."""
def __init__(self, region_name=None):


@ -6,7 +6,6 @@ from .models import {{ escaped_service }}_backends
class {{ service_class }}Response(BaseResponse):
"""Handler for {{ service_class }} requests and responses."""
@property


@ -2,7 +2,7 @@
from .responses import {{ service_class }}Response
url_bases = [
"https?://{{ endpoint_prefix }}.(.+).amazonaws.com",
r"https?://{{ endpoint_prefix }}\.(.+)\.amazonaws\.com",
]
{% if api_protocol == 'rest-json' %}


@ -0,0 +1,495 @@
"""Unit tests specific to basic Firehose Delivery Stream-related APIs."""
from unittest import SkipTest
import warnings
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_firehose
from moto import settings
from moto.core import ACCOUNT_ID
from moto.core.utils import get_random_hex
from moto.firehose.models import DeliveryStream
TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
def sample_s3_dest_config():
"""Return a simple extended s3 destination configuration."""
return {
"RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose-test-role",
"BucketARN": "arn:aws:s3::firehosetestbucket",
}
@mock_firehose
def test_warnings():
"""Test features that raise a warning as they're unimplemented."""
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't capture warnings in server mode")
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
# DeliveryStreamEncryption is not supported.
stream_name = f"test_warning_{get_random_hex(6)}"
with warnings.catch_warnings(record=True) as warn_msg:
client.create_delivery_stream(
DeliveryStreamName=stream_name,
ExtendedS3DestinationConfiguration=s3_dest_config,
DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"},
)
assert "server-side encryption enabled is not yet implemented" in str(
warn_msg[-1].message
)
# Can't create a delivery stream for Splunk as it's unimplemented.
stream_name = f"test_splunk_destination_{get_random_hex(6)}"
with warnings.catch_warnings(record=True) as warn_msg:
client.create_delivery_stream(
DeliveryStreamName=stream_name,
SplunkDestinationConfiguration={
"HECEndpoint": "foo",
"HECEndpointType": "foo",
"HECToken": "foo",
"S3Configuration": {"RoleARN": "foo", "BucketARN": "foo"},
},
)
assert "Splunk destination delivery stream is not yet implemented" in str(
warn_msg[-1].message
)
# Can't update a delivery stream to Splunk as it's unimplemented.
stream_name = f"test_update_splunk_destination_{get_random_hex(6)}"
client.create_delivery_stream(
DeliveryStreamName=stream_name, S3DestinationConfiguration=s3_dest_config,
)
with warnings.catch_warnings(record=True) as warn_msg:
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
SplunkDestinationUpdate={
"HECEndpoint": "foo",
"HECEndpointType": "foo",
"HECToken": "foo",
},
)
assert "Splunk destination delivery stream is not yet implemented" in str(
warn_msg[-1].message
)
@mock_firehose
def test_create_delivery_stream_failures():
"""Test errors invoking create_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
failure_name = f"test_failure_{get_random_hex(6)}"
# Create too many streams.
for idx in range(DeliveryStream.MAX_STREAMS_PER_REGION):
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}_{idx}",
ExtendedS3DestinationConfiguration=s3_dest_config,
)
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}_99",
ExtendedS3DestinationConfiguration=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "LimitExceededException"
assert "You have already consumed your firehose quota" in err["Message"]
# Free up the memory from the limits test.
for idx in range(DeliveryStream.MAX_STREAMS_PER_REGION):
client.delete_delivery_stream(DeliveryStreamName=f"{failure_name}_{idx}")
# Create a stream with the same name as an existing stream.
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}",
ExtendedS3DestinationConfiguration=s3_dest_config,
)
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}",
ExtendedS3DestinationConfiguration=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceInUseException"
assert (
f"Firehose {failure_name} under accountId {ACCOUNT_ID} already exists"
in err["Message"]
)
# Only one destination configuration is allowed.
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}_2manyconfigs",
ExtendedS3DestinationConfiguration=s3_dest_config,
HttpEndpointDestinationConfiguration={
"EndpointConfiguration": {"Url": "google.com"},
"S3Configuration": {"RoleARN": "foo", "BucketARN": "foo"},
},
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"Exactly one destination configuration is supported for a Firehose"
in err["Message"]
)
# Provide a Kinesis source configuration, but use DirectPut stream type.
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}_bad_source_type",
DeliveryStreamType="DirectPut",
KinesisStreamSourceConfiguration={
"KinesisStreamARN": "kinesis_test_ds",
"RoleARN": "foo",
},
ExtendedS3DestinationConfiguration=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"KinesisSourceStreamConfig is only applicable for "
"KinesisStreamAsSource stream type" in err["Message"]
)
@mock_firehose
def test_delete_delivery_stream():
"""Test successful and failed invocations of delete_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = f"test_delete_{get_random_hex(6)}"
# Create a couple of streams to test with.
for idx in range(5):
client.create_delivery_stream(
DeliveryStreamName=f"{stream_name}-{idx}",
S3DestinationConfiguration=s3_dest_config,
)
# Attempt to delete a non-existent stream.
with pytest.raises(ClientError) as exc:
client.delete_delivery_stream(DeliveryStreamName="foo")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Firehose foo under account {ACCOUNT_ID} not found." in err["Message"]
# Delete one stream, verify the remaining exist.
client.delete_delivery_stream(DeliveryStreamName=f"{stream_name}-0")
hoses = client.list_delivery_streams()
assert len(hoses["DeliveryStreamNames"]) == 4
expected_list = [f"{stream_name}-{x}" for x in range(1, 5)]
assert hoses["DeliveryStreamNames"] == expected_list
# Delete all streams, verify there are no more streams.
for idx in range(1, 5):
client.delete_delivery_stream(DeliveryStreamName=f"{stream_name}-{idx}")
hoses = client.list_delivery_streams()
assert len(hoses["DeliveryStreamNames"]) == 0
assert hoses["DeliveryStreamNames"] == []
@mock_firehose
def test_describe_delivery_stream():
"""Test successful, failed invocations of describe_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = f"test_describe_{get_random_hex(6)}"
role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/testrole"
# Create delivery stream with S3 destination, kinesis type and source
# for testing purposes.
client.create_delivery_stream(
DeliveryStreamName=stream_name,
DeliveryStreamType="KinesisStreamAsSource",
KinesisStreamSourceConfiguration={
"KinesisStreamARN": f"arn:aws:kinesis:{TEST_REGION}:{ACCOUNT_ID}:stream/test-datastream",
"RoleARN": role_arn,
},
S3DestinationConfiguration=s3_dest_config,
)
labels_with_kinesis_source = {
"DeliveryStreamName",
"DeliveryStreamARN",
"DeliveryStreamStatus",
"DeliveryStreamType",
"VersionId",
"CreateTimestamp",
"Source",
"Destinations",
"LastUpdateTimestamp",
"HasMoreDestinations",
}
# Verify the created fields are as expected.
results = client.describe_delivery_stream(DeliveryStreamName=stream_name)
description = results["DeliveryStreamDescription"]
assert set(description.keys()) == labels_with_kinesis_source
assert description["DeliveryStreamName"] == stream_name
assert (
description["DeliveryStreamARN"]
== f"arn:aws:firehose:{TEST_REGION}:{ACCOUNT_ID}:/delivery_stream/{stream_name}"
)
assert description["DeliveryStreamStatus"] == "ACTIVE"
assert description["DeliveryStreamType"] == "KinesisStreamAsSource"
assert description["VersionId"] == "1"
assert (
description["Source"]["KinesisStreamSourceDescription"]["RoleARN"] == role_arn
)
assert len(description["Destinations"]) == 1
assert set(description["Destinations"][0].keys()) == {
"S3DestinationDescription",
"DestinationId",
}
# Update destination with ExtendedS3.
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
ExtendedS3DestinationUpdate=s3_dest_config,
)
# Verify the created fields are as expected. There should be one
# destination with two destination types. The last update timestamp
# should be present and the version number updated.
results = client.describe_delivery_stream(DeliveryStreamName=stream_name)
description = results["DeliveryStreamDescription"]
assert set(description.keys()) == labels_with_kinesis_source | {
"LastUpdateTimestamp"
}
assert description["DeliveryStreamName"] == stream_name
assert (
description["DeliveryStreamARN"]
== f"arn:aws:firehose:{TEST_REGION}:{ACCOUNT_ID}:/delivery_stream/{stream_name}"
)
assert description["DeliveryStreamStatus"] == "ACTIVE"
assert description["DeliveryStreamType"] == "KinesisStreamAsSource"
assert description["VersionId"] == "2"
assert (
description["Source"]["KinesisStreamSourceDescription"]["RoleARN"] == role_arn
)
assert len(description["Destinations"]) == 1
assert set(description["Destinations"][0].keys()) == {
"S3DestinationDescription",
"DestinationId",
"ExtendedS3DestinationDescription",
}
# Update ExtendedS3 destination with a few different values.
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="2",
DestinationId="destinationId-000000000001",
ExtendedS3DestinationUpdate={
"RoleARN": role_arn,
"BucketARN": "arn:aws:s3:::testbucket",
# IntervalInSeconds increased from 300 to 700.
"BufferingHints": {"IntervalInSeconds": 700, "SizeInMBs": 5},
},
)
results = client.describe_delivery_stream(DeliveryStreamName=stream_name)
description = results["DeliveryStreamDescription"]
assert description["VersionId"] == "3"
assert len(description["Destinations"]) == 1
destination = description["Destinations"][0]
assert (
destination["ExtendedS3DestinationDescription"]["BufferingHints"][
"IntervalInSeconds"
]
== 700
)
# Verify S3 was added when ExtendedS3 was added.
assert set(destination.keys()) == {
"S3DestinationDescription",
"DestinationId",
"ExtendedS3DestinationDescription",
}
assert set(destination["S3DestinationDescription"].keys()) == {
"RoleARN",
"BucketARN",
"BufferingHints",
}
# Update delivery stream from ExtendedS3 to S3.
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="3",
DestinationId="destinationId-000000000001",
S3DestinationUpdate={
"RoleARN": role_arn,
"BucketARN": "arn:aws:s3:::testbucket",
# IntervalInSeconds decreased from 700 to 500.
"BufferingHints": {"IntervalInSeconds": 500, "SizeInMBs": 5},
},
)
results = client.describe_delivery_stream(DeliveryStreamName=stream_name)
description = results["DeliveryStreamDescription"]
assert description["VersionId"] == "4"
assert len(description["Destinations"]) == 1
assert set(description["Destinations"][0].keys()) == {
"S3DestinationDescription",
"ExtendedS3DestinationDescription",
"DestinationId",
}
destination = description["Destinations"][0]
assert (
destination["ExtendedS3DestinationDescription"]["BufferingHints"][
"IntervalInSeconds"
]
== 500
)
assert (
destination["S3DestinationDescription"]["BufferingHints"]["IntervalInSeconds"]
== 500
)
@mock_firehose
def test_list_delivery_streams():
"""Test successful and failed invocations of list_delivery_streams()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = f"test_list_{get_random_hex(6)}"
# Create a couple of streams of both types to test with.
for idx in range(5):
client.create_delivery_stream(
DeliveryStreamName=f"{stream_name}-{idx}",
DeliveryStreamType="DirectPut",
S3DestinationConfiguration=s3_dest_config,
)
for idx in range(5):
client.create_delivery_stream(
DeliveryStreamName=f"{stream_name}-{idx+5}",
DeliveryStreamType="KinesisStreamAsSource",
S3DestinationConfiguration=s3_dest_config,
)
# Verify limit works.
hoses = client.list_delivery_streams(Limit=1)
assert len(hoses["DeliveryStreamNames"]) == 1
assert hoses["DeliveryStreamNames"] == [f"{stream_name}-0"]
assert hoses["HasMoreDeliveryStreams"] is True
hoses = client.list_delivery_streams(Limit=10)
assert len(hoses["DeliveryStreamNames"]) == 10
assert hoses["HasMoreDeliveryStreams"] is False
# Verify delivery_stream_type works as a filter.
hoses = client.list_delivery_streams(DeliveryStreamType="DirectPut")
assert len(hoses["DeliveryStreamNames"]) == 5
expected_directput_list = [f"{stream_name}-{x}" for x in range(5)]
assert hoses["DeliveryStreamNames"] == expected_directput_list
assert hoses["HasMoreDeliveryStreams"] is False
hoses = client.list_delivery_streams(DeliveryStreamType="KinesisStreamAsSource")
assert len(hoses["DeliveryStreamNames"]) == 5
expected_kinesis_stream_list = [f"{stream_name}-{x+5}" for x in range(5)]
assert hoses["DeliveryStreamNames"] == expected_kinesis_stream_list
assert hoses["HasMoreDeliveryStreams"] is False
# Verify exclusive_start_delivery_stream_name returns truncated list.
hoses = client.list_delivery_streams(
ExclusiveStartDeliveryStreamName=f"{stream_name}-5"
)
assert len(hoses["DeliveryStreamNames"]) == 4
expected_stream_list = [f"{stream_name}-{x+5}" for x in range(1, 5)]
assert hoses["DeliveryStreamNames"] == expected_stream_list
assert hoses["HasMoreDeliveryStreams"] is False
hoses = client.list_delivery_streams(
ExclusiveStartDeliveryStreamName=f"{stream_name}-9"
)
assert len(hoses["DeliveryStreamNames"]) == 0
assert hoses["HasMoreDeliveryStreams"] is False
# boto3 ignores bad stream names for ExclusiveStartDeliveryStreamName.
hoses = client.list_delivery_streams(ExclusiveStartDeliveryStreamName="foo")
assert len(hoses["DeliveryStreamNames"]) == 10
assert (
hoses["DeliveryStreamNames"]
== expected_directput_list + expected_kinesis_stream_list
)
assert hoses["HasMoreDeliveryStreams"] is False
# Verify no parameters returns entire list.
hoses = client.list_delivery_streams()
assert len(hoses["DeliveryStreamNames"]) == 10
assert (
hoses["DeliveryStreamNames"]
== expected_directput_list + expected_kinesis_stream_list
)
assert hoses["HasMoreDeliveryStreams"] is False
@mock_firehose
def test_update_destination():
"""Test successful, failed invocations of update_destination()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
# Can't update a non-existent stream.
with pytest.raises(ClientError) as exc:
client.update_destination(
DeliveryStreamName="foo",
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
ExtendedS3DestinationUpdate=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Firehose foo under accountId {ACCOUNT_ID} not found" in err["Message"]
# Create a delivery stream for testing purposes.
stream_name = f"test_update_{get_random_hex(6)}"
client.create_delivery_stream(
DeliveryStreamName=stream_name,
DeliveryStreamType="DirectPut",
S3DestinationConfiguration=s3_dest_config,
)
# Only one destination configuration is allowed.
with pytest.raises(ClientError) as exc:
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
S3DestinationUpdate=s3_dest_config,
ExtendedS3DestinationUpdate=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"Exactly one destination configuration is supported for a Firehose"
in err["Message"]
)
# The destination type can't be changed to or from HttpEndpoint or Elasticsearch.
with pytest.raises(ClientError) as exc:
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
HttpEndpointDestinationUpdate={
"EndpointConfiguration": {"Url": "https://google.com"},
"RetryOptions": {"DurationInSeconds": 100},
},
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"Changing the destination type to or from HttpEndpoint is not "
"supported at this time"
) in err["Message"]

View File

@ -1,22 +1,22 @@
from __future__ import unicode_literals
import datetime
from botocore.exceptions import ClientError
"""Unit tests verifying various delivery stream destination content."""
import boto3
import sure # noqa
from moto import mock_kinesis
from moto import mock_firehose
from moto import settings
from moto.core import ACCOUNT_ID
from moto.core.utils import get_random_hex
TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
def create_s3_delivery_stream(client, stream_name):
def create_extended_s3_delivery_stream(client, stream_name):
"""Return ARN of a delivery stream created with an S3 destination."""
return client.create_delivery_stream(
DeliveryStreamName=stream_name,
DeliveryStreamType="DirectPut",
ExtendedS3DestinationConfiguration={
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(ACCOUNT_ID),
"BucketARN": "arn:aws:s3:::kinesis-test",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"CompressionFormat": "UNCOMPRESSED",
"DataFormatConversionConfiguration": {
@ -38,6 +38,7 @@ def create_s3_delivery_stream(client, stream_name):
def create_redshift_delivery_stream(client, stream_name):
"""Return ARN of a delivery stream created with a Redshift destination."""
return client.create_delivery_stream(
DeliveryStreamName=stream_name,
RedshiftDestinationConfiguration={
@ -53,7 +54,7 @@ def create_redshift_delivery_stream(client, stream_name):
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::kinesis-test",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CompressionFormat": "UNCOMPRESSED",
@ -63,12 +64,13 @@ def create_redshift_delivery_stream(client, stream_name):
def create_elasticsearch_delivery_stream(client, stream_name):
"""Return delivery stream ARN of an ElasticSearch destination."""
return client.create_delivery_stream(
DeliveryStreamName=stream_name,
DeliveryStreamType="DirectPut",
ElasticsearchDestinationConfiguration={
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(ACCOUNT_ID),
"DomainARN": "arn:aws:es:::domain/kinesis-test",
"DomainARN": "arn:aws:es:::domain/firehose-test",
"IndexName": "myIndex",
"TypeName": "UNCOMPRESSED",
"IndexRotationPeriod": "NoRotation",
@ -78,7 +80,7 @@ def create_elasticsearch_delivery_stream(client, stream_name):
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::kinesis-test",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CompressionFormat": "UNCOMPRESSED",
@ -87,14 +89,39 @@ def create_elasticsearch_delivery_stream(client, stream_name):
)
@mock_kinesis
def test_create_redshift_delivery_stream():
client = boto3.client("firehose", region_name="us-east-1")
def create_http_delivery_stream(client, stream_name):
"""Return delivery stream ARN of an Http destination."""
return client.create_delivery_stream(
DeliveryStreamName=stream_name,
DeliveryStreamType="DirectPut",
HttpEndpointDestinationConfiguration={
"EndpointConfiguration": {"Url": "google.com"},
"RetryOptions": {"DurationInSeconds": 100},
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CloudWatchLoggingOptions": {"Enabled": False},
"S3Configuration": {
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CompressionFormat": "UNCOMPRESSED",
},
},
)
response = create_redshift_delivery_stream(client, "stream1")
@mock_firehose
def test_create_redshift_delivery_stream():
"""Verify fields of a Redshift delivery stream."""
client = boto3.client("firehose", region_name=TEST_REGION)
stream_name = f"stream_{get_random_hex(6)}"
response = create_redshift_delivery_stream(client, stream_name)
stream_arn = response["DeliveryStreamARN"]
response = client.describe_delivery_stream(DeliveryStreamName="stream1")
response = client.describe_delivery_stream(DeliveryStreamName=stream_name)
stream_description = response["DeliveryStreamDescription"]
# Sure and Freezegun don't play nicely together
@ -103,13 +130,14 @@ def test_create_redshift_delivery_stream():
stream_description.should.equal(
{
"DeliveryStreamName": "stream1",
"DeliveryStreamName": stream_name,
"DeliveryStreamARN": stream_arn,
"DeliveryStreamStatus": "ACTIVE",
"VersionId": "string",
"DeliveryStreamType": "DirectPut",
"VersionId": "1",
"Destinations": [
{
"DestinationId": "string",
"DestinationId": "destinationId-000000000001",
"RedshiftDestinationDescription": {
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
@ -124,7 +152,7 @@ def test_create_redshift_delivery_stream():
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::kinesis-test",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {
"SizeInMBs": 123,
@ -140,14 +168,16 @@ def test_create_redshift_delivery_stream():
)
@mock_kinesis
def test_create_s3_delivery_stream():
client = boto3.client("firehose", region_name="us-east-1")
@mock_firehose
def test_create_extended_s3_delivery_stream():
"""Verify fields of a S3 delivery stream."""
client = boto3.client("firehose", region_name=TEST_REGION)
response = create_s3_delivery_stream(client, "stream1")
stream_name = f"stream_{get_random_hex(6)}"
response = create_extended_s3_delivery_stream(client, stream_name)
stream_arn = response["DeliveryStreamARN"]
response = client.describe_delivery_stream(DeliveryStreamName="stream1")
response = client.describe_delivery_stream(DeliveryStreamName=stream_name)
stream_description = response["DeliveryStreamDescription"]
# Sure and Freezegun don't play nicely together
@ -156,18 +186,19 @@ def test_create_s3_delivery_stream():
stream_description.should.equal(
{
"DeliveryStreamName": "stream1",
"DeliveryStreamName": stream_name,
"DeliveryStreamARN": stream_arn,
"DeliveryStreamStatus": "ACTIVE",
"VersionId": "string",
"DeliveryStreamType": "DirectPut",
"VersionId": "1",
"Destinations": [
{
"DestinationId": "string",
"DestinationId": "destinationId-000000000001",
"ExtendedS3DestinationDescription": {
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::kinesis-test",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"CompressionFormat": "UNCOMPRESSED",
"DataFormatConversionConfiguration": {
@ -181,7 +212,7 @@ def test_create_s3_delivery_stream():
}
},
"SchemaConfiguration": {
"DatabaseName": "stream1",
"DatabaseName": stream_name,
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
@ -189,6 +220,14 @@ def test_create_s3_delivery_stream():
},
},
},
"S3DestinationDescription": {
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"CompressionFormat": "UNCOMPRESSED",
},
}
],
"HasMoreDestinations": False,
@ -196,14 +235,16 @@ def test_create_s3_delivery_stream():
)
@mock_kinesis
@mock_firehose
def test_create_elasticsearch_delivery_stream():
client = boto3.client("firehose", region_name="us-east-1")
"""Verify fields of an Elasticsearch delivery stream."""
client = boto3.client("firehose", region_name=TEST_REGION)
response = create_elasticsearch_delivery_stream(client, "stream1")
stream_name = f"stream_{get_random_hex(6)}"
response = create_elasticsearch_delivery_stream(client, stream_name)
stream_arn = response["DeliveryStreamARN"]
response = client.describe_delivery_stream(DeliveryStreamName="stream1")
response = client.describe_delivery_stream(DeliveryStreamName=stream_name)
stream_description = response["DeliveryStreamDescription"]
# Sure and Freezegun don't play nicely together
@ -212,18 +253,19 @@ def test_create_elasticsearch_delivery_stream():
stream_description.should.equal(
{
"DeliveryStreamName": "stream1",
"DeliveryStreamName": stream_name,
"DeliveryStreamARN": stream_arn,
"DeliveryStreamStatus": "ACTIVE",
"VersionId": "string",
"DeliveryStreamType": "DirectPut",
"VersionId": "1",
"Destinations": [
{
"DestinationId": "string",
"DestinationId": "destinationId-000000000001",
"ElasticsearchDestinationDescription": {
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"DomainARN": "arn:aws:es:::domain/kinesis-test",
"DomainARN": "arn:aws:es:::domain/firehose-test",
"IndexName": "myIndex",
"TypeName": "UNCOMPRESSED",
"IndexRotationPeriod": "NoRotation",
@ -233,7 +275,7 @@ def test_create_elasticsearch_delivery_stream():
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::kinesis-test",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {
"SizeInMBs": 123,
@ -249,15 +291,17 @@ def test_create_elasticsearch_delivery_stream():
)
@mock_kinesis
def test_create_stream_without_redshift():
client = boto3.client("firehose", region_name="us-east-1")
@mock_firehose
def test_create_s3_delivery_stream():
"""Verify fields of an S3 delivery stream."""
client = boto3.client("firehose", region_name=TEST_REGION)
stream_name = f"stream_{get_random_hex(6)}"
response = client.create_delivery_stream(
DeliveryStreamName="stream1",
DeliveryStreamName=stream_name,
S3DestinationConfiguration={
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(ACCOUNT_ID),
"BucketARN": "arn:aws:s3:::kinesis-test",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CompressionFormat": "UNCOMPRESSED",
@ -265,7 +309,7 @@ def test_create_stream_without_redshift():
)
stream_arn = response["DeliveryStreamARN"]
response = client.describe_delivery_stream(DeliveryStreamName="stream1")
response = client.describe_delivery_stream(DeliveryStreamName=stream_name)
stream_description = response["DeliveryStreamDescription"]
# Sure and Freezegun don't play nicely together
@ -274,21 +318,19 @@ def test_create_stream_without_redshift():
stream_description.should.equal(
{
"DeliveryStreamName": "stream1",
"DeliveryStreamName": stream_name,
"DeliveryStreamARN": stream_arn,
"DeliveryStreamStatus": "ACTIVE",
"VersionId": "string",
"DeliveryStreamType": "DirectPut",
"VersionId": "1",
"Destinations": [
{
"DestinationId": "string",
"DestinationId": "destinationId-000000000001",
"S3DestinationDescription": {
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::kinesis-test",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CompressionFormat": "UNCOMPRESSED",
@ -300,47 +342,52 @@ def test_create_stream_without_redshift():
)
@mock_kinesis
def test_deescribe_non_existent_stream():
client = boto3.client("firehose", region_name="us-east-1")
@mock_firehose
def test_create_http_stream():
"""Verify fields of a HTTP delivery stream."""
client = boto3.client("firehose", region_name=TEST_REGION)
client.describe_delivery_stream.when.called_with(
DeliveryStreamName="not-a-stream"
).should.throw(ClientError)
stream_name = f"stream_{get_random_hex(6)}"
response = create_http_delivery_stream(client, stream_name)
stream_arn = response["DeliveryStreamARN"]
response = client.describe_delivery_stream(DeliveryStreamName=stream_name)
stream_description = response["DeliveryStreamDescription"]
@mock_kinesis
def test_list_and_delete_stream():
client = boto3.client("firehose", region_name="us-east-1")
# Sure and Freezegun don't play nicely together
_ = stream_description.pop("CreateTimestamp")
_ = stream_description.pop("LastUpdateTimestamp")
create_redshift_delivery_stream(client, "stream1")
create_redshift_delivery_stream(client, "stream2")
set(client.list_delivery_streams()["DeliveryStreamNames"]).should.equal(
set(["stream1", "stream2"])
)
client.delete_delivery_stream(DeliveryStreamName="stream1")
set(client.list_delivery_streams()["DeliveryStreamNames"]).should.equal(
set(["stream2"])
)
@mock_kinesis
def test_put_record():
client = boto3.client("firehose", region_name="us-east-1")
create_redshift_delivery_stream(client, "stream1")
client.put_record(DeliveryStreamName="stream1", Record={"Data": "some data"})
@mock_kinesis
def test_put_record_batch():
client = boto3.client("firehose", region_name="us-east-1")
create_redshift_delivery_stream(client, "stream1")
client.put_record_batch(
DeliveryStreamName="stream1",
Records=[{"Data": "some data1"}, {"Data": "some data2"}],
stream_description.should.equal(
{
"DeliveryStreamName": stream_name,
"DeliveryStreamARN": stream_arn,
"DeliveryStreamStatus": "ACTIVE",
"DeliveryStreamType": "DirectPut",
"VersionId": "1",
"Destinations": [
{
"DestinationId": "destinationId-000000000001",
"HttpEndpointDestinationDescription": {
"EndpointConfiguration": {"Url": "google.com"},
"RetryOptions": {"DurationInSeconds": 100},
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CloudWatchLoggingOptions": {"Enabled": False},
"S3DestinationDescription": {
"RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(
ACCOUNT_ID
),
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {
"SizeInMBs": 123,
"IntervalInSeconds": 124,
},
"CompressionFormat": "UNCOMPRESSED",
},
},
}
],
"HasMoreDestinations": False,
}
)

View File

@ -0,0 +1,148 @@
"""Unit tests verifying put-related delivery stream APIs."""
import boto3
import sure # noqa pylint: disable=unused-import
from moto import mock_firehose
from moto import mock_s3
from moto.core import ACCOUNT_ID
from moto.core.utils import get_random_hex
from tests.test_firehose.test_firehose import TEST_REGION
from tests.test_firehose.test_firehose import sample_s3_dest_config
from tests.test_firehose.test_firehose_destination_types import (
create_redshift_delivery_stream,
)
S3_LOCATION_CONSTRAINT = "us-west-1"
@mock_firehose
def test_put_record_redshift_destination():
"""Test invocations of put_record() to a Redshift destination.
At the moment, for Redshift or Elasticsearch destinations, the data
is just thrown away.
"""
client = boto3.client("firehose", region_name=TEST_REGION)
stream_name = f"test_put_record_{get_random_hex(6)}"
create_redshift_delivery_stream(client, stream_name)
result = client.put_record(
DeliveryStreamName=stream_name, Record={"Data": "some test data"}
)
assert set(result.keys()) == {"RecordId", "Encrypted", "ResponseMetadata"}
@mock_firehose
def test_put_record_batch_redshift_destination():
"""Test invocations of put_record_batch() to a Redshift destination.
At the moment, for Redshift or Elasticsearch destinations, the data
is just thrown away.
"""
client = boto3.client("firehose", region_name=TEST_REGION)
stream_name = f"test_put_record_{get_random_hex(6)}"
create_redshift_delivery_stream(client, stream_name)
records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}]
result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
assert set(result.keys()) == {
"FailedPutCount",
"Encrypted",
"RequestResponses",
"ResponseMetadata",
}
assert result["FailedPutCount"] == 0
assert result["Encrypted"] is False
for response in result["RequestResponses"]:
assert set(response.keys()) == {"RecordId"}
@mock_firehose
def test_put_record_http_destination():
"""Test invocations of put_record() to a Http destination."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = f"test_put_record_{get_random_hex(6)}"
client.create_delivery_stream(
DeliveryStreamName=stream_name,
HttpEndpointDestinationConfiguration={
"EndpointConfiguration": {"Url": "https://google.com"},
"S3Configuration": s3_dest_config,
},
)
result = client.put_record(
DeliveryStreamName=stream_name, Record={"Data": "some test data"}
)
assert set(result.keys()) == {"RecordId", "Encrypted", "ResponseMetadata"}
@mock_firehose
def test_put_record_batch_http_destination():
"""Test invocations of put_record_batch() to a Http destination."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = f"test_put_record_{get_random_hex(6)}"
client.create_delivery_stream(
DeliveryStreamName=stream_name,
HttpEndpointDestinationConfiguration={
"EndpointConfiguration": {"Url": "https://google.com"},
"S3Configuration": s3_dest_config,
},
)
records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}]
result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
assert set(result.keys()) == {
"FailedPutCount",
"Encrypted",
"RequestResponses",
"ResponseMetadata",
}
assert result["FailedPutCount"] == 0
assert result["Encrypted"] is False
for response in result["RequestResponses"]:
assert set(response.keys()) == {"RecordId"}
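The mock reports no per-record failures, but the real API flags them entry by entry in RequestResponses. The sketch below shows how a caller might collect the records that failed from that response shape; the stream name, endpoint URL, and ARNs are illustrative.

import boto3
from moto import mock_firehose

@mock_firehose
def put_batch_and_collect_failures_sketch():
    """Send a batch and return any records the service reported as failed."""
    client = boto3.client("firehose", region_name="us-east-1")
    client.create_delivery_stream(
        DeliveryStreamName="sketch-batch",
        HttpEndpointDestinationConfiguration={
            "EndpointConfiguration": {"Url": "https://example.com"},
            "S3Configuration": {
                "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role",
                "BucketARN": "arn:aws:s3:::firehose-test",
            },
        },
    )
    records = [{"Data": f"payload-{idx}"} for idx in range(3)]
    result = client.put_record_batch(
        DeliveryStreamName="sketch-batch", Records=records
    )
    # Real Firehose marks a failed entry with ErrorCode/ErrorMessage; the mock
    # currently reports none, so this list stays empty here.
    failed = [
        record
        for record, response in zip(records, result["RequestResponses"])
        if "ErrorCode" in response
    ]
    return failed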
@mock_s3
@mock_firehose
def test_put_record_batch_extended_s3_destination():
"""Test invocations of put_record_batch() to a S3 destination."""
client = boto3.client("firehose", region_name=TEST_REGION)
# Create an S3 bucket.
bucket_name = "firehosetestbucket"
s3_client = boto3.client("s3", region_name=TEST_REGION)
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": S3_LOCATION_CONSTRAINT},
)
stream_name = f"test_put_record_{get_random_hex(6)}"
client.create_delivery_stream(
DeliveryStreamName=stream_name,
ExtendedS3DestinationConfiguration={
"RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose-test-role",
"BucketARN": f"arn:aws:s3::{bucket_name}",
},
)
records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}]
result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
assert set(result.keys()) == {
"FailedPutCount",
"Encrypted",
"RequestResponses",
"ResponseMetadata",
}
assert result["FailedPutCount"] == 0
assert result["Encrypted"] is False
for response in result["RequestResponses"]:
assert set(response.keys()) == {"RecordId"}
# Pull data from S3 bucket.
bucket_objects = s3_client.list_objects_v2(Bucket=bucket_name)
response = s3_client.get_object(
Bucket=bucket_name, Key=bucket_objects["Contents"][0]["Key"]
)
assert response["Body"].read() == b"onetwothree"

View File

@ -0,0 +1,158 @@
"""Unit tests verifying tag-related delivery stream APIs."""
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_firehose
from moto.core import ACCOUNT_ID
from moto.core.utils import get_random_hex
from moto.firehose.models import MAX_TAGS_PER_DELIVERY_STREAM
from tests.test_firehose.test_firehose import TEST_REGION
from tests.test_firehose.test_firehose import sample_s3_dest_config
@mock_firehose
def test_list_tags_for_delivery_stream():
"""Test invocations of list_tags_for_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
stream_name = f"test_list_tags_{get_random_hex(6)}"
number_of_tags = 50
tags = [{"Key": f"{x}_k", "Value": f"{x}_v"} for x in range(1, number_of_tags + 1)]
# Create a delivery stream to work with.
client.create_delivery_stream(
DeliveryStreamName=stream_name,
S3DestinationConfiguration=sample_s3_dest_config(),
Tags=tags,
)
# Verify limit works.
result = client.list_tags_for_delivery_stream(
DeliveryStreamName=stream_name, Limit=1
)
assert len(result["Tags"]) == 1
assert result["Tags"] == [{"Key": "1_k", "Value": "1_v"}]
assert result["HasMoreTags"] is True
result = client.list_tags_for_delivery_stream(
DeliveryStreamName=stream_name, Limit=number_of_tags
)
assert len(result["Tags"]) == number_of_tags
assert result["HasMoreTags"] is False
# Verify exclusive_start_tag_key returns truncated list.
result = client.list_tags_for_delivery_stream(
DeliveryStreamName=stream_name, ExclusiveStartTagKey="30_k"
)
assert len(result["Tags"]) == number_of_tags - 30
expected_tags = [
{"Key": f"{x}_k", "Value": f"{x}_v"} for x in range(31, number_of_tags + 1)
]
assert result["Tags"] == expected_tags
assert result["HasMoreTags"] is False
result = client.list_tags_for_delivery_stream(
DeliveryStreamName=stream_name, ExclusiveStartTagKey=f"{number_of_tags}_k"
)
assert len(result["Tags"]) == 0
assert result["HasMoreTags"] is False
# An unknown ExclusiveStartTagKey is ignored and the full tag list is returned.
result = client.list_tags_for_delivery_stream(
DeliveryStreamName=stream_name, ExclusiveStartTagKey="foo"
)
assert len(result["Tags"]) == number_of_tags
assert result["Tags"] == tags
assert result["HasMoreTags"] is False
# Verify no parameters returns entire list.
result = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)
assert len(result["Tags"]) == number_of_tags
assert result["Tags"] == tags
assert result["HasMoreTags"] is False
@mock_firehose
def test_tag_delivery_stream():
"""Test successful, failed invocations of tag_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
# Create a delivery stream for testing purposes.
stream_name = f"test_tags_{get_random_hex(6)}"
client.create_delivery_stream(
DeliveryStreamName=stream_name,
ExtendedS3DestinationConfiguration=sample_s3_dest_config(),
)
# Unknown stream name.
unknown_name = "foo"
with pytest.raises(ClientError) as exc:
client.tag_delivery_stream(
DeliveryStreamName=unknown_name, Tags=[{"Key": "foo", "Value": "bar"}]
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert (
f"Firehose {unknown_name} under account {ACCOUNT_ID} not found"
in err["Message"]
)
# Too many tags.
with pytest.raises(ClientError) as exc:
client.tag_delivery_stream(
DeliveryStreamName=stream_name,
Tags=[{"Key": f"{x}", "Value": f"{x}"} for x in range(51)],
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
f"failed to satisify contstraint: Member must have length "
f"less than or equal to {MAX_TAGS_PER_DELIVERY_STREAM}"
) in err["Message"]
# Bad tags.
with pytest.raises(ClientError) as exc:
client.tag_delivery_stream(
DeliveryStreamName=stream_name, Tags=[{"Key": "foo!", "Value": "bar"}],
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"1 validation error detected: Value 'foo!' at 'tags.1.member.key' "
"failed to satisfy constraint: Member must satisfy regular "
"expression pattern"
) in err["Message"]
# Successful addition of tags.
added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)]
client.tag_delivery_stream(DeliveryStreamName=stream_name, Tags=added_tags)
results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)
assert len(results["Tags"]) == 10
assert results["Tags"] == added_tags
@mock_firehose
def test_untag_delivery_stream():
"""Test successful, failed invocations of untag_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
# Create a delivery stream for testing purposes.
stream_name = f"test_untag_{get_random_hex(6)}"
tag_list = [
{"Key": "one", "Value": "1"},
{"Key": "two", "Value": "2"},
{"Key": "three", "Value": "3"},
]
client.create_delivery_stream(
DeliveryStreamName=stream_name,
ExtendedS3DestinationConfiguration=sample_s3_dest_config(),
Tags=tag_list,
)
# Remove all of the tags and verify none remain.
tag_keys = [x["Key"] for x in tag_list]
client.untag_delivery_stream(DeliveryStreamName=stream_name, TagKeys=tag_keys)
results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)
assert not results["Tags"]
assert not results["HasMoreTags"]