diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index f05450858..ce2a77809 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -2986,7 +2986,7 @@ ## firehose
-83% implemented +100% implemented - [X] create_delivery_stream - [X] delete_delivery_stream @@ -2995,8 +2995,8 @@ - [X] list_tags_for_delivery_stream - [X] put_record - [X] put_record_batch -- [ ] start_delivery_stream_encryption -- [ ] stop_delivery_stream_encryption +- [X] start_delivery_stream_encryption +- [X] stop_delivery_stream_encryption - [X] tag_delivery_stream - [X] untag_delivery_stream - [X] update_destination diff --git a/docs/docs/services/firehose.rst b/docs/docs/services/firehose.rst index b5c62e5d4..c472063d9 100644 --- a/docs/docs/services/firehose.rst +++ b/docs/docs/services/firehose.rst @@ -56,8 +56,8 @@ firehose - [X] put_record_batch Write multiple data records into a Kinesis Data firehose stream. -- [ ] start_delivery_stream_encryption -- [ ] stop_delivery_stream_encryption +- [X] start_delivery_stream_encryption +- [X] stop_delivery_stream_encryption - [X] tag_delivery_stream Add/update tags for specified delivery stream. @@ -65,6 +65,4 @@ firehose Removes tags from specified delivery stream. - [X] update_destination - Updates specified destination of specified delivery stream. - diff --git a/moto/firehose/models.py b/moto/firehose/models.py index e6410dd82..f47af5774 100644 --- a/moto/firehose/models.py +++ b/moto/firehose/models.py @@ -1,10 +1,6 @@ """FirehoseBackend class with methods for supported APIs. Incomplete list of unfinished items: - - The create_delivery_stream() argument - DeliveryStreamEncryptionConfigurationInput is not supported. - - The S3BackupMode argument is ignored as are most of the other - destination arguments. - Data record size and number of transactions are ignored. - Better validation of delivery destination parameters, e.g., validation of the url for an http endpoint (boto3 does this). @@ -116,12 +112,20 @@ class DeliveryStream( MAX_STREAMS_PER_REGION = 50 + ALTERNATIVE_FIELD_NAMES = [ + ("S3Configuration", "S3DestinationDescription"), + ("S3Update", "S3DestinationDescription"), + ("S3BackupConfiguration", "S3BackupDescription"), + ("S3BackupUpdate", "S3BackupDescription"), + ] + def __init__( self, account_id: str, region: str, delivery_stream_name: str, delivery_stream_type: str, + encryption: Dict[str, Any], kinesis_stream_source_configuration: Dict[str, Any], destination_name: str, destination_config: Dict[str, Any], @@ -131,6 +135,7 @@ class DeliveryStream( self.delivery_stream_type = ( delivery_stream_type if delivery_stream_type else "DirectPut" ) + self.delivery_stream_encryption_configuration = encryption self.source = kinesis_stream_source_configuration self.destinations: List[Dict[str, Any]] = [ @@ -139,18 +144,19 @@ class DeliveryStream( destination_name: destination_config, } ] + if destination_name == "ExtendedS3": # Add a S3 destination as well, minus a few ExtendedS3 fields. self.destinations[0]["S3"] = create_s3_destination_config( destination_config ) - elif "S3Configuration" in destination_config: - # S3Configuration becomes S3DestinationDescription for the - # other destinations. - self.destinations[0][destination_name][ - "S3DestinationDescription" - ] = destination_config["S3Configuration"] - del self.destinations[0][destination_name]["S3Configuration"] + + # S3Configuration becomes S3DestinationDescription for the + # other destinations. Same for S3Backup + for old, new in DeliveryStream.ALTERNATIVE_FIELD_NAMES: + if old in destination_config: + self.destinations[0][destination_name][new] = destination_config[old] + del self.destinations[0][destination_name][old] self.delivery_stream_status = "ACTIVE" self.delivery_stream_arn = f"arn:aws:firehose:{region}:{account_id}:deliverystream/{delivery_stream_name}" @@ -213,12 +219,6 @@ class FirehoseBackend(BaseBackend): ) # Rule out situations that are not yet implemented. - if delivery_stream_encryption_configuration_input: - warnings.warn( - "A delivery stream with server-side encryption enabled is not " - "yet implemented" - ) - if destination_name == "Splunk": warnings.warn("A Splunk destination delivery stream is not yet implemented") @@ -247,13 +247,14 @@ class FirehoseBackend(BaseBackend): # by delivery stream name. This instance will update the state and # create the ARN. delivery_stream = DeliveryStream( - self.account_id, - region, - delivery_stream_name, - delivery_stream_type, - kinesis_stream_source_configuration, - destination_name, - destination_config, + account_id=self.account_id, + region=region, + delivery_stream_name=delivery_stream_name, + delivery_stream_type=delivery_stream_type, + encryption=delivery_stream_encryption_configuration_input, + kinesis_stream_source_configuration=kinesis_stream_source_configuration, + destination_name=destination_name, + destination_config=destination_config, ) self.tagger.tag_resource(delivery_stream.delivery_stream_arn, tags or []) @@ -562,6 +563,27 @@ class FirehoseBackend(BaseBackend): delivery_stream.delivery_stream_arn, tag_keys ) + def start_delivery_stream_encryption( + self, stream_name: str, encryption_config: Dict[str, Any] + ) -> None: + delivery_stream = self.delivery_streams.get(stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {stream_name} under account {self.account_id} not found." + ) + + delivery_stream.delivery_stream_encryption_configuration = encryption_config + delivery_stream.delivery_stream_encryption_configuration["Status"] = "ENABLED" + + def stop_delivery_stream_encryption(self, stream_name: str) -> None: + delivery_stream = self.delivery_streams.get(stream_name) + if not delivery_stream: + raise ResourceNotFoundException( + f"Firehose {stream_name} under account {self.account_id} not found." + ) + + delivery_stream.delivery_stream_encryption_configuration["Status"] = "DISABLED" + def update_destination( # pylint: disable=unused-argument self, delivery_stream_name: str, @@ -575,10 +597,7 @@ class FirehoseBackend(BaseBackend): splunk_destination_update: Dict[str, Any], http_endpoint_destination_update: Dict[str, Any], ) -> None: - """Updates specified destination of specified delivery stream.""" - (destination_name, destination_config) = find_destination_config_in_args( - locals() - ) + (dest_name, dest_config) = find_destination_config_in_args(locals()) delivery_stream = self.delivery_streams.get(delivery_stream_name) if not delivery_stream: @@ -586,7 +605,7 @@ class FirehoseBackend(BaseBackend): f"Firehose {delivery_stream_name} under accountId {self.account_id} not found." ) - if destination_name == "Splunk": + if dest_name == "Splunk": warnings.warn("A Splunk destination delivery stream is not yet implemented") if delivery_stream.version_id != current_delivery_stream_version_id: @@ -609,35 +628,42 @@ class FirehoseBackend(BaseBackend): # Switching between Amazon ES and other services is not supported. # For an Amazon ES destination, you can only update to another Amazon # ES destination. Same with HTTP. Didn't test Splunk. - if ( - destination_name == "Elasticsearch" and "Elasticsearch" not in destination - ) or (destination_name == "HttpEndpoint" and "HttpEndpoint" not in destination): + if (dest_name == "Elasticsearch" and "Elasticsearch" not in destination) or ( + dest_name == "HttpEndpoint" and "HttpEndpoint" not in destination + ): raise InvalidArgumentException( - f"Changing the destination type to or from {destination_name} " + f"Changing the destination type to or from {dest_name} " f"is not supported at this time." ) # If this is a different type of destination configuration, # the existing configuration is reset first. - if destination_name in destination: - delivery_stream.destinations[destination_idx][destination_name].update( - destination_config - ) + if dest_name in destination: + delivery_stream.destinations[destination_idx][dest_name].update(dest_config) else: delivery_stream.destinations[destination_idx] = { "destination_id": destination_id, - destination_name: destination_config, + dest_name: dest_config, } + # Some names in the Update-request differ from the names used by Describe + for old, new in DeliveryStream.ALTERNATIVE_FIELD_NAMES: + if old in dest_config: + if new not in delivery_stream.destinations[destination_idx][dest_name]: + delivery_stream.destinations[destination_idx][dest_name][new] = {} + delivery_stream.destinations[destination_idx][dest_name][new].update( + dest_config[old] + ) + # Once S3 is updated to an ExtendedS3 destination, both remain in # the destination. That means when one is updated, the other needs # to be updated as well. The problem is that they don't have the # same fields. - if destination_name == "ExtendedS3": + if dest_name == "ExtendedS3": delivery_stream.destinations[destination_idx][ "S3" - ] = create_s3_destination_config(destination_config) - elif destination_name == "S3" and "ExtendedS3" in destination: + ] = create_s3_destination_config(dest_config) + elif dest_name == "S3" and "ExtendedS3" in destination: destination["ExtendedS3"] = { k: v for k, v in destination["S3"].items() diff --git a/moto/firehose/responses.py b/moto/firehose/responses.py index 2f858692d..c36125fd5 100644 --- a/moto/firehose/responses.py +++ b/moto/firehose/responses.py @@ -109,3 +109,18 @@ class FirehoseResponse(BaseResponse): self._get_param("HttpEndpointDestinationUpdate"), ) return json.dumps({}) + + def start_delivery_stream_encryption(self) -> str: + stream_name = self._get_param("DeliveryStreamName") + encryption_config = self._get_param( + "DeliveryStreamEncryptionConfigurationInput" + ) + self.firehose_backend.start_delivery_stream_encryption( + stream_name, encryption_config + ) + return "{}" + + def stop_delivery_stream_encryption(self) -> str: + stream_name = self._get_param("DeliveryStreamName") + self.firehose_backend.stop_delivery_stream_encryption(stream_name) + return "{}" diff --git a/moto/glue/responses.py b/moto/glue/responses.py index 50c009eab..7439dbbd7 100644 --- a/moto/glue/responses.py +++ b/moto/glue/responses.py @@ -514,3 +514,6 @@ class GlueResponse(BaseResponse): "CrawlersNotFound": crawlers_not_found, } ) + + def get_partition_indexes(self): + return json.dumps({"PartitionIndexDescriptorList": []}) diff --git a/tests/terraformtests/terraform-tests.success.txt b/tests/terraformtests/terraform-tests.success.txt index 10a5a3029..dfea7c1e4 100644 --- a/tests/terraformtests/terraform-tests.success.txt +++ b/tests/terraformtests/terraform-tests.success.txt @@ -265,6 +265,22 @@ events: - TestAccEventsConnection - TestAccEventsConnectionDataSource - TestAccEventsPermission +firehose: + - TestAccFirehoseDeliveryStreamDataSource_basic + - TestAccFirehoseDeliveryStream_basic + - TestAccFirehoseDeliveryStream_missingProcessing + - TestAccFirehoseDeliveryStream_HTTPEndpoint_retryDuration + - TestAccFirehoseDeliveryStream_ExtendedS3_ + - TestAccFirehoseDeliveryStream_extendedS3DynamicPartitioningUpdate + - TestAccFirehoseDeliveryStream_extendedS3DynamicPartitioning + - TestAccFirehoseDeliveryStream_extendedS3KMSKeyARN + - TestAccFirehoseDeliveryStream_ExtendedS3Processing_empty + - TestAccFirehoseDeliveryStream_extendedS3Updates + - TestAccFirehoseDeliveryStream_s3 + - TestAccFirehoseDeliveryStream_HTTP + - TestAccFirehoseDeliveryStream_http + - TestAccFirehoseDeliveryStream_splunk + - TestAccFirehoseDeliveryStream_Splunk glue: - TestAccGlueSchema_ guardduty: diff --git a/tests/test_firehose/test_firehose.py b/tests/test_firehose/test_firehose.py index 223d9baeb..9ce7ce7d1 100644 --- a/tests/test_firehose/test_firehose.py +++ b/tests/test_firehose/test_firehose.py @@ -32,18 +32,6 @@ def test_warnings(): client = boto3.client("firehose", region_name=TEST_REGION) s3_dest_config = sample_s3_dest_config() - # DeliveryStreamEncryption is not supported. - stream_name = f"test_warning_{mock_random.get_random_hex(6)}" - with warnings.catch_warnings(record=True) as warn_msg: - client.create_delivery_stream( - DeliveryStreamName=stream_name, - ExtendedS3DestinationConfiguration=s3_dest_config, - DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"}, - ) - assert "server-side encryption enabled is not yet implemented" in str( - warn_msg[-1].message - ) - # Can't create a delivery stream for Splunk as it's unimplemented. stream_name = f"test_splunk_destination_{mock_random.get_random_hex(6)}" with warnings.catch_warnings(record=True) as warn_msg: diff --git a/tests/test_firehose/test_firehose_destination_types.py b/tests/test_firehose/test_firehose_destination_types.py index 8390db197..5f135e32c 100644 --- a/tests/test_firehose/test_firehose_destination_types.py +++ b/tests/test_firehose/test_firehose_destination_types.py @@ -317,52 +317,3 @@ def test_create_s3_delivery_stream(): "HasMoreDestinations": False, } ) - - -@mock_firehose -def test_create_http_stream(): - """Verify fields of a HTTP delivery stream.""" - client = boto3.client("firehose", region_name=TEST_REGION) - - stream_name = f"stream_{mock_random.get_random_hex(6)}" - response = create_http_delivery_stream(client, stream_name) - stream_arn = response["DeliveryStreamARN"] - - response = client.describe_delivery_stream(DeliveryStreamName=stream_name) - stream_description = response["DeliveryStreamDescription"] - - # Sure and Freezegun don't play nicely together - _ = stream_description.pop("CreateTimestamp") - _ = stream_description.pop("LastUpdateTimestamp") - - stream_description.should.equal( - { - "DeliveryStreamName": stream_name, - "DeliveryStreamARN": stream_arn, - "DeliveryStreamStatus": "ACTIVE", - "DeliveryStreamType": "DirectPut", - "VersionId": "1", - "Destinations": [ - { - "DestinationId": "destinationId-000000000001", - "HttpEndpointDestinationDescription": { - "EndpointConfiguration": {"Url": "google.com"}, - "RetryOptions": {"DurationInSeconds": 100}, - "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, - "CloudWatchLoggingOptions": {"Enabled": False}, - "S3DestinationDescription": { - "RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose_delivery_role", - "BucketARN": "arn:aws:s3:::firehose-test", - "Prefix": "myFolder/", - "BufferingHints": { - "SizeInMBs": 123, - "IntervalInSeconds": 124, - }, - "CompressionFormat": "UNCOMPRESSED", - }, - }, - } - ], - "HasMoreDestinations": False, - } - ) diff --git a/tests/test_firehose/test_firehose_encryption.py b/tests/test_firehose/test_firehose_encryption.py new file mode 100644 index 000000000..a7510e691 --- /dev/null +++ b/tests/test_firehose/test_firehose_encryption.py @@ -0,0 +1,89 @@ +import boto3 +import pytest + +from botocore.exceptions import ClientError +from moto import mock_firehose +from uuid import uuid4 +from .test_firehose import sample_s3_dest_config + + +@mock_firehose +def test_firehose_without_encryption(): + client = boto3.client("firehose", region_name="us-east-2") + name = str(uuid4())[0:6] + client.create_delivery_stream( + DeliveryStreamName=name, + ExtendedS3DestinationConfiguration=sample_s3_dest_config(), + ) + + resp = client.describe_delivery_stream(DeliveryStreamName=name)[ + "DeliveryStreamDescription" + ] + resp.shouldnt.have.key("DeliveryStreamEncryptionConfiguration") + + client.start_delivery_stream_encryption( + DeliveryStreamName=name, + DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"}, + ) + + stream = client.describe_delivery_stream(DeliveryStreamName=name)[ + "DeliveryStreamDescription" + ] + stream.should.have.key("DeliveryStreamEncryptionConfiguration").equals( + { + "KeyType": "AWS_OWNED_CMK", + "Status": "ENABLED", + } + ) + + +@mock_firehose +def test_firehose_with_encryption(): + client = boto3.client("firehose", region_name="us-east-2") + name = str(uuid4())[0:6] + client.create_delivery_stream( + DeliveryStreamName=name, + ExtendedS3DestinationConfiguration=sample_s3_dest_config(), + DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"}, + ) + + stream = client.describe_delivery_stream(DeliveryStreamName=name)[ + "DeliveryStreamDescription" + ] + stream.should.have.key("DeliveryStreamEncryptionConfiguration").equals( + {"KeyType": "AWS_OWNED_CMK"} + ) + + client.stop_delivery_stream_encryption(DeliveryStreamName=name) + + stream = client.describe_delivery_stream(DeliveryStreamName=name)[ + "DeliveryStreamDescription" + ] + stream.should.have.key("DeliveryStreamEncryptionConfiguration").should.have.key( + "Status" + ).equals("DISABLED") + + +@mock_firehose +def test_start_encryption_on_unknown_stream(): + client = boto3.client("firehose", region_name="us-east-2") + + with pytest.raises(ClientError) as exc: + client.start_delivery_stream_encryption( + DeliveryStreamName="?", + DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"}, + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ResourceNotFoundException") + err["Message"].should.equal("Firehose ? under account 123456789012 not found.") + + +@mock_firehose +def test_stop_encryption_on_unknown_stream(): + client = boto3.client("firehose", region_name="us-east-2") + + with pytest.raises(ClientError) as exc: + client.stop_delivery_stream_encryption(DeliveryStreamName="?") + err = exc.value.response["Error"] + err["Code"].should.equal("ResourceNotFoundException") + err["Message"].should.equal("Firehose ? under account 123456789012 not found.") diff --git a/tests/test_firehose/test_http_destinations.py b/tests/test_firehose/test_http_destinations.py new file mode 100644 index 000000000..1fd5077be --- /dev/null +++ b/tests/test_firehose/test_http_destinations.py @@ -0,0 +1,85 @@ +import boto3 +import sure # noqa # pylint: disable=unused-import + +from moto import mock_firehose +from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID +from moto.moto_api._internal import mock_random +from .test_firehose_destination_types import create_http_delivery_stream + +TEST_REGION = "us-west-1" + + +@mock_firehose +def test_create_http_stream(): + """Verify fields of a HTTP delivery stream.""" + client = boto3.client("firehose", region_name=TEST_REGION) + + stream_name = f"stream_{mock_random.get_random_hex(6)}" + response = create_http_delivery_stream(client, stream_name) + stream_arn = response["DeliveryStreamARN"] + + response = client.describe_delivery_stream(DeliveryStreamName=stream_name) + stream_description = response["DeliveryStreamDescription"] + + # Sure and Freezegun don't play nicely together + _ = stream_description.pop("CreateTimestamp") + _ = stream_description.pop("LastUpdateTimestamp") + + stream_description.should.equal( + { + "DeliveryStreamName": stream_name, + "DeliveryStreamARN": stream_arn, + "DeliveryStreamStatus": "ACTIVE", + "DeliveryStreamType": "DirectPut", + "VersionId": "1", + "Destinations": [ + { + "DestinationId": "destinationId-000000000001", + "HttpEndpointDestinationDescription": { + "EndpointConfiguration": {"Url": "google.com"}, + "RetryOptions": {"DurationInSeconds": 100}, + "BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124}, + "CloudWatchLoggingOptions": {"Enabled": False}, + "S3DestinationDescription": { + "RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose_delivery_role", + "BucketARN": "arn:aws:s3:::firehose-test", + "Prefix": "myFolder/", + "BufferingHints": { + "SizeInMBs": 123, + "IntervalInSeconds": 124, + }, + "CompressionFormat": "UNCOMPRESSED", + }, + }, + } + ], + "HasMoreDestinations": False, + } + ) + + +@mock_firehose +def test_update_s3_for_http_stream(): + """Verify fields of a HTTP delivery stream.""" + client = boto3.client("firehose", region_name=TEST_REGION) + + stream_name = f"stream_{mock_random.get_random_hex(6)}" + create_http_delivery_stream(client, stream_name) + + stream = client.describe_delivery_stream(DeliveryStreamName=stream_name) + version_id = stream["DeliveryStreamDescription"]["VersionId"] + + client.update_destination( + DeliveryStreamName=stream_name, + CurrentDeliveryStreamVersionId=version_id, + DestinationId="destinationId-000000000001", + HttpEndpointDestinationUpdate={"S3Update": {"ErrorOutputPrefix": "prefix2"}}, + ) + + desc = client.describe_delivery_stream(DeliveryStreamName=stream_name)[ + "DeliveryStreamDescription" + ] + s3_desc = desc["Destinations"][0]["HttpEndpointDestinationDescription"][ + "S3DestinationDescription" + ] + s3_desc.should.have.key("ErrorOutputPrefix").equals("prefix2")