diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 9af30b87e..4050ccfb0 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -4525,18 +4525,22 @@
0% implemented -- [ ] create_delivery_stream -- [ ] delete_delivery_stream -- [ ] describe_delivery_stream -- [ ] list_delivery_streams -- [ ] list_tags_for_delivery_stream -- [ ] put_record -- [ ] put_record_batch +- [ ] can_paginate +- [X] create_delivery_stream +- [X] delete_delivery_stream +- [X] describe_delivery_stream +- [ ] generate_presigned_url +- [ ] get_paginator +- [ ] get_waiter +- [X] list_delivery_streams +- [X] list_tags_for_delivery_stream +- [X] put_record +- [X] put_record_batch - [ ] start_delivery_stream_encryption - [ ] stop_delivery_stream_encryption -- [ ] tag_delivery_stream -- [ ] untag_delivery_stream -- [ ] update_destination +- [X] tag_delivery_stream +- [X] untag_delivery_stream +- [X] update_destination
## fis diff --git a/docs/index.rst b/docs/index.rst index a5fc2ad8b..c769cb26e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -62,6 +62,8 @@ Currently implemented Services: +---------------------------+-----------------------+------------------------------------+ | EMR | @mock_emr | core endpoints done | +---------------------------+-----------------------+------------------------------------+ +| Firehose | @mock_firehose | basic endpoints done | ++---------------------------+-----------------------+------------------------------------+ | Forecast | @mock_forecast | basic endpoints done | +---------------------------+-----------------------+------------------------------------+ | Glacier | @mock_glacier | core endpoints done | diff --git a/moto/__init__.py b/moto/__init__.py index f5e982c80..9967c87d1 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -75,6 +75,7 @@ mock_elbv2 = lazy_load(".elbv2", "mock_elbv2") mock_emr = lazy_load(".emr", "mock_emr") mock_emr_deprecated = lazy_load(".emr", "mock_emr_deprecated") mock_events = lazy_load(".events", "mock_events") +mock_firehose = lazy_load(".firehose", "mock_firehose") mock_forecast = lazy_load(".forecast", "mock_forecast") mock_glacier = lazy_load(".glacier", "mock_glacier") mock_glacier_deprecated = lazy_load(".glacier", "mock_glacier_deprecated") diff --git a/moto/backend_index.py b/moto/backend_index.py index fa37cf31f..d300f2484 100644 --- a/moto/backend_index.py +++ b/moto/backend_index.py @@ -47,6 +47,7 @@ backend_url_patterns = [ ("emr", re.compile("https?://(.+).elasticmapreduce.amazonaws.com")), ("emr", re.compile("https?://elasticmapreduce.(.+).amazonaws.com")), ("events", re.compile("https?://events\\.(.+)\\.amazonaws\\.com")), + ("firehose", re.compile("https?://firehose\\.(.+)\\.amazonaws\\.com")), ("forecast", re.compile("https?://forecast\\.(.+)\\.amazonaws\\.com")), ("glacier", re.compile("https?://glacier.(.+).amazonaws.com")), ("glue", re.compile("https?://glue\\.(.+)\\.amazonaws\\.com")), @@ -54,7 +55,6 @@ backend_url_patterns = [ ("iot", re.compile("https?://iot\\.(.+)\\.amazonaws\\.com")), ("iot-data", re.compile("https?://data.iot.(.+).amazonaws.com")), ("kinesis", re.compile("https?://kinesis\\.(.+)\\.amazonaws\\.com")), - ("kinesis", re.compile("https?://firehose\\.(.+)\\.amazonaws\\.com")), ("kinesisvideo", re.compile("https?://kinesisvideo.(.+).amazonaws.com")), ( "kinesis-video-archived-media", diff --git a/moto/firehose/__init__.py b/moto/firehose/__init__.py new file mode 100644 index 000000000..06964e274 --- /dev/null +++ b/moto/firehose/__init__.py @@ -0,0 +1,5 @@ +"""Firehose module initialization; sets value for base decorator.""" +from .models import firehose_backends +from ..core.models import base_decorator + +mock_firehose = base_decorator(firehose_backends) diff --git a/moto/firehose/exceptions.py b/moto/firehose/exceptions.py new file mode 100644 index 000000000..a556965e2 --- /dev/null +++ b/moto/firehose/exceptions.py @@ -0,0 +1,56 @@ +"""Exceptions raised by the Firehose service.""" +from moto.core.exceptions import JsonRESTError + + +class ConcurrentModificationException(JsonRESTError): + """Existing config has a version ID that does not match given ID.""" + + code = 400 + + def __init__(self, message): + super().__init__("ConcurrentModificationException", message) + + +class InvalidArgumentException(JsonRESTError): + """The specified input parameter has a value that is not valid.""" + + code = 400 + + def __init__(self, message): + super().__init__("InvalidArgumentException", message) + + +class LimitExceededException(JsonRESTError): + """You have already reached the limit for a requested resource.""" + + code = 400 + + def __init__(self, message): + super().__init__("LimitExceededException", message) + + +class ResourceInUseException(JsonRESTError): + """The resource is already in use and not available for this operation.""" + + code = 400 + + def __init__(self, message): + super().__init__("ResourceInUseException", message) + + +class ResourceNotFoundException(JsonRESTError): + """The specified resource could not be found.""" + + code = 400 + + def __init__(self, message): + super().__init__("ResourceNotFoundException", message) + + +class ValidationException(JsonRESTError): + """The tag key or tag value is not valid.""" + + code = 400 + + def __init__(self, message): + super().__init__("ValidationException", message) diff --git a/moto/firehose/models.py b/moto/firehose/models.py new file mode 100644 index 000000000..9c3d24d40 --- /dev/null +++ b/moto/firehose/models.py @@ -0,0 +1,635 @@ +"""FirehoseBackend class with methods for supported APIs. + +Incomplete list of unfinished items: + - The create_delivery_stream() argument + DeliveryStreamEncryptionConfigurationInput is not supported. + - Better validation of delivery destination parameters, e.g., + validation of the url for an http endpoint (boto3 does this), + - Better handling of the put_record_batch() API. Not only is + the existing logic bare bones, but for the ElasticSearch and + RedShift destinations, the data is just ignored. + - put_record_batch() handling of errors is minimal and no errors + are reported back to the user. Instead an exception is raised. + - put_record(), put_record_batch() always set "Encrypted" to False. +""" +from base64 import b64decode +from datetime import datetime +from time import time +from uuid import uuid4 +import warnings + +import requests + +from boto3 import Session + +from moto.core import BaseBackend, BaseModel +from moto.core import ACCOUNT_ID +from moto.firehose.exceptions import ( + ConcurrentModificationException, + InvalidArgumentException, + LimitExceededException, + ResourceInUseException, + ResourceNotFoundException, + ValidationException, +) +from moto.core.utils import get_random_hex +from moto.s3 import s3_backend +from moto.utilities.tagging_service import TaggingService + +MAX_TAGS_PER_DELIVERY_STREAM = 50 + +DESTINATION_TYPES_TO_NAMES = { + "s3": "S3", + "extended_s3": "ExtendedS3", + "http_endpoint": "HttpEndpoint", + "elasticsearch": "Elasticsearch", + "redshift": "Redshift", + "splunk": "Splunk", # Unimplemented +} + + +def find_destination_config_in_args(api_args): + """Return (config_arg, config_name) tuple for destination config. + + Determines which destination config(s) have been specified. The + alternative is to use a bunch of 'if' statements to check each + destination configuration. If more than one destination config is + specified, than an exception is raised. + + A logical name for the destination type is returned along with the + destination config as it's useful way to compare current and replacement + destinations. + """ + destination_names = DESTINATION_TYPES_TO_NAMES.keys() + configs = [] + for arg_name, arg_value in api_args.items(): + # Ignore arguments that are not destination configs. + if "_destination" not in arg_name: + continue + + # If the destination config value is non-null, save it. + name = arg_name.split("_destination")[0] + if name in destination_names and arg_value: + configs.append((DESTINATION_TYPES_TO_NAMES[name], arg_value)) + + # Only a single destination configuration is allowed. + if len(configs) > 1: + raise InvalidArgumentException( + "Exactly one destination configuration is supported for a Firehose" + ) + return configs[0] + + +def create_s3_destination_config(extended_s3_destination_config): + """Return dict with selected fields copied from ExtendedS3 config. + + When an ExtendedS3 config is chosen, AWS tacks on a S3 config as + well. When the same field names for S3 and ExtendedS3 exists, + the ExtendedS3 fields are copied to the added S3 destination. + """ + fields_not_needed = [ + "S3BackupMode", + "S3Description", + "DataFormatconversionConfiguration", + "DynamicPartitionConfiguration", + ] + destination = {} + for field, value in extended_s3_destination_config.items(): + if field in fields_not_needed: + continue + destination[field] = value + return destination + + +class DeliveryStream( + BaseModel +): # pylint: disable=too-few-public-methods,too-many-instance-attributes + """Represents a delivery stream, its source and destination configs.""" + + STATES = {"CREATING", "ACTIVE", "CREATING_FAILED"} + + MAX_STREAMS_PER_REGION = 50 + + def __init__( + self, + region, + delivery_stream_name, + delivery_stream_type, + kinesis_stream_source_configuration, + destination_name, + destination_config, + ): # pylint: disable=too-many-arguments + self.delivery_stream_status = "CREATING" + self.delivery_stream_name = delivery_stream_name + self.delivery_stream_type = ( + delivery_stream_type if delivery_stream_type else "DirectPut" + ) + + self.source = kinesis_stream_source_configuration + self.destinations = [ + { + "destination_id": "destinationId-000000000001", + destination_name: destination_config, + } + ] + if destination_name == "ExtendedS3": + # Add a S3 destination as well, minus a few ExtendedS3 fields. + self.destinations[0]["S3"] = create_s3_destination_config( + destination_config + ) + elif "S3Configuration" in destination_config: + # S3Configuration becomes S3DestinationDescription for the + # other destinations. + self.destinations[0][destination_name][ + "S3DestinationDescription" + ] = destination_config["S3Configuration"] + del self.destinations[0][destination_name]["S3Configuration"] + + self.delivery_stream_status = "ACTIVE" + self.delivery_stream_arn = f"arn:aws:firehose:{region}:{ACCOUNT_ID}:/delivery_stream/{delivery_stream_name}" + + self.create_timestamp = str(datetime.utcnow()) + self.version_id = "1" # Used to track updates of destination configs + + # I believe boto3 only adds this field after an update ... + self.last_update_timestamp = str(datetime.utcnow()) + + +class FirehoseBackend(BaseBackend): + """Implementation of Firehose APIs.""" + + def __init__(self, region_name=None): + self.region_name = region_name + self.delivery_streams = {} + self.tagger = TaggingService() + + def lookup_name_from_arn(self, arn): + """Given an ARN, return the associated delivery stream name.""" + # TODO - need to test + return self.delivery_streams.get(arn.split("/")[-1]) + + def reset(self): + """Re-initializes all attributes for this instance.""" + region_name = self.region_name + self.__dict__ = {} + self.__init__(region_name) + + def create_delivery_stream( + self, + region, + delivery_stream_name, + delivery_stream_type, + kinesis_stream_source_configuration, + delivery_stream_encryption_configuration_input, + s3_destination_configuration, + extended_s3_destination_configuration, + redshift_destination_configuration, + elasticsearch_destination_configuration, + splunk_destination_configuration, + http_endpoint_destination_configuration, + tags, + ): # pylint: disable=too-many-arguments,too-many-locals,unused-argument + """Create a Kinesis Data Firehose delivery stream.""" + (destination_name, destination_config) = find_destination_config_in_args( + locals() + ) + + if delivery_stream_name in self.delivery_streams: + raise ResourceInUseException( + f"Firehose {delivery_stream_name} under accountId {ACCOUNT_ID} " + f"already exists" + ) + + if len(self.delivery_streams) == DeliveryStream.MAX_STREAMS_PER_REGION: + raise LimitExceededException( + f"You have already consumed your firehose quota of " + f"{DeliveryStream.MAX_STREAMS_PER_REGION} hoses. Firehose " + f"names: {list(self.delivery_streams.keys())}" + ) + + # Rule out situations that are not yet implemented. + if delivery_stream_encryption_configuration_input: + warnings.warn( + "A delivery stream with server-side encryption enabled is not " + "yet implemented" + ) + + if destination_name == "Splunk": + warnings.warn("A Splunk destination delivery stream is not yet implemented") + + if ( + kinesis_stream_source_configuration + and delivery_stream_type != "KinesisStreamAsSource" + ): + raise InvalidArgumentException( + "KinesisSourceStreamConfig is only applicable for " + "KinesisStreamAsSource stream type" + ) + + # Validate the tags before proceeding. + errmsg = self.tagger.validate_tags(tags or []) + if errmsg: + raise ValidationException(errmsg) + + # Create a DeliveryStream instance that will be stored and indexed + # by delivery stream name. This instance will update the state and + # create the ARN. + delivery_stream = DeliveryStream( + region, + delivery_stream_name, + delivery_stream_type, + kinesis_stream_source_configuration, + destination_name, + destination_config, + ) + self.tagger.tag_resource(delivery_stream.delivery_stream_arn, tags or []) + + self.delivery_streams[delivery_stream_name] = delivery_stream + return self.delivery_streams[delivery_stream_name].delivery_stream_arn + + def delete_delivery_stream( + self, delivery_stream_name, allow_force_delete=False + ): # pylint: disable=unused-argument + """Delete a delivery stream and its data. + + AllowForceDelete option is ignored as we only superficially + apply state. + """ + delivery_stream = self.delivery_streams.get(delivery_stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} " + f"not found." + ) + + self.tagger.delete_all_tags_for_resource(delivery_stream.delivery_stream_arn) + + delivery_stream.delivery_stream_status = "DELETING" + self.delivery_streams.pop(delivery_stream_name) + + def describe_delivery_stream( + self, delivery_stream_name, limit, exclusive_start_destination_id, + ): # pylint: disable=unused-argument + """Return description of specified delivery stream and its status. + + Note: the 'limit' and 'exclusive_start_destination_id' parameters + are not currently processed/implemented. + """ + delivery_stream = self.delivery_streams.get(delivery_stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} " + f"not found." + ) + + result = {"DeliveryStreamDescription": {"HasMoreDestinations": False}} + for attribute, attribute_value in vars(delivery_stream).items(): + if not attribute_value: + continue + + # Convert from attribute's snake case to camel case for outgoing + # JSON. + name = "".join([x.capitalize() for x in attribute.split("_")]) + + # Fooey ... always an exception to the rule: + if name == "DeliveryStreamArn": + name = "DeliveryStreamARN" + + if name != "Destinations": + if name == "Source": + result["DeliveryStreamDescription"][name] = { + "KinesisStreamSourceDescription": attribute_value + } + else: + result["DeliveryStreamDescription"][name] = attribute_value + continue + + result["DeliveryStreamDescription"]["Destinations"] = [] + for destination in attribute_value: + description = {} + for key, value in destination.items(): + if key == "destination_id": + description["DestinationId"] = value + else: + description[f"{key}DestinationDescription"] = value + + result["DeliveryStreamDescription"]["Destinations"].append(description) + + return result + + def list_delivery_streams( + self, limit, delivery_stream_type, exclusive_start_delivery_stream_name + ): + """Return list of delivery streams in alphabetic order of names.""" + result = {"DeliveryStreamNames": [], "HasMoreDeliveryStreams": False} + if not self.delivery_streams: + return result + + # If delivery_stream_type is specified, filter out any stream that's + # not of that type. + stream_list = self.delivery_streams.keys() + if delivery_stream_type: + stream_list = [ + x + for x in stream_list + if self.delivery_streams[x].delivery_stream_type == delivery_stream_type + ] + + # The list is sorted alphabetically, not alphanumerically. + sorted_list = sorted(stream_list) + + # Determine the limit or number of names to return in the list. + limit = limit or DeliveryStream.MAX_STREAMS_PER_REGION + + # If a starting delivery stream name is given, find the index into + # the sorted list, then add one to get the name following it. If the + # exclusive_start_delivery_stream_name doesn't exist, it's ignored. + start = 0 + if exclusive_start_delivery_stream_name: + if self.delivery_streams.get(exclusive_start_delivery_stream_name): + start = sorted_list.index(exclusive_start_delivery_stream_name) + 1 + + result["DeliveryStreamNames"] = sorted_list[start : start + limit] + if len(sorted_list) > (start + limit): + result["HasMoreDeliveryStreams"] = True + return result + + def list_tags_for_delivery_stream( + self, delivery_stream_name, exclusive_start_tag_key, limit, + ): + """Return list of tags.""" + result = {"Tags": [], "HasMoreTags": False} + delivery_stream = self.delivery_streams.get(delivery_stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} " + f"not found." + ) + + tags = self.tagger.list_tags_for_resource(delivery_stream.delivery_stream_arn)[ + "Tags" + ] + keys = self.tagger.extract_tag_names(tags) + + # If a starting tag is given and can be found, find the index into + # tags, then add one to get the tag following it. + start = 0 + if exclusive_start_tag_key: + if exclusive_start_tag_key in keys: + start = keys.index(exclusive_start_tag_key) + 1 + + limit = limit or MAX_TAGS_PER_DELIVERY_STREAM + result["Tags"] = tags[start : start + limit] + if len(tags) > (start + limit): + result["HasMoreTags"] = True + return result + + def put_record(self, delivery_stream_name, record): + """Write a single data record into a Kinesis Data firehose stream.""" + result = self.put_record_batch(delivery_stream_name, [record]) + return { + "RecordId": result["RequestResponses"][0]["RecordId"], + "Encrypted": False, + } + + @staticmethod + def put_http_records(http_destination, records): + """Put records to a HTTP destination.""" + # Mostly copied from localstack + url = http_destination["EndpointConfiguration"]["Url"] + headers = {"Content-Type": "application/json"} + record_to_send = { + "requestId": str(uuid4()), + "timestamp": int(time()), + "records": [{"data": record["Data"]} for record in records], + } + try: + requests.post(url, json=record_to_send, headers=headers) + except Exception as exc: + # This could be better ... + raise RuntimeError( + "Firehose PutRecord(Batch) to HTTP destination failed" + ) from exc + return [{"RecordId": str(uuid4())} for _ in range(len(records))] + + @staticmethod + def _format_s3_object_path(delivery_stream_name, version_id, prefix): + """Return a S3 object path in the expected format.""" + # Taken from LocalStack's firehose logic, with minor changes. + # See https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#s3-object-name + # Path prefix pattern: myApp/YYYY/MM/DD/HH/ + # Object name pattern: + # DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString + prefix = f"{prefix}{'' if prefix.endswith('/') else '/'}" + now = datetime.utcnow() + return ( + f"{prefix}{now.strftime('%Y/%m/%d/%H')}/" + f"{delivery_stream_name}-{version_id}-" + f"{now.strftime('%Y-%m-%d-%H-%M-%S')}-{get_random_hex()}" + ) + + def put_s3_records(self, delivery_stream_name, version_id, s3_destination, records): + """Put records to a ExtendedS3 or S3 destination.""" + # Taken from LocalStack's firehose logic, with minor changes. + bucket_name = s3_destination["BucketARN"].split(":")[-1] + prefix = s3_destination.get("Prefix", "") + object_path = self._format_s3_object_path( + delivery_stream_name, version_id, prefix + ) + + batched_data = b"".join([b64decode(r["Data"]) for r in records]) + try: + s3_backend.put_object(bucket_name, object_path, batched_data) + except Exception as exc: + # This could be better ... + raise RuntimeError( + "Firehose PutRecord(Batch to S3 destination failed" + ) from exc + return [{"RecordId": str(uuid4())} for _ in range(len(records))] + + def put_record_batch(self, delivery_stream_name, records): + """Write multiple data records into a Kinesis Data firehose stream.""" + delivery_stream = self.delivery_streams.get(delivery_stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} " + f"not found." + ) + + request_responses = [] + for destination in delivery_stream.destinations: + if "ExtendedS3" in destination: + # ExtendedS3 will be handled like S3,but in the future + # this will probably need to be revisited. This destination + # must be listed before S3 otherwise both destinations will + # be processed instead of just ExtendedS3. + request_responses = self.put_s3_records( + delivery_stream_name, + delivery_stream.version_id, + destination["ExtendedS3"], + records, + ) + elif "S3" in destination: + request_responses = self.put_s3_records( + delivery_stream_name, + delivery_stream.version_id, + destination["S3"], + records, + ) + elif "HttpEndpoint" in destination: + request_responses = self.put_http_records( + destination["HttpEndpoint"], records + ) + elif "Elasticsearch" in destination or "Redshift" in destination: + # This isn't implmented as these services aren't implemented, + # so ignore the data, but return a "proper" response. + request_responses = [ + {"RecordId": str(uuid4())} for _ in range(len(records)) + ] + + return { + "FailedPutCount": 0, + "Encrypted": False, + "RequestResponses": request_responses, + } + + def tag_delivery_stream(self, delivery_stream_name, tags): + """Add/update tags for specified delivery stream.""" + delivery_stream = self.delivery_streams.get(delivery_stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} " + f"not found." + ) + + if len(tags) >= MAX_TAGS_PER_DELIVERY_STREAM: + raise ValidationException( + f"1 validation error detected: Value '{tags}' at 'tags' " + f"failed to satisify contstraint: Member must have length " + f"less than or equal to {MAX_TAGS_PER_DELIVERY_STREAM}" + ) + + errmsg = self.tagger.validate_tags(tags) + if errmsg: + raise ValidationException(errmsg) + + self.tagger.tag_resource(delivery_stream.delivery_stream_arn, tags) + + def untag_delivery_stream(self, delivery_stream_name, tag_keys): + """Removes tags from specified delivery stream.""" + delivery_stream = self.delivery_streams.get(delivery_stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {delivery_stream_name} under account {ACCOUNT_ID} " + f"not found." + ) + + # If a tag key doesn't exist for the stream, boto3 ignores it. + self.tagger.untag_resource_using_names( + delivery_stream.delivery_stream_arn, tag_keys + ) + + def update_destination( + self, + delivery_stream_name, + current_delivery_stream_version_id, + destination_id, + s3_destination_update, + extended_s3_destination_update, + s3_backup_mode, + redshift_destination_update, + elasticsearch_destination_update, + splunk_destination_update, + http_endpoint_destination_update, + ): # pylint: disable=unused-argument,too-many-arguments,too-many-locals + """Updates specified destination of specified delivery stream.""" + (destination_name, destination_config) = find_destination_config_in_args( + locals() + ) + + delivery_stream = self.delivery_streams.get(delivery_stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {delivery_stream_name} under accountId " + f"{ACCOUNT_ID} not found." + ) + + if destination_name == "Splunk": + warnings.warn("A Splunk destination delivery stream is not yet implemented") + + if delivery_stream.version_id != current_delivery_stream_version_id: + raise ConcurrentModificationException( + f"Cannot update firehose: {delivery_stream_name} since the " + f"current version id: {delivery_stream.version_id} and " + f"specified version id: {current_delivery_stream_version_id} " + f"do not match" + ) + + destination = {} + destination_idx = 0 + for destination in delivery_stream.destinations: + if destination["destination_id"] == destination_id: + break + destination_idx += 1 + else: + raise InvalidArgumentException("Destination Id {destination_id} not found") + + # Switching between Amazon ES and other services is not supported. + # For an Amazon ES destination, you can only update to another Amazon + # ES destination. Same with HTTP. Didn't test Splunk. + if ( + destination_name == "Elasticsearch" and "Elasticsearch" not in destination + ) or (destination_name == "HttpEndpoint" and "HttpEndpoint" not in destination): + raise InvalidArgumentException( + f"Changing the destination type to or from {destination_name} " + f"is not supported at this time." + ) + + # If this is a different type of destination configuration, + # the existing configuration is reset first. + if destination_name in destination: + delivery_stream.destinations[destination_idx][destination_name].update( + destination_config + ) + else: + delivery_stream.destinations[destination_idx] = { + "destination_id": destination_id, + destination_name: destination_config, + } + + # Once S3 is updated to an ExtendedS3 destination, both remain in + # the destination. That means when one is updated, the other needs + # to be updated as well. The problem is that they don't have the + # same fields. + if destination_name == "ExtendedS3": + delivery_stream.destinations[destination_idx][ + "S3" + ] = create_s3_destination_config(destination_config) + elif destination_name == "S3" and "ExtendedS3" in destination: + destination["ExtendedS3"] = { + k: v + for k, v in destination["S3"].items() + if k in destination["ExtendedS3"] + } + + # Increment version number and update the timestamp. + delivery_stream.version_id = str(int(current_delivery_stream_version_id) + 1) + delivery_stream.last_update_timestamp = str(datetime.utcnow()) + + # Unimplemented: processing of the "S3BackupMode" parameter. Per the + # documentation: "You can update a delivery stream to enable Amazon + # S3 backup if it is disabled. If backup is enabled, you can't update + # the delivery stream to disable it." + + +firehose_backends = {} +for available_region in Session().get_available_regions("firehose"): + firehose_backends[available_region] = FirehoseBackend() +for available_region in Session().get_available_regions( + "firehose", partition_name="aws-us-gov" +): + firehose_backends[available_region] = FirehoseBackend() +for available_region in Session().get_available_regions( + "firehose", partition_name="aws-cn" +): + firehose_backends[available_region] = FirehoseBackend() diff --git a/moto/firehose/responses.py b/moto/firehose/responses.py new file mode 100644 index 000000000..70d0d1209 --- /dev/null +++ b/moto/firehose/responses.py @@ -0,0 +1,110 @@ +"""Handles Firehose API requests, invokes method and returns response.""" +import json + +from moto.core.responses import BaseResponse +from .models import firehose_backends + + +class FirehoseResponse(BaseResponse): + """Handler for Firehose requests and responses.""" + + @property + def firehose_backend(self): + """Return backend instance specific to this region.""" + return firehose_backends[self.region] + + def create_delivery_stream(self): + """Prepare arguments and respond to CreateDeliveryStream request.""" + delivery_stream_arn = self.firehose_backend.create_delivery_stream( + self.region, + self._get_param("DeliveryStreamName"), + self._get_param("DeliveryStreamType"), + self._get_param("KinesisStreamSourceConfiguration"), + self._get_param("DeliveryStreamEncryptionConfigurationInput"), + self._get_param("S3DestinationConfiguration"), + self._get_param("ExtendedS3DestinationConfiguration"), + self._get_param("RedshiftDestinationConfiguration"), + self._get_param("ElasticsearchDestinationConfiguration"), + self._get_param("SplunkDestinationConfiguration"), + self._get_param("HttpEndpointDestinationConfiguration"), + self._get_param("Tags"), + ) + return json.dumps({"DeliveryStreamARN": delivery_stream_arn}) + + def delete_delivery_stream(self): + """Prepare arguments and respond to DeleteDeliveryStream request.""" + self.firehose_backend.delete_delivery_stream( + self._get_param("DeliveryStreamName"), self._get_param("AllowForceDelete"), + ) + return json.dumps({}) + + def describe_delivery_stream(self): + """Prepare arguments and respond to DescribeDeliveryStream request.""" + result = self.firehose_backend.describe_delivery_stream( + self._get_param("DeliveryStreamName"), + self._get_param("Limit"), + self._get_param("ExclusiveStartDestinationId"), + ) + return json.dumps(result) + + def list_delivery_streams(self): + """Prepare arguments and respond to ListDeliveryStreams request.""" + stream_list = self.firehose_backend.list_delivery_streams( + self._get_param("Limit"), + self._get_param("DeliveryStreamType"), + self._get_param("ExclusiveStartDeliveryStreamName"), + ) + return json.dumps(stream_list) + + def list_tags_for_delivery_stream(self): + """Prepare arguments and respond to ListTagsForDeliveryStream().""" + result = self.firehose_backend.list_tags_for_delivery_stream( + self._get_param("DeliveryStreamName"), + self._get_param("ExclusiveStartTagKey"), + self._get_param("Limit"), + ) + return json.dumps(result) + + def put_record(self): + """Prepare arguments and response to PutRecord().""" + result = self.firehose_backend.put_record( + self._get_param("DeliveryStreamName"), self._get_param("Record") + ) + return json.dumps(result) + + def put_record_batch(self): + """Prepare arguments and response to PutRecordBatch().""" + result = self.firehose_backend.put_record_batch( + self._get_param("DeliveryStreamName"), self._get_param("Records") + ) + return json.dumps(result) + + def tag_delivery_stream(self): + """Prepare arguments and respond to TagDeliveryStream request.""" + self.firehose_backend.tag_delivery_stream( + self._get_param("DeliveryStreamName"), self._get_param("Tags"), + ) + return json.dumps({}) + + def untag_delivery_stream(self): + """Prepare arguments and respond to UntagDeliveryStream().""" + self.firehose_backend.untag_delivery_stream( + self._get_param("DeliveryStreamName"), self._get_param("TagKeys"), + ) + return json.dumps({}) + + def update_destination(self): + """Prepare arguments and respond to UpdateDestination().""" + self.firehose_backend.update_destination( + self._get_param("DeliveryStreamName"), + self._get_param("CurrentDeliveryStreamVersionId"), + self._get_param("DestinationId"), + self._get_param("S3DestinationUpdate"), + self._get_param("ExtendedS3DestinationUpdate"), + self._get_param("S3BackupMode"), + self._get_param("RedshiftDestinationUpdate"), + self._get_param("ElasticsearchDestinationUpdate"), + self._get_param("SplunkDestinationUpdate"), + self._get_param("HttpEndpointDestinationUpdate"), + ) + return json.dumps({}) diff --git a/moto/firehose/urls.py b/moto/firehose/urls.py new file mode 100644 index 000000000..f3d6f49b8 --- /dev/null +++ b/moto/firehose/urls.py @@ -0,0 +1,6 @@ +"""Firehose base URL and path.""" +from .responses import FirehoseResponse + + +url_bases = [r"https?://firehose\.(.+)\.amazonaws\.com"] +url_paths = {"{0}/$": FirehoseResponse.dispatch} diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 108519209..93764feb2 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -1,7 +1,7 @@ from __future__ import unicode_literals +from collections import OrderedDict import datetime -import time import re import itertools @@ -10,7 +10,6 @@ from hashlib import md5 from boto3 import Session -from collections import OrderedDict from moto.core import BaseBackend, BaseModel, CloudFormationModel from moto.core.utils import unix_time from moto.core import ACCOUNT_ID @@ -322,128 +321,9 @@ class Stream(CloudFormationModel): return self.stream_name -class FirehoseRecord(BaseModel): - def __init__(self, record_data): - self.record_id = 12345678 - self.record_data = record_data - - -class DeliveryStream(BaseModel): - def __init__(self, stream_name, **stream_kwargs): - self.name = stream_name - self.redshift_username = stream_kwargs.get("redshift_username") - self.redshift_password = stream_kwargs.get("redshift_password") - self.redshift_jdbc_url = stream_kwargs.get("redshift_jdbc_url") - self.redshift_role_arn = stream_kwargs.get("redshift_role_arn") - self.redshift_copy_command = stream_kwargs.get("redshift_copy_command") - - self.s3_config = stream_kwargs.get("s3_config") - self.extended_s3_config = stream_kwargs.get("extended_s3_config") - - self.redshift_s3_role_arn = stream_kwargs.get("redshift_s3_role_arn") - self.redshift_s3_bucket_arn = stream_kwargs.get("redshift_s3_bucket_arn") - self.redshift_s3_prefix = stream_kwargs.get("redshift_s3_prefix") - self.redshift_s3_compression_format = stream_kwargs.get( - "redshift_s3_compression_format", "UNCOMPRESSED" - ) - self.redshift_s3_buffering_hints = stream_kwargs.get( - "redshift_s3_buffering_hints" - ) - - self.elasticsearch_config = stream_kwargs.get("elasticsearch_config") - - self.records = [] - self.status = "ACTIVE" - self.created_at = datetime.datetime.utcnow() - self.last_updated = datetime.datetime.utcnow() - - @property - def arn(self): - return "arn:aws:firehose:us-east-1:{1}:deliverystream/{0}".format( - self.name, ACCOUNT_ID - ) - - def destinations_to_dict(self): - if self.s3_config: - return [ - {"DestinationId": "string", "S3DestinationDescription": self.s3_config} - ] - elif self.extended_s3_config: - return [ - { - "DestinationId": "string", - "ExtendedS3DestinationDescription": self.extended_s3_config, - } - ] - elif self.elasticsearch_config: - return [ - { - "DestinationId": "string", - "ElasticsearchDestinationDescription": { - "RoleARN": self.elasticsearch_config.get("RoleARN"), - "DomainARN": self.elasticsearch_config.get("DomainARN"), - "ClusterEndpoint": self.elasticsearch_config.get( - "ClusterEndpoint" - ), - "IndexName": self.elasticsearch_config.get("IndexName"), - "TypeName": self.elasticsearch_config.get("TypeName"), - "IndexRotationPeriod": self.elasticsearch_config.get( - "IndexRotationPeriod" - ), - "BufferingHints": self.elasticsearch_config.get( - "BufferingHints" - ), - "RetryOptions": self.elasticsearch_config.get("RetryOptions"), - "S3DestinationDescription": self.elasticsearch_config.get( - "S3Configuration" - ), - }, - } - ] - else: - return [ - { - "DestinationId": "string", - "RedshiftDestinationDescription": { - "ClusterJDBCURL": self.redshift_jdbc_url, - "CopyCommand": self.redshift_copy_command, - "RoleARN": self.redshift_role_arn, - "S3DestinationDescription": { - "BucketARN": self.redshift_s3_bucket_arn, - "BufferingHints": self.redshift_s3_buffering_hints, - "CompressionFormat": self.redshift_s3_compression_format, - "Prefix": self.redshift_s3_prefix, - "RoleARN": self.redshift_s3_role_arn, - }, - "Username": self.redshift_username, - }, - } - ] - - def to_dict(self): - return { - "DeliveryStreamDescription": { - "CreateTimestamp": time.mktime(self.created_at.timetuple()), - "DeliveryStreamARN": self.arn, - "DeliveryStreamName": self.name, - "DeliveryStreamStatus": self.status, - "Destinations": self.destinations_to_dict(), - "HasMoreDestinations": False, - "LastUpdateTimestamp": time.mktime(self.last_updated.timetuple()), - "VersionId": "string", - } - } - - def put_record(self, record_data): - record = FirehoseRecord(record_data) - self.records.append(record) - return record - - class KinesisBackend(BaseBackend): def __init__(self): self.streams = OrderedDict() - self.delivery_streams = {} def create_stream( self, stream_name, shard_count, retention_period_hours, region_name @@ -622,30 +502,6 @@ class KinesisBackend(BaseBackend): raise InvalidArgumentError(retention_period_hours) stream.retention_period_hours = retention_period_hours - """ Firehose """ - - def create_delivery_stream(self, stream_name, **stream_kwargs): - stream = DeliveryStream(stream_name, **stream_kwargs) - self.delivery_streams[stream_name] = stream - return stream - - def get_delivery_stream(self, stream_name): - if stream_name in self.delivery_streams: - return self.delivery_streams[stream_name] - else: - raise StreamNotFoundError(stream_name) - - def list_delivery_streams(self): - return self.delivery_streams.values() - - def delete_delivery_stream(self, stream_name): - self.delivery_streams.pop(stream_name) - - def put_firehose_record(self, stream_name, record_data): - stream = self.get_delivery_stream(stream_name) - record = stream.put_record(record_data) - return record - def list_tags_for_stream( self, stream_name, exclusive_start_tag_key=None, limit=None ): diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 280663b33..7f8ab233e 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -15,13 +15,6 @@ class KinesisResponse(BaseResponse): def kinesis_backend(self): return kinesis_backends[self.region] - @property - def is_firehose(self): - host = self.headers.get("host") or self.headers["Host"] - return host.startswith("firehose") or "firehose" in self.headers.get( - "Authorization", "" - ) - def create_stream(self): stream_name = self.parameters.get("StreamName") shard_count = self.parameters.get("ShardCount") @@ -103,8 +96,6 @@ class KinesisResponse(BaseResponse): ) def put_record(self): - if self.is_firehose: - return self.firehose_put_record() stream_name = self.parameters.get("StreamName") partition_key = self.parameters.get("PartitionKey") explicit_hash_key = self.parameters.get("ExplicitHashKey") @@ -122,8 +113,6 @@ class KinesisResponse(BaseResponse): return json.dumps({"SequenceNumber": sequence_number, "ShardId": shard_id}) def put_records(self): - if self.is_firehose: - return self.put_record_batch() stream_name = self.parameters.get("StreamName") records = self.parameters.get("Records") @@ -165,85 +154,6 @@ class KinesisResponse(BaseResponse): ) return "" - """ Firehose """ - - def create_delivery_stream(self): - stream_name = self.parameters["DeliveryStreamName"] - redshift_config = self.parameters.get("RedshiftDestinationConfiguration") - s3_config = self.parameters.get("S3DestinationConfiguration") - extended_s3_config = self.parameters.get("ExtendedS3DestinationConfiguration") - elasticsearch_config = self.parameters.get( - "ElasticsearchDestinationConfiguration" - ) - - if redshift_config: - redshift_s3_config = redshift_config["S3Configuration"] - stream_kwargs = { - "redshift_username": redshift_config["Username"], - "redshift_password": redshift_config["Password"], - "redshift_jdbc_url": redshift_config["ClusterJDBCURL"], - "redshift_role_arn": redshift_config["RoleARN"], - "redshift_copy_command": redshift_config["CopyCommand"], - "redshift_s3_role_arn": redshift_s3_config["RoleARN"], - "redshift_s3_bucket_arn": redshift_s3_config["BucketARN"], - "redshift_s3_prefix": redshift_s3_config["Prefix"], - "redshift_s3_compression_format": redshift_s3_config.get( - "CompressionFormat" - ), - "redshift_s3_buffering_hints": redshift_s3_config["BufferingHints"], - } - elif s3_config: - stream_kwargs = {"s3_config": s3_config} - elif extended_s3_config: - stream_kwargs = {"extended_s3_config": extended_s3_config} - elif elasticsearch_config: - stream_kwargs = {"elasticsearch_config": elasticsearch_config} - else: - stream_kwargs = {} - - stream = self.kinesis_backend.create_delivery_stream( - stream_name, **stream_kwargs - ) - return json.dumps({"DeliveryStreamARN": stream.arn}) - - def describe_delivery_stream(self): - stream_name = self.parameters["DeliveryStreamName"] - stream = self.kinesis_backend.get_delivery_stream(stream_name) - return json.dumps(stream.to_dict()) - - def list_delivery_streams(self): - streams = self.kinesis_backend.list_delivery_streams() - return json.dumps( - { - "DeliveryStreamNames": [stream.name for stream in streams], - "HasMoreDeliveryStreams": False, - } - ) - - def delete_delivery_stream(self): - stream_name = self.parameters["DeliveryStreamName"] - self.kinesis_backend.delete_delivery_stream(stream_name) - return json.dumps({}) - - def firehose_put_record(self): - stream_name = self.parameters["DeliveryStreamName"] - record_data = self.parameters["Record"]["Data"] - - record = self.kinesis_backend.put_firehose_record(stream_name, record_data) - return json.dumps({"RecordId": record.record_id}) - - def put_record_batch(self): - stream_name = self.parameters["DeliveryStreamName"] - records = self.parameters["Records"] - - request_responses = [] - for record in records: - record_response = self.kinesis_backend.put_firehose_record( - stream_name, record["Data"] - ) - request_responses.append({"RecordId": record_response.record_id}) - return json.dumps({"FailedPutCount": 0, "RequestResponses": request_responses}) - def add_tags_to_stream(self): stream_name = self.parameters.get("StreamName") tags = self.parameters.get("Tags") diff --git a/moto/kinesis/urls.py b/moto/kinesis/urls.py index 33d6bf215..085b35b15 100644 --- a/moto/kinesis/urls.py +++ b/moto/kinesis/urls.py @@ -4,7 +4,6 @@ from .responses import KinesisResponse url_bases = [ # Need to avoid conflicting with kinesisvideo r"https?://kinesis\.(.+)\.amazonaws\.com", - r"https?://firehose\.(.+)\.amazonaws\.com", ] url_paths = {"{0}/$": KinesisResponse.dispatch} diff --git a/moto/logs/models.py b/moto/logs/models.py index 0d9135698..eb62cab7a 100644 --- a/moto/logs/models.py +++ b/moto/logs/models.py @@ -828,21 +828,31 @@ class LogsBackend(BaseBackend): def put_subscription_filter( self, log_group_name, filter_name, filter_pattern, destination_arn, role_arn ): - # TODO: support other destinations like Kinesis stream - from moto.awslambda import lambda_backends # due to circular dependency - log_group = self.groups.get(log_group_name) if not log_group: raise ResourceNotFoundException() - lambda_func = lambda_backends[self.region_name].get_function(destination_arn) + service = destination_arn.split(":")[2] + if service == "lambda": + from moto.awslambda import ( # pylint: disable=import-outside-toplevel + lambda_backends, + ) - # no specific permission check implemented - if not lambda_func: + lambda_func = lambda_backends[self.region_name].get_function( + destination_arn + ) + # no specific permission check implemented + if not lambda_func: + raise InvalidParameterException( + "Could not execute the lambda function. Make sure you " + "have given CloudWatch Logs permission to execute your " + "function." + ) + else: raise InvalidParameterException( - "Could not execute the lambda function. " - "Make sure you have given CloudWatch Logs permission to execute your function." + f"Service '{service}' has not implemented for " + f"put_subscription_filter()" ) log_group.put_subscription_filter( diff --git a/scripts/scaffold.py b/scripts/scaffold.py index b5952fea3..85546631f 100755 --- a/scripts/scaffold.py +++ b/scripts/scaffold.py @@ -261,11 +261,11 @@ def get_function_in_responses(service, operation, protocol): for input_name, input_type in inputs.items(): type_name = input_type.type_name if type_name == "integer": - arg_line_tmpl = ' {} = self._get_int_param("{}")\n' + arg_line_tmpl = ' {}=self._get_int_param("{}")\n' elif type_name == "list": - arg_line_tmpl = ' {} = self._get_list_prefix("{}.member")\n' + arg_line_tmpl = ' {}=self._get_list_prefix("{}.member")\n' else: - arg_line_tmpl = ' {} = self._get_param("{}")\n' + arg_line_tmpl = ' {}=self._get_param("{}")\n' body += arg_line_tmpl.format(to_snake_case(input_name), input_name) if output_names: body += " {} = self.{}_backend.{}(\n".format( diff --git a/scripts/template/lib/models.py.j2 b/scripts/template/lib/models.py.j2 index ce41fc823..0cdb84eb3 100644 --- a/scripts/template/lib/models.py.j2 +++ b/scripts/template/lib/models.py.j2 @@ -5,7 +5,6 @@ from moto.core import BaseBackend, BaseModel class {{ service_class }}Backend(BaseBackend): - """Implementation of {{ service_class }} APIs.""" def __init__(self, region_name=None): diff --git a/scripts/template/lib/responses.py.j2 b/scripts/template/lib/responses.py.j2 index 7aa279794..def142151 100644 --- a/scripts/template/lib/responses.py.j2 +++ b/scripts/template/lib/responses.py.j2 @@ -6,7 +6,6 @@ from .models import {{ escaped_service }}_backends class {{ service_class }}Response(BaseResponse): - """Handler for {{ service_class }} requests and responses.""" @property diff --git a/scripts/template/lib/urls.py.j2 b/scripts/template/lib/urls.py.j2 index 981a02812..1a189bfe7 100644 --- a/scripts/template/lib/urls.py.j2 +++ b/scripts/template/lib/urls.py.j2 @@ -2,7 +2,7 @@ from .responses import {{ service_class }}Response url_bases = [ - "https?://{{ endpoint_prefix }}.(.+).amazonaws.com", + r"https?://{{ endpoint_prefix }}\.(.+)\.amazonaws\.com", ] {% if api_protocol == 'rest-json' %} diff --git a/tests/test_firehose/__init__.py b/tests/test_firehose/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_firehose/test_firehose.py b/tests/test_firehose/test_firehose.py new file mode 100644 index 000000000..4733fd636 --- /dev/null +++ b/tests/test_firehose/test_firehose.py @@ -0,0 +1,495 @@ +"""Unit tests specific to basic Firehose Delivery Stream-related APIs.""" +from unittest import SkipTest +import warnings + +import boto3 +from botocore.exceptions import ClientError +import pytest + +from moto import mock_firehose +from moto import settings +from moto.core import ACCOUNT_ID +from moto.core.utils import get_random_hex +from moto.firehose.models import DeliveryStream + +TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2" + + +def sample_s3_dest_config(): + """Return a simple extended s3 destination configuration.""" + return { + "RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose-test-role", + "BucketARN": "arn:aws:s3::firehosetestbucket", + } + + +@mock_firehose +def test_warnings(): + """Test features that raise a warning as they're unimplemented.""" + if settings.TEST_SERVER_MODE: + raise SkipTest("Can't capture warnings in server mode") + + client = boto3.client("firehose", region_name=TEST_REGION) + s3_dest_config = sample_s3_dest_config() + + # DeliveryStreamEncryption is not supported. + stream_name = f"test_warning_{get_random_hex(6)}" + with warnings.catch_warnings(record=True) as warn_msg: + client.create_delivery_stream( + DeliveryStreamName=stream_name, + ExtendedS3DestinationConfiguration=s3_dest_config, + DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"}, + ) + assert "server-side encryption enabled is not yet implemented" in str( + warn_msg[-1].message + ) + + # Can't create a delivery stream for Splunk as it's unimplemented. + stream_name = f"test_splunk_destination_{get_random_hex(6)}" + with warnings.catch_warnings(record=True) as warn_msg: + client.create_delivery_stream( + DeliveryStreamName=stream_name, + SplunkDestinationConfiguration={ + "HECEndpoint": "foo", + "HECEndpointType": "foo", + "HECToken": "foo", + "S3Configuration": {"RoleARN": "foo", "BucketARN": "foo"}, + }, + ) + assert "Splunk destination delivery stream is not yet implemented" in str( + warn_msg[-1].message + ) + + # Can't update a delivery stream to Splunk as it's unimplemented. + stream_name = f"test_update_splunk_destination_{get_random_hex(6)}" + client.create_delivery_stream( + DeliveryStreamName=stream_name, S3DestinationConfiguration=s3_dest_config, + ) + + with warnings.catch_warnings(record=True) as warn_msg: + client.update_destination( + DeliveryStreamName=stream_name, + CurrentDeliveryStreamVersionId="1", + DestinationId="destinationId-000000000001", + SplunkDestinationUpdate={ + "HECEndpoint": "foo", + "HECEndpointType": "foo", + "HECToken": "foo", + }, + ) + assert "Splunk destination delivery stream is not yet implemented" in str( + warn_msg[-1].message + ) + + +@mock_firehose +def test_create_delivery_stream_failures(): + """Test errors invoking create_delivery_stream().""" + client = boto3.client("firehose", region_name=TEST_REGION) + s3_dest_config = sample_s3_dest_config() + failure_name = f"test_failure_{get_random_hex(6)}" + + # Create too many streams. + for idx in range(DeliveryStream.MAX_STREAMS_PER_REGION): + client.create_delivery_stream( + DeliveryStreamName=f"{failure_name}_{idx}", + ExtendedS3DestinationConfiguration=s3_dest_config, + ) + + with pytest.raises(ClientError) as exc: + client.create_delivery_stream( + DeliveryStreamName=f"{failure_name}_99", + ExtendedS3DestinationConfiguration=s3_dest_config, + ) + err = exc.value.response["Error"] + assert err["Code"] == "LimitExceededException" + assert "You have already consumed your firehose quota" in err["Message"] + + # Free up the memory from the limits test. + for idx in range(DeliveryStream.MAX_STREAMS_PER_REGION): + client.delete_delivery_stream(DeliveryStreamName=f"{failure_name}_{idx}") + + # Create a stream with the same name as an existing stream. + client.create_delivery_stream( + DeliveryStreamName=f"{failure_name}", + ExtendedS3DestinationConfiguration=s3_dest_config, + ) + with pytest.raises(ClientError) as exc: + client.create_delivery_stream( + DeliveryStreamName=f"{failure_name}", + ExtendedS3DestinationConfiguration=s3_dest_config, + ) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceInUseException" + assert ( + f"Firehose {failure_name} under accountId {ACCOUNT_ID} already exists" + in err["Message"] + ) + + # Only one destination configuration is allowed. + with pytest.raises(ClientError) as exc: + client.create_delivery_stream( + DeliveryStreamName=f"{failure_name}_2manyconfigs", + ExtendedS3DestinationConfiguration=s3_dest_config, + HttpEndpointDestinationConfiguration={ + "EndpointConfiguration": {"Url": "google.com"}, + "S3Configuration": {"RoleARN": "foo", "BucketARN": "foo"}, + }, + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidArgumentException" + assert ( + "Exactly one destination configuration is supported for a Firehose" + in err["Message"] + ) + + # Provide a Kinesis source configuration, but use DirectPut stream type. + with pytest.raises(ClientError) as exc: + client.create_delivery_stream( + DeliveryStreamName=f"{failure_name}_bad_source_type", + DeliveryStreamType="DirectPut", + KinesisStreamSourceConfiguration={ + "KinesisStreamARN": "kinesis_test_ds", + "RoleARN": "foo", + }, + ExtendedS3DestinationConfiguration=s3_dest_config, + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidArgumentException" + assert ( + "KinesisSourceStreamConfig is only applicable for " + "KinesisStreamAsSource stream type" in err["Message"] + ) + + +@mock_firehose +def test_delete_delivery_stream(): + """Test successful and failed invocations of delete_delivery_stream().""" + client = boto3.client("firehose", region_name=TEST_REGION) + s3_dest_config = sample_s3_dest_config() + stream_name = f"test_delete_{get_random_hex(6)}" + + # Create a couple of streams to test with. + for idx in range(5): + client.create_delivery_stream( + DeliveryStreamName=f"{stream_name}-{idx}", + S3DestinationConfiguration=s3_dest_config, + ) + + # Attempt to delete a non-existent stream. + with pytest.raises(ClientError) as exc: + client.delete_delivery_stream(DeliveryStreamName="foo") + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert f"Firehose foo under account {ACCOUNT_ID} not found." in err["Message"] + + # Delete one stream, verify the remaining exist. + client.delete_delivery_stream(DeliveryStreamName=f"{stream_name}-0") + hoses = client.list_delivery_streams() + assert len(hoses["DeliveryStreamNames"]) == 4 + expected_list = [f"{stream_name}-{x}" for x in range(1, 5)] + assert hoses["DeliveryStreamNames"] == expected_list + + # Delete all streams, verify there are no more streams. + for idx in range(1, 5): + client.delete_delivery_stream(DeliveryStreamName=f"{stream_name}-{idx}") + hoses = client.list_delivery_streams() + assert len(hoses["DeliveryStreamNames"]) == 0 + assert hoses["DeliveryStreamNames"] == [] + + +@mock_firehose +def test_describe_delivery_stream(): + """Test successful, failed invocations of describe_delivery_stream().""" + client = boto3.client("firehose", region_name=TEST_REGION) + s3_dest_config = sample_s3_dest_config() + stream_name = f"test_describe_{get_random_hex(6)}" + role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/testrole" + + # Create delivery stream with S3 destination, kinesis type and source + # for testing purposes. + client.create_delivery_stream( + DeliveryStreamName=stream_name, + DeliveryStreamType="KinesisStreamAsSource", + KinesisStreamSourceConfiguration={ + "KinesisStreamARN": f"arn:aws:kinesis:{TEST_REGION}:{ACCOUNT_ID}:stream/test-datastream", + "RoleARN": role_arn, + }, + S3DestinationConfiguration=s3_dest_config, + ) + + labels_with_kinesis_source = { + "DeliveryStreamName", + "DeliveryStreamARN", + "DeliveryStreamStatus", + "DeliveryStreamType", + "VersionId", + "CreateTimestamp", + "Source", + "Destinations", + "LastUpdateTimestamp", + "HasMoreDestinations", + } + + # Verify the created fields are as expected. + results = client.describe_delivery_stream(DeliveryStreamName=stream_name) + description = results["DeliveryStreamDescription"] + assert set(description.keys()) == labels_with_kinesis_source + assert description["DeliveryStreamName"] == stream_name + assert ( + description["DeliveryStreamARN"] + == f"arn:aws:firehose:{TEST_REGION}:{ACCOUNT_ID}:/delivery_stream/{stream_name}" + ) + assert description["DeliveryStreamStatus"] == "ACTIVE" + assert description["DeliveryStreamType"] == "KinesisStreamAsSource" + assert description["VersionId"] == "1" + assert ( + description["Source"]["KinesisStreamSourceDescription"]["RoleARN"] == role_arn + ) + assert len(description["Destinations"]) == 1 + assert set(description["Destinations"][0].keys()) == { + "S3DestinationDescription", + "DestinationId", + } + + # Update destination with ExtendedS3. + client.update_destination( + DeliveryStreamName=stream_name, + CurrentDeliveryStreamVersionId="1", + DestinationId="destinationId-000000000001", + ExtendedS3DestinationUpdate=s3_dest_config, + ) + + # Verify the created fields are as expected. There should be one + # destination with two destination types. The last update timestamp + # should be present and the version number updated. + results = client.describe_delivery_stream(DeliveryStreamName=stream_name) + description = results["DeliveryStreamDescription"] + assert set(description.keys()) == labels_with_kinesis_source | { + "LastUpdateTimestamp" + } + assert description["DeliveryStreamName"] == stream_name + assert ( + description["DeliveryStreamARN"] + == f"arn:aws:firehose:{TEST_REGION}:{ACCOUNT_ID}:/delivery_stream/{stream_name}" + ) + assert description["DeliveryStreamStatus"] == "ACTIVE" + assert description["DeliveryStreamType"] == "KinesisStreamAsSource" + assert description["VersionId"] == "2" + assert ( + description["Source"]["KinesisStreamSourceDescription"]["RoleARN"] == role_arn + ) + assert len(description["Destinations"]) == 1 + assert set(description["Destinations"][0].keys()) == { + "S3DestinationDescription", + "DestinationId", + "ExtendedS3DestinationDescription", + } + + # Update ExtendedS3 destination with a few different values. + client.update_destination( + DeliveryStreamName=stream_name, + CurrentDeliveryStreamVersionId="2", + DestinationId="destinationId-000000000001", + ExtendedS3DestinationUpdate={ + "RoleARN": role_arn, + "BucketARN": "arn:aws:s3:::testbucket", + # IntervalInSeconds increased from 300 to 700. + "BufferingHints": {"IntervalInSeconds": 700, "SizeInMBs": 5}, + }, + ) + results = client.describe_delivery_stream(DeliveryStreamName=stream_name) + description = results["DeliveryStreamDescription"] + assert description["VersionId"] == "3" + assert len(description["Destinations"]) == 1 + destination = description["Destinations"][0] + assert ( + destination["ExtendedS3DestinationDescription"]["BufferingHints"][ + "IntervalInSeconds" + ] + == 700 + ) + + # Verify S3 was added when ExtendedS3 was added. + assert set(destination.keys()) == { + "S3DestinationDescription", + "DestinationId", + "ExtendedS3DestinationDescription", + } + assert set(destination["S3DestinationDescription"].keys()) == { + "RoleARN", + "BucketARN", + "BufferingHints", + } + + # Update delivery stream from ExtendedS3 to S3. + client.update_destination( + DeliveryStreamName=stream_name, + CurrentDeliveryStreamVersionId="3", + DestinationId="destinationId-000000000001", + S3DestinationUpdate={ + "RoleARN": role_arn, + "BucketARN": "arn:aws:s3:::testbucket", + # IntervalInSeconds decreased from 700 to 500. + "BufferingHints": {"IntervalInSeconds": 500, "SizeInMBs": 5}, + }, + ) + results = client.describe_delivery_stream(DeliveryStreamName=stream_name) + description = results["DeliveryStreamDescription"] + assert description["VersionId"] == "4" + assert len(description["Destinations"]) == 1 + assert set(description["Destinations"][0].keys()) == { + "S3DestinationDescription", + "ExtendedS3DestinationDescription", + "DestinationId", + } + destination = description["Destinations"][0] + assert ( + destination["ExtendedS3DestinationDescription"]["BufferingHints"][ + "IntervalInSeconds" + ] + == 500 + ) + assert ( + destination["S3DestinationDescription"]["BufferingHints"]["IntervalInSeconds"] + == 500 + ) + + +@mock_firehose +def test_list_delivery_streams(): + """Test successful and failed invocations of list_delivery_streams().""" + client = boto3.client("firehose", region_name=TEST_REGION) + s3_dest_config = sample_s3_dest_config() + stream_name = f"test_list_{get_random_hex(6)}" + + # Create a couple of streams of both types to test with. + for idx in range(5): + client.create_delivery_stream( + DeliveryStreamName=f"{stream_name}-{idx}", + DeliveryStreamType="DirectPut", + S3DestinationConfiguration=s3_dest_config, + ) + for idx in range(5): + client.create_delivery_stream( + DeliveryStreamName=f"{stream_name}-{idx+5}", + DeliveryStreamType="KinesisStreamAsSource", + S3DestinationConfiguration=s3_dest_config, + ) + + # Verify limit works. + hoses = client.list_delivery_streams(Limit=1) + assert len(hoses["DeliveryStreamNames"]) == 1 + assert hoses["DeliveryStreamNames"] == [f"{stream_name}-0"] + assert hoses["HasMoreDeliveryStreams"] is True + + hoses = client.list_delivery_streams(Limit=10) + assert len(hoses["DeliveryStreamNames"]) == 10 + assert hoses["HasMoreDeliveryStreams"] is False + + # Verify delivery_stream_type works as a filter. + hoses = client.list_delivery_streams(DeliveryStreamType="DirectPut") + assert len(hoses["DeliveryStreamNames"]) == 5 + expected_directput_list = [f"{stream_name}-{x}" for x in range(5)] + assert hoses["DeliveryStreamNames"] == expected_directput_list + assert hoses["HasMoreDeliveryStreams"] is False + + hoses = client.list_delivery_streams(DeliveryStreamType="KinesisStreamAsSource") + assert len(hoses["DeliveryStreamNames"]) == 5 + expected_kinesis_stream_list = [f"{stream_name}-{x+5}" for x in range(5)] + assert hoses["DeliveryStreamNames"] == expected_kinesis_stream_list + assert hoses["HasMoreDeliveryStreams"] is False + + # Verify exclusive_start_delivery_stream_name returns truncated list. + hoses = client.list_delivery_streams( + ExclusiveStartDeliveryStreamName=f"{stream_name}-5" + ) + assert len(hoses["DeliveryStreamNames"]) == 4 + expected_stream_list = [f"{stream_name}-{x+5}" for x in range(1, 5)] + assert hoses["DeliveryStreamNames"] == expected_stream_list + assert hoses["HasMoreDeliveryStreams"] is False + + hoses = client.list_delivery_streams( + ExclusiveStartDeliveryStreamName=f"{stream_name}-9" + ) + assert len(hoses["DeliveryStreamNames"]) == 0 + assert hoses["HasMoreDeliveryStreams"] is False + + # boto3 ignores bad stream names for ExclusiveStartDeliveryStreamName. + hoses = client.list_delivery_streams(ExclusiveStartDeliveryStreamName="foo") + assert len(hoses["DeliveryStreamNames"]) == 10 + assert ( + hoses["DeliveryStreamNames"] + == expected_directput_list + expected_kinesis_stream_list + ) + assert hoses["HasMoreDeliveryStreams"] is False + + # Verify no parameters returns entire list. + client.list_delivery_streams() + assert len(hoses["DeliveryStreamNames"]) == 10 + assert ( + hoses["DeliveryStreamNames"] + == expected_directput_list + expected_kinesis_stream_list + ) + assert hoses["HasMoreDeliveryStreams"] is False + + +@mock_firehose +def test_update_destination(): + """Test successful, failed invocations of update_destination().""" + client = boto3.client("firehose", region_name=TEST_REGION) + s3_dest_config = sample_s3_dest_config() + + # Can't update a non-existent stream. + with pytest.raises(ClientError) as exc: + client.update_destination( + DeliveryStreamName="foo", + CurrentDeliveryStreamVersionId="1", + DestinationId="destinationId-000000000001", + ExtendedS3DestinationUpdate=s3_dest_config, + ) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert f"Firehose foo under accountId {ACCOUNT_ID} not found" in err["Message"] + + # Create a delivery stream for testing purposes. + stream_name = f"test_update_{get_random_hex(6)}" + client.create_delivery_stream( + DeliveryStreamName=stream_name, + DeliveryStreamType="DirectPut", + S3DestinationConfiguration=s3_dest_config, + ) + + # Only one destination configuration is allowed. + with pytest.raises(ClientError) as exc: + client.update_destination( + DeliveryStreamName=stream_name, + CurrentDeliveryStreamVersionId="1", + DestinationId="destinationId-000000000001", + S3DestinationUpdate=s3_dest_config, + ExtendedS3DestinationUpdate=s3_dest_config, + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidArgumentException" + assert ( + "Exactly one destination configuration is supported for a Firehose" + in err["Message"] + ) + + # Can't update to/from http or ES. + with pytest.raises(ClientError) as exc: + client.update_destination( + DeliveryStreamName=stream_name, + CurrentDeliveryStreamVersionId="1", + DestinationId="destinationId-000000000001", + HttpEndpointDestinationUpdate={ + "EndpointConfiguration": {"Url": "https://google.com"}, + "RetryOptions": {"DurationInSeconds": 100}, + }, + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidArgumentException" + assert ( + "Changing the destination type to or from HttpEndpoint is not " + "supported at this time" + ) in err["Message"] diff --git a/tests/test_kinesis/test_firehose.py b/tests/test_firehose/test_firehose_destination_types.py similarity index 60% rename from tests/test_kinesis/test_firehose.py rename to tests/test_firehose/test_firehose_destination_types.py index 3bee09825..ac07d6994 100644 --- a/tests/test_kinesis/test_firehose.py +++ b/tests/test_firehose/test_firehose_destination_types.py @@ -1,22 +1,22 @@ -from __future__ import unicode_literals - -import datetime - -from botocore.exceptions import ClientError +"""Unit tests verifying various delivery stream destination content.""" import boto3 -import sure # noqa -from moto import mock_kinesis +from moto import mock_firehose +from moto import settings from moto.core import ACCOUNT_ID +from moto.core.utils import get_random_hex + +TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2" -def create_s3_delivery_stream(client, stream_name): +def create_extended_s3_delivery_stream(client, stream_name): + """Return ARN of a delivery stream created with an S3 destination.""" return client.create_delivery_stream( DeliveryStreamName=stream_name, DeliveryStreamType="DirectPut", ExtendedS3DestinationConfiguration={ "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(ACCOUNT_ID), - "BucketARN": "arn:aws:s3:::kinesis-test", + "BucketARN": "arn:aws:s3:::firehose-test", "Prefix": "myFolder/", "CompressionFormat": "UNCOMPRESSED", "DataFormatConversionConfiguration": { @@ -38,6 +38,7 @@ def create_s3_delivery_stream(client, stream_name): def create_redshift_delivery_stream(client, stream_name): + """Return ARN of a delivery stream created with a Redshift destination.""" return client.create_delivery_stream( DeliveryStreamName=stream_name, RedshiftDestinationConfiguration={ @@ -53,7 +54,7 @@ def create_redshift_delivery_stream(client, stream_name): "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID ), - "BucketARN": "arn:aws:s3:::kinesis-test", + "BucketARN": "arn:aws:s3:::firehose-test", "Prefix": "myFolder/", "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, "CompressionFormat": "UNCOMPRESSED", @@ -63,12 +64,13 @@ def create_redshift_delivery_stream(client, stream_name): def create_elasticsearch_delivery_stream(client, stream_name): + """Return delivery stream ARN of an ElasticSearch destination.""" return client.create_delivery_stream( DeliveryStreamName=stream_name, DeliveryStreamType="DirectPut", ElasticsearchDestinationConfiguration={ "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(ACCOUNT_ID), - "DomainARN": "arn:aws:es:::domain/kinesis-test", + "DomainARN": "arn:aws:es:::domain/firehose-test", "IndexName": "myIndex", "TypeName": "UNCOMPRESSED", "IndexRotationPeriod": "NoRotation", @@ -78,7 +80,7 @@ def create_elasticsearch_delivery_stream(client, stream_name): "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID ), - "BucketARN": "arn:aws:s3:::kinesis-test", + "BucketARN": "arn:aws:s3:::firehose-test", "Prefix": "myFolder/", "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, "CompressionFormat": "UNCOMPRESSED", @@ -87,14 +89,39 @@ def create_elasticsearch_delivery_stream(client, stream_name): ) -@mock_kinesis -def test_create_redshift_delivery_stream(): - client = boto3.client("firehose", region_name="us-east-1") +def create_http_delivery_stream(client, stream_name): + """Return delivery stream ARN of an Http destination.""" + return client.create_delivery_stream( + DeliveryStreamName=stream_name, + DeliveryStreamType="DirectPut", + HttpEndpointDestinationConfiguration={ + "EndpointConfiguration": {"Url": "google.com"}, + "RetryOptions": {"DurationInSeconds": 100}, + "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, + "CloudWatchLoggingOptions": {"Enabled": False}, + "S3Configuration": { + "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( + ACCOUNT_ID + ), + "BucketARN": "arn:aws:s3:::firehose-test", + "Prefix": "myFolder/", + "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, + "CompressionFormat": "UNCOMPRESSED", + }, + }, + ) - response = create_redshift_delivery_stream(client, "stream1") + +@mock_firehose +def test_create_redshift_delivery_stream(): + """Verify fields of a Redshift delivery stream.""" + client = boto3.client("firehose", region_name=TEST_REGION) + + stream_name = f"stream_{get_random_hex(6)}" + response = create_redshift_delivery_stream(client, stream_name) stream_arn = response["DeliveryStreamARN"] - response = client.describe_delivery_stream(DeliveryStreamName="stream1") + response = client.describe_delivery_stream(DeliveryStreamName=stream_name) stream_description = response["DeliveryStreamDescription"] # Sure and Freezegun don't play nicely together @@ -103,13 +130,14 @@ def test_create_redshift_delivery_stream(): stream_description.should.equal( { - "DeliveryStreamName": "stream1", + "DeliveryStreamName": stream_name, "DeliveryStreamARN": stream_arn, "DeliveryStreamStatus": "ACTIVE", - "VersionId": "string", + "DeliveryStreamType": "DirectPut", + "VersionId": "1", "Destinations": [ { - "DestinationId": "string", + "DestinationId": "destinationId-000000000001", "RedshiftDestinationDescription": { "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID @@ -124,7 +152,7 @@ def test_create_redshift_delivery_stream(): "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID ), - "BucketARN": "arn:aws:s3:::kinesis-test", + "BucketARN": "arn:aws:s3:::firehose-test", "Prefix": "myFolder/", "BufferingHints": { "SizeInMBs": 123, @@ -140,14 +168,16 @@ def test_create_redshift_delivery_stream(): ) -@mock_kinesis -def test_create_s3_delivery_stream(): - client = boto3.client("firehose", region_name="us-east-1") +@mock_firehose +def test_create_extended_s3_delivery_stream(): + """Verify fields of a S3 delivery stream.""" + client = boto3.client("firehose", region_name=TEST_REGION) - response = create_s3_delivery_stream(client, "stream1") + stream_name = f"stream_{get_random_hex(6)}" + response = create_extended_s3_delivery_stream(client, stream_name) stream_arn = response["DeliveryStreamARN"] - response = client.describe_delivery_stream(DeliveryStreamName="stream1") + response = client.describe_delivery_stream(DeliveryStreamName=stream_name) stream_description = response["DeliveryStreamDescription"] # Sure and Freezegun don't play nicely together @@ -156,18 +186,19 @@ def test_create_s3_delivery_stream(): stream_description.should.equal( { - "DeliveryStreamName": "stream1", + "DeliveryStreamName": stream_name, "DeliveryStreamARN": stream_arn, "DeliveryStreamStatus": "ACTIVE", - "VersionId": "string", + "DeliveryStreamType": "DirectPut", + "VersionId": "1", "Destinations": [ { - "DestinationId": "string", + "DestinationId": "destinationId-000000000001", "ExtendedS3DestinationDescription": { "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID ), - "BucketARN": "arn:aws:s3:::kinesis-test", + "BucketARN": "arn:aws:s3:::firehose-test", "Prefix": "myFolder/", "CompressionFormat": "UNCOMPRESSED", "DataFormatConversionConfiguration": { @@ -181,7 +212,7 @@ def test_create_s3_delivery_stream(): } }, "SchemaConfiguration": { - "DatabaseName": "stream1", + "DatabaseName": stream_name, "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID ), @@ -189,6 +220,14 @@ def test_create_s3_delivery_stream(): }, }, }, + "S3DestinationDescription": { + "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( + ACCOUNT_ID + ), + "BucketARN": "arn:aws:s3:::firehose-test", + "Prefix": "myFolder/", + "CompressionFormat": "UNCOMPRESSED", + }, } ], "HasMoreDestinations": False, @@ -196,14 +235,16 @@ def test_create_s3_delivery_stream(): ) -@mock_kinesis +@mock_firehose def test_create_elasticsearch_delivery_stream(): - client = boto3.client("firehose", region_name="us-east-1") + """Verify fields of an Elasticsearch delivery stream.""" + client = boto3.client("firehose", region_name=TEST_REGION) - response = create_elasticsearch_delivery_stream(client, "stream1") + stream_name = f"stream_{get_random_hex(6)}" + response = create_elasticsearch_delivery_stream(client, stream_name) stream_arn = response["DeliveryStreamARN"] - response = client.describe_delivery_stream(DeliveryStreamName="stream1") + response = client.describe_delivery_stream(DeliveryStreamName=stream_name) stream_description = response["DeliveryStreamDescription"] # Sure and Freezegun don't play nicely together @@ -212,18 +253,19 @@ def test_create_elasticsearch_delivery_stream(): stream_description.should.equal( { - "DeliveryStreamName": "stream1", + "DeliveryStreamName": stream_name, "DeliveryStreamARN": stream_arn, "DeliveryStreamStatus": "ACTIVE", - "VersionId": "string", + "DeliveryStreamType": "DirectPut", + "VersionId": "1", "Destinations": [ { - "DestinationId": "string", + "DestinationId": "destinationId-000000000001", "ElasticsearchDestinationDescription": { "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID ), - "DomainARN": "arn:aws:es:::domain/kinesis-test", + "DomainARN": "arn:aws:es:::domain/firehose-test", "IndexName": "myIndex", "TypeName": "UNCOMPRESSED", "IndexRotationPeriod": "NoRotation", @@ -233,7 +275,7 @@ def test_create_elasticsearch_delivery_stream(): "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID ), - "BucketARN": "arn:aws:s3:::kinesis-test", + "BucketARN": "arn:aws:s3:::firehose-test", "Prefix": "myFolder/", "BufferingHints": { "SizeInMBs": 123, @@ -249,15 +291,17 @@ def test_create_elasticsearch_delivery_stream(): ) -@mock_kinesis -def test_create_stream_without_redshift(): - client = boto3.client("firehose", region_name="us-east-1") +@mock_firehose +def test_create_s3_delivery_stream(): + """Verify fields of an S3 delivery stream.""" + client = boto3.client("firehose", region_name=TEST_REGION) + stream_name = f"stream_{get_random_hex(6)}" response = client.create_delivery_stream( - DeliveryStreamName="stream1", + DeliveryStreamName=stream_name, S3DestinationConfiguration={ "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format(ACCOUNT_ID), - "BucketARN": "arn:aws:s3:::kinesis-test", + "BucketARN": "arn:aws:s3:::firehose-test", "Prefix": "myFolder/", "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, "CompressionFormat": "UNCOMPRESSED", @@ -265,7 +309,7 @@ def test_create_stream_without_redshift(): ) stream_arn = response["DeliveryStreamARN"] - response = client.describe_delivery_stream(DeliveryStreamName="stream1") + response = client.describe_delivery_stream(DeliveryStreamName=stream_name) stream_description = response["DeliveryStreamDescription"] # Sure and Freezegun don't play nicely together @@ -274,21 +318,19 @@ def test_create_stream_without_redshift(): stream_description.should.equal( { - "DeliveryStreamName": "stream1", + "DeliveryStreamName": stream_name, "DeliveryStreamARN": stream_arn, "DeliveryStreamStatus": "ACTIVE", - "VersionId": "string", + "DeliveryStreamType": "DirectPut", + "VersionId": "1", "Destinations": [ { - "DestinationId": "string", + "DestinationId": "destinationId-000000000001", "S3DestinationDescription": { "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( ACCOUNT_ID ), - "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( - ACCOUNT_ID - ), - "BucketARN": "arn:aws:s3:::kinesis-test", + "BucketARN": "arn:aws:s3:::firehose-test", "Prefix": "myFolder/", "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, "CompressionFormat": "UNCOMPRESSED", @@ -300,47 +342,52 @@ def test_create_stream_without_redshift(): ) -@mock_kinesis -def test_deescribe_non_existent_stream(): - client = boto3.client("firehose", region_name="us-east-1") +@mock_firehose +def test_create_http_stream(): + """Verify fields of a HTTP delivery stream.""" + client = boto3.client("firehose", region_name=TEST_REGION) - client.describe_delivery_stream.when.called_with( - DeliveryStreamName="not-a-stream" - ).should.throw(ClientError) + stream_name = f"stream_{get_random_hex(6)}" + response = create_http_delivery_stream(client, stream_name) + stream_arn = response["DeliveryStreamARN"] + response = client.describe_delivery_stream(DeliveryStreamName=stream_name) + stream_description = response["DeliveryStreamDescription"] -@mock_kinesis -def test_list_and_delete_stream(): - client = boto3.client("firehose", region_name="us-east-1") + # Sure and Freezegun don't play nicely together + _ = stream_description.pop("CreateTimestamp") + _ = stream_description.pop("LastUpdateTimestamp") - create_redshift_delivery_stream(client, "stream1") - create_redshift_delivery_stream(client, "stream2") - - set(client.list_delivery_streams()["DeliveryStreamNames"]).should.equal( - set(["stream1", "stream2"]) - ) - - client.delete_delivery_stream(DeliveryStreamName="stream1") - - set(client.list_delivery_streams()["DeliveryStreamNames"]).should.equal( - set(["stream2"]) - ) - - -@mock_kinesis -def test_put_record(): - client = boto3.client("firehose", region_name="us-east-1") - - create_redshift_delivery_stream(client, "stream1") - client.put_record(DeliveryStreamName="stream1", Record={"Data": "some data"}) - - -@mock_kinesis -def test_put_record_batch(): - client = boto3.client("firehose", region_name="us-east-1") - - create_redshift_delivery_stream(client, "stream1") - client.put_record_batch( - DeliveryStreamName="stream1", - Records=[{"Data": "some data1"}, {"Data": "some data2"}], + stream_description.should.equal( + { + "DeliveryStreamName": stream_name, + "DeliveryStreamARN": stream_arn, + "DeliveryStreamStatus": "ACTIVE", + "DeliveryStreamType": "DirectPut", + "VersionId": "1", + "Destinations": [ + { + "DestinationId": "destinationId-000000000001", + "HttpEndpointDestinationDescription": { + "EndpointConfiguration": {"Url": "google.com"}, + "RetryOptions": {"DurationInSeconds": 100}, + "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, + "CloudWatchLoggingOptions": {"Enabled": False}, + "S3DestinationDescription": { + "RoleARN": "arn:aws:iam::{}:role/firehose_delivery_role".format( + ACCOUNT_ID + ), + "BucketARN": "arn:aws:s3:::firehose-test", + "Prefix": "myFolder/", + "BufferingHints": { + "SizeInMBs": 123, + "IntervalInSeconds": 124, + }, + "CompressionFormat": "UNCOMPRESSED", + }, + }, + } + ], + "HasMoreDestinations": False, + } ) diff --git a/tests/test_firehose/test_firehose_put.py b/tests/test_firehose/test_firehose_put.py new file mode 100644 index 000000000..4eeab28d5 --- /dev/null +++ b/tests/test_firehose/test_firehose_put.py @@ -0,0 +1,148 @@ +"""Unit tests verifying put-related delivery stream APIs.""" +import boto3 +import sure # noqa pylint: disable=unused-import + +from moto import mock_firehose +from moto import mock_s3 +from moto.core import ACCOUNT_ID +from moto.core.utils import get_random_hex +from tests.test_firehose.test_firehose import TEST_REGION +from tests.test_firehose.test_firehose import sample_s3_dest_config +from tests.test_firehose.test_firehose_destination_types import ( + create_redshift_delivery_stream, +) + +S3_LOCATION_CONSTRAINT = "us-west-1" + + +@mock_firehose +def test_put_record_redshift_destination(): + """Test invocations of put_record() to a Redshift destination. + + At the moment, for Redshift or Elasticsearch destinations, the data + is just thrown away + """ + client = boto3.client("firehose", region_name=TEST_REGION) + + stream_name = f"test_put_record_{get_random_hex(6)}" + create_redshift_delivery_stream(client, stream_name) + result = client.put_record( + DeliveryStreamName=stream_name, Record={"Data": "some test data"} + ) + assert set(result.keys()) == {"RecordId", "Encrypted", "ResponseMetadata"} + + +@mock_firehose +def test_put_record_batch_redshift_destination(): + """Test invocations of put_record_batch() to a Redshift destination. + + At the moment, for Redshift or Elasticsearch destinations, the data + is just thrown away + """ + client = boto3.client("firehose", region_name=TEST_REGION) + + stream_name = f"test_put_record_{get_random_hex(6)}" + create_redshift_delivery_stream(client, stream_name) + records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}] + result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records) + assert set(result.keys()) == { + "FailedPutCount", + "Encrypted", + "RequestResponses", + "ResponseMetadata", + } + assert result["FailedPutCount"] == 0 + assert result["Encrypted"] is False + for response in result["RequestResponses"]: + assert set(response.keys()) == {"RecordId"} + + +@mock_firehose +def test_put_record_http_destination(): + """Test invocations of put_record() to a Http destination.""" + client = boto3.client("firehose", region_name=TEST_REGION) + s3_dest_config = sample_s3_dest_config() + + stream_name = f"test_put_record_{get_random_hex(6)}" + client.create_delivery_stream( + DeliveryStreamName=stream_name, + HttpEndpointDestinationConfiguration={ + "EndpointConfiguration": {"Url": "https://google.com"}, + "S3Configuration": s3_dest_config, + }, + ) + result = client.put_record( + DeliveryStreamName=stream_name, Record={"Data": "some test data"} + ) + assert set(result.keys()) == {"RecordId", "Encrypted", "ResponseMetadata"} + + +@mock_firehose +def test_put_record_batch_http_destination(): + """Test invocations of put_record_batch() to a Http destination.""" + client = boto3.client("firehose", region_name=TEST_REGION) + s3_dest_config = sample_s3_dest_config() + + stream_name = f"test_put_record_{get_random_hex(6)}" + client.create_delivery_stream( + DeliveryStreamName=stream_name, + HttpEndpointDestinationConfiguration={ + "EndpointConfiguration": {"Url": "https://google.com"}, + "S3Configuration": s3_dest_config, + }, + ) + records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}] + result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records) + assert set(result.keys()) == { + "FailedPutCount", + "Encrypted", + "RequestResponses", + "ResponseMetadata", + } + assert result["FailedPutCount"] == 0 + assert result["Encrypted"] is False + for response in result["RequestResponses"]: + assert set(response.keys()) == {"RecordId"} + + +@mock_s3 +@mock_firehose +def test_put_record_batch_extended_s3_destination(): + """Test invocations of put_record_batch() to a S3 destination.""" + client = boto3.client("firehose", region_name=TEST_REGION) + + # Create a S3 bucket. + bucket_name = "firehosetestbucket" + s3_client = boto3.client("s3", region_name=TEST_REGION) + s3_client.create_bucket( + Bucket=bucket_name, + CreateBucketConfiguration={"LocationConstraint": S3_LOCATION_CONSTRAINT}, + ) + + stream_name = f"test_put_record_{get_random_hex(6)}" + client.create_delivery_stream( + DeliveryStreamName=stream_name, + ExtendedS3DestinationConfiguration={ + "RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose-test-role", + "BucketARN": f"arn:aws:s3::{bucket_name}", + }, + ) + records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}] + result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records) + assert set(result.keys()) == { + "FailedPutCount", + "Encrypted", + "RequestResponses", + "ResponseMetadata", + } + assert result["FailedPutCount"] == 0 + assert result["Encrypted"] is False + for response in result["RequestResponses"]: + assert set(response.keys()) == {"RecordId"} + + # Pull data from S3 bucket. + bucket_objects = s3_client.list_objects_v2(Bucket=bucket_name) + response = s3_client.get_object( + Bucket=bucket_name, Key=bucket_objects["Contents"][0]["Key"] + ) + assert response["Body"].read() == b"onetwothree" diff --git a/tests/test_firehose/test_firehose_tags.py b/tests/test_firehose/test_firehose_tags.py new file mode 100644 index 000000000..76172952d --- /dev/null +++ b/tests/test_firehose/test_firehose_tags.py @@ -0,0 +1,158 @@ +"""Unit tests verifying tag-related delivery stream APIs.""" +import boto3 +from botocore.exceptions import ClientError +import pytest + +from moto import mock_firehose +from moto.core import ACCOUNT_ID +from moto.core.utils import get_random_hex +from moto.firehose.models import MAX_TAGS_PER_DELIVERY_STREAM +from tests.test_firehose.test_firehose import TEST_REGION +from tests.test_firehose.test_firehose import sample_s3_dest_config + + +@mock_firehose +def test_list_tags_for_delivery_stream(): + """Test invocations of list_tags_for_delivery_stream().""" + client = boto3.client("firehose", region_name=TEST_REGION) + stream_name = f"test_list_tags_{get_random_hex(6)}" + + number_of_tags = 50 + tags = [{"Key": f"{x}_k", "Value": f"{x}_v"} for x in range(1, number_of_tags + 1)] + + # Create a delivery stream to work with. + client.create_delivery_stream( + DeliveryStreamName=stream_name, + S3DestinationConfiguration=sample_s3_dest_config(), + Tags=tags, + ) + + # Verify limit works. + result = client.list_tags_for_delivery_stream( + DeliveryStreamName=stream_name, Limit=1 + ) + assert len(result["Tags"]) == 1 + assert result["Tags"] == [{"Key": "1_k", "Value": "1_v"}] + assert result["HasMoreTags"] is True + + result = client.list_tags_for_delivery_stream( + DeliveryStreamName=stream_name, Limit=number_of_tags + ) + assert len(result["Tags"]) == number_of_tags + assert result["HasMoreTags"] is False + + # Verify exclusive_start_tag_key returns truncated list. + result = client.list_tags_for_delivery_stream( + DeliveryStreamName=stream_name, ExclusiveStartTagKey="30_k" + ) + assert len(result["Tags"]) == number_of_tags - 30 + expected_tags = [ + {"Key": f"{x}_k", "Value": f"{x}_v"} for x in range(31, number_of_tags + 1) + ] + assert result["Tags"] == expected_tags + assert result["HasMoreTags"] is False + + result = client.list_tags_for_delivery_stream( + DeliveryStreamName=stream_name, ExclusiveStartTagKey=f"{number_of_tags}_k" + ) + assert len(result["Tags"]) == 0 + assert result["HasMoreTags"] is False + + # boto3 ignores bad stream names for ExclusiveStartTagKey. + result = client.list_tags_for_delivery_stream( + DeliveryStreamName=stream_name, ExclusiveStartTagKey="foo" + ) + assert len(result["Tags"]) == number_of_tags + assert result["Tags"] == tags + assert result["HasMoreTags"] is False + + # Verify no parameters returns entire list. + client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name) + assert len(result["Tags"]) == number_of_tags + assert result["Tags"] == tags + assert result["HasMoreTags"] is False + + +@mock_firehose +def test_tag_delivery_stream(): + """Test successful, failed invocations of tag_delivery_stream().""" + client = boto3.client("firehose", region_name=TEST_REGION) + + # Create a delivery stream for testing purposes. + stream_name = f"test_tags_{get_random_hex(6)}" + client.create_delivery_stream( + DeliveryStreamName=stream_name, + ExtendedS3DestinationConfiguration=sample_s3_dest_config(), + ) + + # Unknown stream name. + unknown_name = "foo" + with pytest.raises(ClientError) as exc: + client.tag_delivery_stream( + DeliveryStreamName=unknown_name, Tags=[{"Key": "foo", "Value": "bar"}] + ) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert ( + f"Firehose {unknown_name} under account {ACCOUNT_ID} not found" + in err["Message"] + ) + + # Too many tags. + with pytest.raises(ClientError) as exc: + client.tag_delivery_stream( + DeliveryStreamName=stream_name, + Tags=[{"Key": f"{x}", "Value": f"{x}"} for x in range(51)], + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert ( + f"failed to satisify contstraint: Member must have length " + f"less than or equal to {MAX_TAGS_PER_DELIVERY_STREAM}" + ) in err["Message"] + + # Bad tags. + with pytest.raises(ClientError) as exc: + client.tag_delivery_stream( + DeliveryStreamName=stream_name, Tags=[{"Key": "foo!", "Value": "bar"}], + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert ( + "1 validation error detected: Value 'foo!' at 'tags.1.member.key' " + "failed to satisfy constraint: Member must satisfy regular " + "expression pattern" + ) in err["Message"] + + # Successful addition of tags. + added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)] + client.tag_delivery_stream(DeliveryStreamName=stream_name, Tags=added_tags) + results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name) + assert len(results["Tags"]) == 10 + assert results["Tags"] == added_tags + + +@mock_firehose +def test_untag_delivery_stream(): + """Test successful, failed invocations of untag_delivery_stream().""" + client = boto3.client("firehose", region_name=TEST_REGION) + + # Create a delivery stream for testing purposes. + stream_name = f"test_untag_{get_random_hex(6)}" + tag_list = [ + {"Key": "one", "Value": "1"}, + {"Key": "two", "Value": "2"}, + {"Key": "three", "Value": "3"}, + ] + client.create_delivery_stream( + DeliveryStreamName=stream_name, + ExtendedS3DestinationConfiguration=sample_s3_dest_config(), + Tags=tag_list, + ) + + # Untag all of the tags. Verify there are no more tags. + tag_keys = [x["Key"] for x in tag_list] + client.untag_delivery_stream(DeliveryStreamName=stream_name, TagKeys=tag_keys) + results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name) + assert not results["Tags"] + assert not results["HasMoreTags"]