Firehose Improvements (#6028)

Bert Blommers 2023-03-07 22:08:55 -01:00 committed by GitHub
parent adfbff1095
commit 2b00b050de
10 changed files with 280 additions and 109 deletions

View File

@@ -2986,7 +2986,7 @@
## firehose
<details>
<summary>83% implemented</summary>
<summary>100% implemented</summary>
- [X] create_delivery_stream
- [X] delete_delivery_stream
@@ -2995,8 +2995,8 @@
- [X] list_tags_for_delivery_stream
- [X] put_record
- [X] put_record_batch
- [ ] start_delivery_stream_encryption
- [ ] stop_delivery_stream_encryption
- [X] start_delivery_stream_encryption
- [X] stop_delivery_stream_encryption
- [X] tag_delivery_stream
- [X] untag_delivery_stream
- [X] update_destination

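With the two encryption calls checked off, the mocked Firehose API is now fully covered. A minimal sketch of exercising the new calls against moto follows; the stream name, role ARN and bucket ARN are illustrative placeholders.

import boto3
from moto import mock_firehose

@mock_firehose
def demo_stream_encryption():
    client = boto3.client("firehose", region_name="us-east-1")
    client.create_delivery_stream(
        DeliveryStreamName="my-stream",
        ExtendedS3DestinationConfiguration={
            "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role",
            "BucketARN": "arn:aws:s3:::my-bucket",
        },
    )
    # Newly supported: enable server-side encryption with an AWS-owned key.
    client.start_delivery_stream_encryption(
        DeliveryStreamName="my-stream",
        DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"},
    )
    stream = client.describe_delivery_stream(DeliveryStreamName="my-stream")
    config = stream["DeliveryStreamDescription"]["DeliveryStreamEncryptionConfiguration"]
    assert config["Status"] == "ENABLED"
    # And switch it off again.
    client.stop_delivery_stream_encryption(DeliveryStreamName="my-stream")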
View File

@@ -56,8 +56,8 @@ firehose
- [X] put_record_batch
Write multiple data records into a Kinesis Data Firehose stream.
- [ ] start_delivery_stream_encryption
- [ ] stop_delivery_stream_encryption
- [X] start_delivery_stream_encryption
- [X] stop_delivery_stream_encryption
- [X] tag_delivery_stream
Add/update tags for specified delivery stream.
@@ -65,6 +65,4 @@ firehose
Removes tags from specified delivery stream.
- [X] update_destination
Updates specified destination of specified delivery stream.

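A quick usage sketch of the batch call described above; the stream name is a placeholder for an existing delivery stream.

import boto3

client = boto3.client("firehose", region_name="us-west-1")
response = client.put_record_batch(
    DeliveryStreamName="my-stream",  # placeholder
    Records=[{"Data": b"first record\n"}, {"Data": b"second record\n"}],
)
# FailedPutCount reports how many of the submitted records were rejected.
print(response["FailedPutCount"])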
View File

@@ -1,10 +1,6 @@
"""FirehoseBackend class with methods for supported APIs.
Incomplete list of unfinished items:
- The create_delivery_stream() argument
DeliveryStreamEncryptionConfigurationInput is not supported.
- The S3BackupMode argument is ignored as are most of the other
destination arguments.
- Data record size and number of transactions are ignored.
- Better validation of delivery destination parameters, e.g.,
validation of the url for an http endpoint (boto3 does this).
@@ -116,12 +112,20 @@ class DeliveryStream(
MAX_STREAMS_PER_REGION = 50
ALTERNATIVE_FIELD_NAMES = [
("S3Configuration", "S3DestinationDescription"),
("S3Update", "S3DestinationDescription"),
("S3BackupConfiguration", "S3BackupDescription"),
("S3BackupUpdate", "S3BackupDescription"),
]
def __init__(
self,
account_id: str,
region: str,
delivery_stream_name: str,
delivery_stream_type: str,
encryption: Dict[str, Any],
kinesis_stream_source_configuration: Dict[str, Any],
destination_name: str,
destination_config: Dict[str, Any],
@@ -131,6 +135,7 @@ class DeliveryStream(
self.delivery_stream_type = (
delivery_stream_type if delivery_stream_type else "DirectPut"
)
self.delivery_stream_encryption_configuration = encryption
self.source = kinesis_stream_source_configuration
self.destinations: List[Dict[str, Any]] = [
@@ -139,18 +144,19 @@
destination_name: destination_config,
}
]
if destination_name == "ExtendedS3":
# Add an S3 destination as well, minus a few ExtendedS3 fields.
self.destinations[0]["S3"] = create_s3_destination_config(
destination_config
)
elif "S3Configuration" in destination_config:
# S3Configuration becomes S3DestinationDescription for the
# other destinations.
self.destinations[0][destination_name][
"S3DestinationDescription"
] = destination_config["S3Configuration"]
del self.destinations[0][destination_name]["S3Configuration"]
# S3Configuration becomes S3DestinationDescription for the
# other destinations. The same renaming applies to S3Backup.
for old, new in DeliveryStream.ALTERNATIVE_FIELD_NAMES:
if old in destination_config:
self.destinations[0][destination_name][new] = destination_config[old]
del self.destinations[0][destination_name][old]
self.delivery_stream_status = "ACTIVE"
self.delivery_stream_arn = f"arn:aws:firehose:{region}:{account_id}:deliverystream/{delivery_stream_name}"
@@ -213,12 +219,6 @@ class FirehoseBackend(BaseBackend):
)
# Rule out situations that are not yet implemented.
if delivery_stream_encryption_configuration_input:
warnings.warn(
"A delivery stream with server-side encryption enabled is not "
"yet implemented"
)
if destination_name == "Splunk":
warnings.warn("A Splunk destination delivery stream is not yet implemented")
@@ -247,13 +247,14 @@ class FirehoseBackend(BaseBackend):
# by delivery stream name. This instance will update the state and
# create the ARN.
delivery_stream = DeliveryStream(
self.account_id,
region,
delivery_stream_name,
delivery_stream_type,
kinesis_stream_source_configuration,
destination_name,
destination_config,
account_id=self.account_id,
region=region,
delivery_stream_name=delivery_stream_name,
delivery_stream_type=delivery_stream_type,
encryption=delivery_stream_encryption_configuration_input,
kinesis_stream_source_configuration=kinesis_stream_source_configuration,
destination_name=destination_name,
destination_config=destination_config,
)
self.tagger.tag_resource(delivery_stream.delivery_stream_arn, tags or [])
@@ -562,6 +563,27 @@ class FirehoseBackend(BaseBackend):
delivery_stream.delivery_stream_arn, tag_keys
)
def start_delivery_stream_encryption(
self, stream_name: str, encryption_config: Dict[str, Any]
) -> None:
delivery_stream = self.delivery_streams.get(stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {stream_name} under account {self.account_id} not found."
)
delivery_stream.delivery_stream_encryption_configuration = encryption_config
delivery_stream.delivery_stream_encryption_configuration["Status"] = "ENABLED"
def stop_delivery_stream_encryption(self, stream_name: str) -> None:
delivery_stream = self.delivery_streams.get(stream_name)
if not delivery_stream:
raise ResourceNotFoundException(
f"Firehose {stream_name} under account {self.account_id} not found."
)
delivery_stream.delivery_stream_encryption_configuration["Status"] = "DISABLED"
def update_destination( # pylint: disable=unused-argument
self,
delivery_stream_name: str,
@@ -575,10 +597,7 @@ class FirehoseBackend(BaseBackend):
splunk_destination_update: Dict[str, Any],
http_endpoint_destination_update: Dict[str, Any],
) -> None:
"""Updates specified destination of specified delivery stream."""
(destination_name, destination_config) = find_destination_config_in_args(
locals()
)
(dest_name, dest_config) = find_destination_config_in_args(locals())
delivery_stream = self.delivery_streams.get(delivery_stream_name)
if not delivery_stream:
@@ -586,7 +605,7 @@
f"Firehose {delivery_stream_name} under accountId {self.account_id} not found."
)
if destination_name == "Splunk":
if dest_name == "Splunk":
warnings.warn("A Splunk destination delivery stream is not yet implemented")
if delivery_stream.version_id != current_delivery_stream_version_id:
@@ -609,35 +628,42 @@
# Switching between Amazon ES and other services is not supported.
# For an Amazon ES destination, you can only update to another Amazon
# ES destination. The same applies to HTTP; Splunk was not tested.
if (
destination_name == "Elasticsearch" and "Elasticsearch" not in destination
) or (destination_name == "HttpEndpoint" and "HttpEndpoint" not in destination):
if (dest_name == "Elasticsearch" and "Elasticsearch" not in destination) or (
dest_name == "HttpEndpoint" and "HttpEndpoint" not in destination
):
raise InvalidArgumentException(
f"Changing the destination type to or from {destination_name} "
f"Changing the destination type to or from {dest_name} "
f"is not supported at this time."
)
# If this is a different type of destination configuration,
# the existing configuration is reset first.
if destination_name in destination:
delivery_stream.destinations[destination_idx][destination_name].update(
destination_config
)
if dest_name in destination:
delivery_stream.destinations[destination_idx][dest_name].update(dest_config)
else:
delivery_stream.destinations[destination_idx] = {
"destination_id": destination_id,
destination_name: destination_config,
dest_name: dest_config,
}
# Some names in the Update request differ from the names used by Describe.
for old, new in DeliveryStream.ALTERNATIVE_FIELD_NAMES:
if old in dest_config:
if new not in delivery_stream.destinations[destination_idx][dest_name]:
delivery_stream.destinations[destination_idx][dest_name][new] = {}
delivery_stream.destinations[destination_idx][dest_name][new].update(
dest_config[old]
)
# Once S3 is updated to an ExtendedS3 destination, both remain in
# the destination. That means when one is updated, the other needs
# to be updated as well. The problem is that they don't have the
# same fields.
if destination_name == "ExtendedS3":
if dest_name == "ExtendedS3":
delivery_stream.destinations[destination_idx][
"S3"
] = create_s3_destination_config(destination_config)
elif destination_name == "S3" and "ExtendedS3" in destination:
] = create_s3_destination_config(dest_config)
elif dest_name == "S3" and "ExtendedS3" in destination:
destination["ExtendedS3"] = {
k: v
for k, v in destination["S3"].items()

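The table-driven renaming above replaces the previous one-off handling of S3Configuration. A standalone sketch of the same normalization, using an illustrative HttpEndpoint config:

ALTERNATIVE_FIELD_NAMES = [
    ("S3Configuration", "S3DestinationDescription"),
    ("S3Update", "S3DestinationDescription"),
    ("S3BackupConfiguration", "S3BackupDescription"),
    ("S3BackupUpdate", "S3BackupDescription"),
]

# Destination config as it might arrive on a CreateDeliveryStream request.
config = {
    "EndpointConfiguration": {"Url": "https://example.com"},
    "S3Configuration": {"BucketARN": "arn:aws:s3:::firehose-test"},
}

# Request-shape names become the names that DescribeDeliveryStream reports.
for old, new in ALTERNATIVE_FIELD_NAMES:
    if old in config:
        config[new] = config.pop(old)

assert "S3DestinationDescription" in config
assert "S3Configuration" not in config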
View File

@@ -109,3 +109,18 @@ class FirehoseResponse(BaseResponse):
self._get_param("HttpEndpointDestinationUpdate"),
)
return json.dumps({})
def start_delivery_stream_encryption(self) -> str:
stream_name = self._get_param("DeliveryStreamName")
encryption_config = self._get_param(
"DeliveryStreamEncryptionConfigurationInput"
)
self.firehose_backend.start_delivery_stream_encryption(
stream_name, encryption_config
)
return "{}"
def stop_delivery_stream_encryption(self) -> str:
stream_name = self._get_param("DeliveryStreamName")
self.firehose_backend.stop_delivery_stream_encryption(stream_name)
return "{}"

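For context, the handlers above read their parameters from the JSON request body; per the Firehose API the request shapes are roughly as follows (the key ARN is a placeholder, and KeyARN applies only to customer-managed keys). Both operations respond with an empty JSON object.

# StartDeliveryStreamEncryption request body (sketch)
start_request = {
    "DeliveryStreamName": "my-stream",
    "DeliveryStreamEncryptionConfigurationInput": {
        "KeyType": "CUSTOMER_MANAGED_CMK",  # or "AWS_OWNED_CMK"
        "KeyARN": "arn:aws:kms:us-east-2:123456789012:key/placeholder",
    },
}

# StopDeliveryStreamEncryption request body (sketch)
stop_request = {"DeliveryStreamName": "my-stream"}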
View File

@@ -514,3 +514,6 @@ class GlueResponse(BaseResponse):
"CrawlersNotFound": crawlers_not_found,
}
)
def get_partition_indexes(self):
return json.dumps({"PartitionIndexDescriptorList": []})

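The new Glue handler is a stub that always reports an empty list of partition indexes, which is enough for callers that only iterate over the result. A usage sketch under moto; the database and table names are placeholders, and the stub does not check that they exist:

import boto3
from moto import mock_glue

@mock_glue
def demo_partition_indexes():
    client = boto3.client("glue", region_name="us-east-1")
    resp = client.get_partition_indexes(DatabaseName="db", TableName="tbl")
    assert resp["PartitionIndexDescriptorList"] == []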
View File

@@ -265,6 +265,22 @@ events:
- TestAccEventsConnection
- TestAccEventsConnectionDataSource
- TestAccEventsPermission
firehose:
- TestAccFirehoseDeliveryStreamDataSource_basic
- TestAccFirehoseDeliveryStream_basic
- TestAccFirehoseDeliveryStream_missingProcessing
- TestAccFirehoseDeliveryStream_HTTPEndpoint_retryDuration
- TestAccFirehoseDeliveryStream_ExtendedS3_
- TestAccFirehoseDeliveryStream_extendedS3DynamicPartitioningUpdate
- TestAccFirehoseDeliveryStream_extendedS3DynamicPartitioning
- TestAccFirehoseDeliveryStream_extendedS3KMSKeyARN
- TestAccFirehoseDeliveryStream_ExtendedS3Processing_empty
- TestAccFirehoseDeliveryStream_extendedS3Updates
- TestAccFirehoseDeliveryStream_s3
- TestAccFirehoseDeliveryStream_HTTP
- TestAccFirehoseDeliveryStream_http
- TestAccFirehoseDeliveryStream_splunk
- TestAccFirehoseDeliveryStream_Splunk
glue:
- TestAccGlueSchema_
guardduty:

View File

@@ -32,18 +32,6 @@ def test_warnings():
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
# DeliveryStreamEncryption is not supported.
stream_name = f"test_warning_{mock_random.get_random_hex(6)}"
with warnings.catch_warnings(record=True) as warn_msg:
client.create_delivery_stream(
DeliveryStreamName=stream_name,
ExtendedS3DestinationConfiguration=s3_dest_config,
DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"},
)
assert "server-side encryption enabled is not yet implemented" in str(
warn_msg[-1].message
)
# Can't create a delivery stream for Splunk as it's unimplemented.
stream_name = f"test_splunk_destination_{mock_random.get_random_hex(6)}"
with warnings.catch_warnings(record=True) as warn_msg:

View File

@@ -317,52 +317,3 @@ def test_create_s3_delivery_stream():
"HasMoreDestinations": False,
}
)
@mock_firehose
def test_create_http_stream():
"""Verify fields of a HTTP delivery stream."""
client = boto3.client("firehose", region_name=TEST_REGION)
stream_name = f"stream_{mock_random.get_random_hex(6)}"
response = create_http_delivery_stream(client, stream_name)
stream_arn = response["DeliveryStreamARN"]
response = client.describe_delivery_stream(DeliveryStreamName=stream_name)
stream_description = response["DeliveryStreamDescription"]
# Sure and Freezegun don't play nicely together
_ = stream_description.pop("CreateTimestamp")
_ = stream_description.pop("LastUpdateTimestamp")
stream_description.should.equal(
{
"DeliveryStreamName": stream_name,
"DeliveryStreamARN": stream_arn,
"DeliveryStreamStatus": "ACTIVE",
"DeliveryStreamType": "DirectPut",
"VersionId": "1",
"Destinations": [
{
"DestinationId": "destinationId-000000000001",
"HttpEndpointDestinationDescription": {
"EndpointConfiguration": {"Url": "google.com"},
"RetryOptions": {"DurationInSeconds": 100},
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CloudWatchLoggingOptions": {"Enabled": False},
"S3DestinationDescription": {
"RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose_delivery_role",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {
"SizeInMBs": 123,
"IntervalInSeconds": 124,
},
"CompressionFormat": "UNCOMPRESSED",
},
},
}
],
"HasMoreDestinations": False,
}
)

View File

@@ -0,0 +1,89 @@
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_firehose
from uuid import uuid4
from .test_firehose import sample_s3_dest_config
@mock_firehose
def test_firehose_without_encryption():
client = boto3.client("firehose", region_name="us-east-2")
name = str(uuid4())[0:6]
client.create_delivery_stream(
DeliveryStreamName=name,
ExtendedS3DestinationConfiguration=sample_s3_dest_config(),
)
resp = client.describe_delivery_stream(DeliveryStreamName=name)[
"DeliveryStreamDescription"
]
resp.shouldnt.have.key("DeliveryStreamEncryptionConfiguration")
client.start_delivery_stream_encryption(
DeliveryStreamName=name,
DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"},
)
stream = client.describe_delivery_stream(DeliveryStreamName=name)[
"DeliveryStreamDescription"
]
stream.should.have.key("DeliveryStreamEncryptionConfiguration").equals(
{
"KeyType": "AWS_OWNED_CMK",
"Status": "ENABLED",
}
)
@mock_firehose
def test_firehose_with_encryption():
client = boto3.client("firehose", region_name="us-east-2")
name = str(uuid4())[0:6]
client.create_delivery_stream(
DeliveryStreamName=name,
ExtendedS3DestinationConfiguration=sample_s3_dest_config(),
DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"},
)
stream = client.describe_delivery_stream(DeliveryStreamName=name)[
"DeliveryStreamDescription"
]
stream.should.have.key("DeliveryStreamEncryptionConfiguration").equals(
{"KeyType": "AWS_OWNED_CMK"}
)
client.stop_delivery_stream_encryption(DeliveryStreamName=name)
stream = client.describe_delivery_stream(DeliveryStreamName=name)[
"DeliveryStreamDescription"
]
stream.should.have.key("DeliveryStreamEncryptionConfiguration").should.have.key(
"Status"
).equals("DISABLED")
@mock_firehose
def test_start_encryption_on_unknown_stream():
client = boto3.client("firehose", region_name="us-east-2")
with pytest.raises(ClientError) as exc:
client.start_delivery_stream_encryption(
DeliveryStreamName="?",
DeliveryStreamEncryptionConfigurationInput={"KeyType": "AWS_OWNED_CMK"},
)
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("Firehose ? under account 123456789012 not found.")
@mock_firehose
def test_stop_encryption_on_unknown_stream():
client = boto3.client("firehose", region_name="us-east-2")
with pytest.raises(ClientError) as exc:
client.stop_delivery_stream_encryption(DeliveryStreamName="?")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("Firehose ? under account 123456789012 not found.")

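Since the backend stores the encryption input as-is before setting Status, a customer-managed key should round-trip as well. A sketch of a further test one might add; the key ARN is a placeholder and real AWS would validate it:

import boto3
from moto import mock_firehose
from uuid import uuid4

from .test_firehose import sample_s3_dest_config

@mock_firehose
def test_encryption_with_customer_managed_key():
    client = boto3.client("firehose", region_name="us-east-2")
    name = str(uuid4())[0:6]
    client.create_delivery_stream(
        DeliveryStreamName=name,
        ExtendedS3DestinationConfiguration=sample_s3_dest_config(),
    )
    # Placeholder key ARN; moto stores the input unvalidated and adds Status.
    client.start_delivery_stream_encryption(
        DeliveryStreamName=name,
        DeliveryStreamEncryptionConfigurationInput={
            "KeyType": "CUSTOMER_MANAGED_CMK",
            "KeyARN": "arn:aws:kms:us-east-2:123456789012:key/placeholder",
        },
    )
    config = client.describe_delivery_stream(DeliveryStreamName=name)[
        "DeliveryStreamDescription"
    ]["DeliveryStreamEncryptionConfiguration"]
    assert config["KeyType"] == "CUSTOMER_MANAGED_CMK"
    assert config["Status"] == "ENABLED"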
View File

@@ -0,0 +1,85 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_firehose
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto.moto_api._internal import mock_random
from .test_firehose_destination_types import create_http_delivery_stream
TEST_REGION = "us-west-1"
@mock_firehose
def test_create_http_stream():
"""Verify fields of a HTTP delivery stream."""
client = boto3.client("firehose", region_name=TEST_REGION)
stream_name = f"stream_{mock_random.get_random_hex(6)}"
response = create_http_delivery_stream(client, stream_name)
stream_arn = response["DeliveryStreamARN"]
response = client.describe_delivery_stream(DeliveryStreamName=stream_name)
stream_description = response["DeliveryStreamDescription"]
# Sure and Freezegun don't play nicely together
_ = stream_description.pop("CreateTimestamp")
_ = stream_description.pop("LastUpdateTimestamp")
stream_description.should.equal(
{
"DeliveryStreamName": stream_name,
"DeliveryStreamARN": stream_arn,
"DeliveryStreamStatus": "ACTIVE",
"DeliveryStreamType": "DirectPut",
"VersionId": "1",
"Destinations": [
{
"DestinationId": "destinationId-000000000001",
"HttpEndpointDestinationDescription": {
"EndpointConfiguration": {"Url": "google.com"},
"RetryOptions": {"DurationInSeconds": 100},
"BufferingHints": {"SizeInMBs": 123, "IntervalInSeconds": 124},
"CloudWatchLoggingOptions": {"Enabled": False},
"S3DestinationDescription": {
"RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose_delivery_role",
"BucketARN": "arn:aws:s3:::firehose-test",
"Prefix": "myFolder/",
"BufferingHints": {
"SizeInMBs": 123,
"IntervalInSeconds": 124,
},
"CompressionFormat": "UNCOMPRESSED",
},
},
}
],
"HasMoreDestinations": False,
}
)
@mock_firehose
def test_update_s3_for_http_stream():
"""Verify fields of a HTTP delivery stream."""
client = boto3.client("firehose", region_name=TEST_REGION)
stream_name = f"stream_{mock_random.get_random_hex(6)}"
create_http_delivery_stream(client, stream_name)
stream = client.describe_delivery_stream(DeliveryStreamName=stream_name)
version_id = stream["DeliveryStreamDescription"]["VersionId"]
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId=version_id,
DestinationId="destinationId-000000000001",
HttpEndpointDestinationUpdate={"S3Update": {"ErrorOutputPrefix": "prefix2"}},
)
desc = client.describe_delivery_stream(DeliveryStreamName=stream_name)[
"DeliveryStreamDescription"
]
s3_desc = desc["Destinations"][0]["HttpEndpointDestinationDescription"][
"S3DestinationDescription"
]
s3_desc.should.have.key("ErrorOutputPrefix").equals("prefix2")