moto/tests/test_firehose/test_firehose.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

521 lines
20 KiB
Python
Raw Normal View History

import warnings
from unittest import SkipTest
import boto3
import pytest
from botocore.exceptions import ClientError
2024-01-07 12:03:33 +00:00
from moto import mock_aws, settings
2022-08-13 09:49:43 +00:00
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto.firehose.models import DeliveryStream
from moto.moto_api._internal import mock_random
TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
def sample_s3_dest_config():
"""Return a simple extended s3 destination configuration."""
return {
"RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose-test-role",
"BucketARN": "arn:aws:s3::firehosetestbucket",
}
2024-01-07 12:03:33 +00:00
@mock_aws
def test_warnings():
"""Test features that raise a warning as they're unimplemented."""
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't capture warnings in server mode")
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
# Can't create a delivery stream for Splunk as it's unimplemented.
stream_name = f"test_splunk_destination_{mock_random.get_random_hex(6)}"
with warnings.catch_warnings(record=True) as warn_msg:
client.create_delivery_stream(
DeliveryStreamName=stream_name,
SplunkDestinationConfiguration={
"HECEndpoint": "foo",
"HECEndpointType": "foo",
"HECToken": "foo",
"S3Configuration": {"RoleARN": "foo", "BucketARN": "foo"},
},
)
assert "Splunk destination delivery stream is not yet implemented" in str(
warn_msg[-1].message
)
# Can't update a delivery stream to Splunk as it's unimplemented.
stream_name = f"test_update_splunk_destination_{mock_random.get_random_hex(6)}"
client.create_delivery_stream(
DeliveryStreamName=stream_name, S3DestinationConfiguration=s3_dest_config
)
with warnings.catch_warnings(record=True) as warn_msg:
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
SplunkDestinationUpdate={
"HECEndpoint": "foo",
"HECEndpointType": "foo",
"HECToken": "foo",
},
)
assert "Splunk destination delivery stream is not yet implemented" in str(
warn_msg[-1].message
)
2024-01-07 12:03:33 +00:00
@mock_aws
def test_create_delivery_stream_failures():
"""Test errors invoking create_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
failure_name = f"test_failure_{mock_random.get_random_hex(6)}"
# Create too many streams.
for idx in range(DeliveryStream.MAX_STREAMS_PER_REGION):
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}_{idx}",
ExtendedS3DestinationConfiguration=s3_dest_config,
)
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}_99",
ExtendedS3DestinationConfiguration=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "LimitExceededException"
assert "You have already consumed your firehose quota" in err["Message"]
# Free up the memory from the limits test.
for idx in range(DeliveryStream.MAX_STREAMS_PER_REGION):
client.delete_delivery_stream(DeliveryStreamName=f"{failure_name}_{idx}")
# Create a stream with the same name as an existing stream.
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}",
ExtendedS3DestinationConfiguration=s3_dest_config,
)
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}",
ExtendedS3DestinationConfiguration=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceInUseException"
assert (
f"Firehose {failure_name} under accountId {ACCOUNT_ID} already exists"
in err["Message"]
)
# Only one destination configuration is allowed.
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}_2manyconfigs",
ExtendedS3DestinationConfiguration=s3_dest_config,
HttpEndpointDestinationConfiguration={
"EndpointConfiguration": {"Url": "google.com"},
"S3Configuration": {"RoleARN": "foo", "BucketARN": "foo"},
},
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"Exactly one destination configuration is supported for a Firehose"
in err["Message"]
)
# At least one destination configuration is required.
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(DeliveryStreamName=f"{failure_name}_2manyconfigs")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"Exactly one destination configuration is supported for a Firehose"
in err["Message"]
)
# Provide a Kinesis source configuration, but use DirectPut stream type.
with pytest.raises(ClientError) as exc:
client.create_delivery_stream(
DeliveryStreamName=f"{failure_name}_bad_source_type",
DeliveryStreamType="DirectPut",
KinesisStreamSourceConfiguration={
"KinesisStreamARN": "kinesis_test_ds",
"RoleARN": "foo",
},
ExtendedS3DestinationConfiguration=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"KinesisSourceStreamConfig is only applicable for "
"KinesisStreamAsSource stream type" in err["Message"]
)
2024-01-07 12:03:33 +00:00
@mock_aws
def test_delete_delivery_stream():
"""Test successful and failed invocations of delete_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = f"test_delete_{mock_random.get_random_hex(6)}"
# Create a couple of streams to test with.
for idx in range(5):
client.create_delivery_stream(
DeliveryStreamName=f"{stream_name}-{idx}",
S3DestinationConfiguration=s3_dest_config,
)
# Attempt to delete a non-existent stream.
with pytest.raises(ClientError) as exc:
client.delete_delivery_stream(DeliveryStreamName="foo")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Firehose foo under account {ACCOUNT_ID} not found." in err["Message"]
# Delete one stream, verify the remaining exist.
client.delete_delivery_stream(DeliveryStreamName=f"{stream_name}-0")
hoses = client.list_delivery_streams()
assert len(hoses["DeliveryStreamNames"]) == 4
expected_list = [f"{stream_name}-{x}" for x in range(1, 5)]
assert hoses["DeliveryStreamNames"] == expected_list
# Delete all streams, verify there are no more streams.
for idx in range(1, 5):
client.delete_delivery_stream(DeliveryStreamName=f"{stream_name}-{idx}")
hoses = client.list_delivery_streams()
assert len(hoses["DeliveryStreamNames"]) == 0
assert hoses["DeliveryStreamNames"] == []
2024-01-07 12:03:33 +00:00
@mock_aws
def test_describe_delivery_stream():
"""Test successful, failed invocations of describe_delivery_stream()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = f"test_describe_{mock_random.get_random_hex(6)}"
role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/testrole"
# Create delivery stream with S3 destination, kinesis type and source
# for testing purposes.
client.create_delivery_stream(
DeliveryStreamName=stream_name,
DeliveryStreamType="KinesisStreamAsSource",
KinesisStreamSourceConfiguration={
"KinesisStreamARN": f"arn:aws:kinesis:{TEST_REGION}:{ACCOUNT_ID}:stream/test-datastream",
"RoleARN": role_arn,
},
S3DestinationConfiguration=s3_dest_config,
)
labels_with_kinesis_source = {
"DeliveryStreamName",
"DeliveryStreamARN",
"DeliveryStreamStatus",
"DeliveryStreamType",
"VersionId",
"CreateTimestamp",
"Source",
"Destinations",
"LastUpdateTimestamp",
"HasMoreDestinations",
}
# Verify the created fields are as expected.
results = client.describe_delivery_stream(DeliveryStreamName=stream_name)
description = results["DeliveryStreamDescription"]
assert set(description.keys()) == labels_with_kinesis_source
assert description["DeliveryStreamName"] == stream_name
assert (
description["DeliveryStreamARN"]
2022-02-18 23:31:33 +00:00
== f"arn:aws:firehose:{TEST_REGION}:{ACCOUNT_ID}:deliverystream/{stream_name}"
)
assert description["DeliveryStreamStatus"] == "ACTIVE"
assert description["DeliveryStreamType"] == "KinesisStreamAsSource"
assert description["VersionId"] == "1"
assert (
description["Source"]["KinesisStreamSourceDescription"]["RoleARN"] == role_arn
)
assert len(description["Destinations"]) == 1
assert set(description["Destinations"][0].keys()) == {
"S3DestinationDescription",
"DestinationId",
}
# Update destination with ExtendedS3.
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
ExtendedS3DestinationUpdate=s3_dest_config,
)
# Verify the created fields are as expected. There should be one
# destination with two destination types. The last update timestamp
# should be present and the version number updated.
results = client.describe_delivery_stream(DeliveryStreamName=stream_name)
description = results["DeliveryStreamDescription"]
assert set(description.keys()) == labels_with_kinesis_source | {
"LastUpdateTimestamp"
}
assert description["DeliveryStreamName"] == stream_name
assert (
description["DeliveryStreamARN"]
2022-02-18 23:31:33 +00:00
== f"arn:aws:firehose:{TEST_REGION}:{ACCOUNT_ID}:deliverystream/{stream_name}"
)
assert description["DeliveryStreamStatus"] == "ACTIVE"
assert description["DeliveryStreamType"] == "KinesisStreamAsSource"
assert description["VersionId"] == "2"
assert (
description["Source"]["KinesisStreamSourceDescription"]["RoleARN"] == role_arn
)
assert len(description["Destinations"]) == 1
assert set(description["Destinations"][0].keys()) == {
"S3DestinationDescription",
"DestinationId",
"ExtendedS3DestinationDescription",
}
# Update ExtendedS3 destination with a few different values.
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="2",
DestinationId="destinationId-000000000001",
ExtendedS3DestinationUpdate={
"RoleARN": role_arn,
"BucketARN": "arn:aws:s3:::testbucket",
# IntervalInSeconds increased from 300 to 700.
"BufferingHints": {"IntervalInSeconds": 700, "SizeInMBs": 5},
},
)
results = client.describe_delivery_stream(DeliveryStreamName=stream_name)
description = results["DeliveryStreamDescription"]
assert description["VersionId"] == "3"
assert len(description["Destinations"]) == 1
destination = description["Destinations"][0]
assert (
destination["ExtendedS3DestinationDescription"]["BufferingHints"][
"IntervalInSeconds"
]
== 700
)
# Verify S3 was added when ExtendedS3 was added.
assert set(destination.keys()) == {
"S3DestinationDescription",
"DestinationId",
"ExtendedS3DestinationDescription",
}
assert set(destination["S3DestinationDescription"].keys()) == {
"RoleARN",
"BucketARN",
"BufferingHints",
}
# Update delivery stream from ExtendedS3 to S3.
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="3",
DestinationId="destinationId-000000000001",
S3DestinationUpdate={
"RoleARN": role_arn,
"BucketARN": "arn:aws:s3:::testbucket",
# IntervalInSeconds decreased from 700 to 500.
"BufferingHints": {"IntervalInSeconds": 500, "SizeInMBs": 5},
},
)
results = client.describe_delivery_stream(DeliveryStreamName=stream_name)
description = results["DeliveryStreamDescription"]
assert description["VersionId"] == "4"
assert len(description["Destinations"]) == 1
assert set(description["Destinations"][0].keys()) == {
"S3DestinationDescription",
"ExtendedS3DestinationDescription",
"DestinationId",
}
destination = description["Destinations"][0]
assert (
destination["ExtendedS3DestinationDescription"]["BufferingHints"][
"IntervalInSeconds"
]
== 500
)
assert (
destination["S3DestinationDescription"]["BufferingHints"]["IntervalInSeconds"]
== 500
)
2024-01-07 12:03:33 +00:00
@mock_aws
def test_list_delivery_streams():
"""Test successful and failed invocations of list_delivery_streams()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = f"test_list_{mock_random.get_random_hex(6)}"
# Create a couple of streams of both types to test with.
for idx in range(5):
client.create_delivery_stream(
DeliveryStreamName=f"{stream_name}-{idx}",
DeliveryStreamType="DirectPut",
S3DestinationConfiguration=s3_dest_config,
)
for idx in range(5):
client.create_delivery_stream(
DeliveryStreamName=f"{stream_name}-{idx+5}",
DeliveryStreamType="KinesisStreamAsSource",
S3DestinationConfiguration=s3_dest_config,
)
# Verify limit works.
hoses = client.list_delivery_streams(Limit=1)
assert len(hoses["DeliveryStreamNames"]) == 1
assert hoses["DeliveryStreamNames"] == [f"{stream_name}-0"]
assert hoses["HasMoreDeliveryStreams"] is True
hoses = client.list_delivery_streams(Limit=10)
assert len(hoses["DeliveryStreamNames"]) == 10
assert hoses["HasMoreDeliveryStreams"] is False
# Verify delivery_stream_type works as a filter.
hoses = client.list_delivery_streams(DeliveryStreamType="DirectPut")
assert len(hoses["DeliveryStreamNames"]) == 5
expected_directput_list = [f"{stream_name}-{x}" for x in range(5)]
assert hoses["DeliveryStreamNames"] == expected_directput_list
assert hoses["HasMoreDeliveryStreams"] is False
hoses = client.list_delivery_streams(DeliveryStreamType="KinesisStreamAsSource")
assert len(hoses["DeliveryStreamNames"]) == 5
expected_kinesis_stream_list = [f"{stream_name}-{x+5}" for x in range(5)]
assert hoses["DeliveryStreamNames"] == expected_kinesis_stream_list
assert hoses["HasMoreDeliveryStreams"] is False
# Verify exclusive_start_delivery_stream_name returns truncated list.
hoses = client.list_delivery_streams(
ExclusiveStartDeliveryStreamName=f"{stream_name}-5"
)
assert len(hoses["DeliveryStreamNames"]) == 4
expected_stream_list = [f"{stream_name}-{x+5}" for x in range(1, 5)]
assert hoses["DeliveryStreamNames"] == expected_stream_list
assert hoses["HasMoreDeliveryStreams"] is False
hoses = client.list_delivery_streams(
ExclusiveStartDeliveryStreamName=f"{stream_name}-9"
)
assert len(hoses["DeliveryStreamNames"]) == 0
assert hoses["HasMoreDeliveryStreams"] is False
# boto3 ignores bad stream names for ExclusiveStartDeliveryStreamName.
hoses = client.list_delivery_streams(ExclusiveStartDeliveryStreamName="foo")
assert len(hoses["DeliveryStreamNames"]) == 10
assert (
hoses["DeliveryStreamNames"]
== expected_directput_list + expected_kinesis_stream_list
)
assert hoses["HasMoreDeliveryStreams"] is False
# Verify no parameters returns entire list.
client.list_delivery_streams()
assert len(hoses["DeliveryStreamNames"]) == 10
assert (
hoses["DeliveryStreamNames"]
== expected_directput_list + expected_kinesis_stream_list
)
assert hoses["HasMoreDeliveryStreams"] is False
2024-01-07 12:03:33 +00:00
@mock_aws
def test_update_destination():
"""Test successful, failed invocations of update_destination()."""
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
# Can't update a non-existent stream.
with pytest.raises(ClientError) as exc:
client.update_destination(
DeliveryStreamName="foo",
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
ExtendedS3DestinationUpdate=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Firehose foo under accountId {ACCOUNT_ID} not found" in err["Message"]
# Create a delivery stream for testing purposes.
stream_name = f"test_update_{mock_random.get_random_hex(6)}"
client.create_delivery_stream(
DeliveryStreamName=stream_name,
DeliveryStreamType="DirectPut",
S3DestinationConfiguration=s3_dest_config,
)
# Only one destination configuration is allowed.
with pytest.raises(ClientError) as exc:
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
S3DestinationUpdate=s3_dest_config,
ExtendedS3DestinationUpdate=s3_dest_config,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"Exactly one destination configuration is supported for a Firehose"
in err["Message"]
)
# Can't update to/from http or ES.
with pytest.raises(ClientError) as exc:
client.update_destination(
DeliveryStreamName=stream_name,
CurrentDeliveryStreamVersionId="1",
DestinationId="destinationId-000000000001",
HttpEndpointDestinationUpdate={
"EndpointConfiguration": {"Url": "https://google.com"},
"RetryOptions": {"DurationInSeconds": 100},
},
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgumentException"
assert (
"Changing the destination type to or from HttpEndpoint is not "
"supported at this time"
) in err["Message"]
2024-01-07 12:03:33 +00:00
@mock_aws
def test_lookup_name_from_arn():
"""Test delivery stream instance can be retrieved given ARN.
This won't work in TEST_SERVER_MODE as this script won't have access
to 'firehose_backends'.
"""
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't access firehose_backend in server mode")
client = boto3.client("firehose", region_name=TEST_REGION)
s3_dest_config = sample_s3_dest_config()
stream_name = "test_lookup"
arn = client.create_delivery_stream(
DeliveryStreamName=stream_name, S3DestinationConfiguration=s3_dest_config
)["DeliveryStreamARN"]
from moto.firehose.models import ( # pylint: disable=import-outside-toplevel
firehose_backends,
)
2022-08-13 09:49:43 +00:00
delivery_stream = firehose_backends[ACCOUNT_ID][TEST_REGION].lookup_name_from_arn(
arn
)
assert delivery_stream.delivery_stream_arn == arn
assert delivery_stream.delivery_stream_name == stream_name