From 69c2a11f5cb445e319c0fbde9d336bf1967cb2a2 Mon Sep 17 00:00:00 2001
From: kbalk <7536198+kbalk@users.noreply.github.com>
Date: Wed, 15 Sep 2021 04:40:27 -0400
Subject: [PATCH] Allow CloudWatch logs to be sent to Firehose (#4286)

Co-authored-by: Karri Balk
---
 moto/firehose/models.py              | 53 ++++++++++++---
 moto/logs/models.py                  | 40 +++++++++--
 tests/test_firehose/test_firehose.py | 27 ++++++++
 tests/test_logs/test_integration.py  | 99 ++++++++++++++++++++++++++--
 4 files changed, 200 insertions(+), 19 deletions(-)

diff --git a/moto/firehose/models.py b/moto/firehose/models.py
index 9c3d24d40..cc1672bf8 100644
--- a/moto/firehose/models.py
+++ b/moto/firehose/models.py
@@ -3,8 +3,11 @@ Incomplete list of unfinished items:
 
   - The create_delivery_stream() argument
     DeliveryStreamEncryptionConfigurationInput is not supported.
+  - The S3BackupMode argument is ignored, as are most of the other
+    destination arguments.
+  - Data record size and number of transactions are ignored.
   - Better validation of delivery destination parameters, e.g.,
-    validation of the url for an http endpoint (boto3 does this),
+    validation of the url for an http endpoint (boto3 does this).
   - Better handling of the put_record_batch() API.  Not only is the
     existing logic bare bones, but for the ElasticSearch and RedShift
     destinations, the data is just ignored.
@@ -12,8 +15,11 @@ Incomplete list of unfinished items:
     are reported back to the user.  Instead an exception is raised.
   - put_record(), put_record_batch() always set "Encrypted" to False.
 """
-from base64 import b64decode
+from base64 import b64decode, b64encode
 from datetime import datetime
+from gzip import GzipFile
+import io
+import json
 from time import time
 from uuid import uuid4
 import warnings
@@ -32,7 +38,6 @@ from moto.firehose.exceptions import (
     ResourceNotFoundException,
     ValidationException,
 )
-from moto.core.utils import get_random_hex
 from moto.s3 import s3_backend
 from moto.utilities.tagging_service import TaggingService
 
@@ -163,11 +168,6 @@ class FirehoseBackend(BaseBackend):
         self.delivery_streams = {}
         self.tagger = TaggingService()
 
-    def lookup_name_from_arn(self, arn):
-        """Given an ARN, return the associated delivery stream name."""
-        # TODO - need to test
-        return self.delivery_streams.get(arn.split("/")[-1])
-
     def reset(self):
         """Re-initializes all attributes for this instance."""
         region_name = self.region_name
@@ -425,7 +425,7 @@
         return (
             f"{prefix}{now.strftime('%Y/%m/%d/%H')}/"
             f"{delivery_stream_name}-{version_id}-"
-            f"{now.strftime('%Y-%m-%d-%H-%M-%S')}-{get_random_hex()}"
+            f"{now.strftime('%Y-%m-%d-%H-%M-%S')}-{uuid4()}"
         )
 
     def put_s3_records(self, delivery_stream_name, version_id, s3_destination, records):
@@ -621,6 +621,41 @@
         # S3 backup if it is disabled.  If backup is enabled, you can't update
         # the delivery stream to disable it."
 
+    def lookup_name_from_arn(self, arn):
+        """Given an ARN, return the associated delivery stream instance."""
+        return self.delivery_streams.get(arn.split("/")[-1])
+
+    def send_log_event(
+        self,
+        delivery_stream_arn,
+        filter_name,
+        log_group_name,
+        log_stream_name,
+        log_events,
+    ):  # pylint: disable=too-many-arguments
+        """Send log events to an S3 bucket after gzipping and encoding them."""
+        data = {
+            "logEvents": log_events,
+            "logGroup": log_group_name,
+            "logStream": log_stream_name,
+            "messageType": "DATA_MESSAGE",
+            "owner": ACCOUNT_ID,
+            "subscriptionFilters": [filter_name],
+        }
+
+        output = io.BytesIO()
+        with GzipFile(fileobj=output, mode="w") as fhandle:
+            fhandle.write(json.dumps(data, separators=(",", ":")).encode("utf-8"))
+        gzipped_payload = b64encode(output.getvalue()).decode("utf-8")
+
+        delivery_stream = self.lookup_name_from_arn(delivery_stream_arn)
+        self.put_s3_records(
+            delivery_stream.delivery_stream_name,
+            delivery_stream.version_id,
+            delivery_stream.destinations[0]["S3"],
+            [{"Data": gzipped_payload}],
+        )
+
 
 firehose_backends = {}
 for available_region in Session().get_available_regions("firehose"):
diff --git a/moto/logs/models.py b/moto/logs/models.py
index eb62cab7a..ee0fb912c 100644
--- a/moto/logs/models.py
+++ b/moto/logs/models.py
@@ -123,10 +123,10 @@ class LogStream(BaseModel):
         self.events += events
         self.upload_sequence_token += 1
 
-        if self.destination_arn and self.destination_arn.split(":")[2] == "lambda":
-            from moto.awslambda import lambda_backends  # due to circular dependency
-
-            lambda_log_events = [
+        service = None
+        if self.destination_arn:
+            service = self.destination_arn.split(":")[2]
+            formatted_log_events = [
                 {
                     "id": event.event_id,
                     "timestamp": event.timestamp,
@@ -135,12 +135,27 @@
                 for event in events
             ]
 
+        if service == "lambda":
+            from moto.awslambda import lambda_backends  # due to circular dependency
+
             lambda_backends[self.region].send_log_event(
                 self.destination_arn,
                 self.filter_name,
                 log_group_name,
                 log_stream_name,
-                lambda_log_events,
+                formatted_log_events,
+            )
+        elif service == "firehose":
+            from moto.firehose import (  # pylint: disable=import-outside-toplevel
+                firehose_backends,
+            )
+
+            firehose_backends[self.region].send_log_event(
+                self.destination_arn,
+                self.filter_name,
+                log_group_name,
+                log_stream_name,
+                formatted_log_events,
             )
 
         return "{:056d}".format(self.upload_sequence_token)
@@ -849,7 +864,22 @@ class LogsBackend(BaseBackend):
                     "have given CloudWatch Logs permission to execute your "
                     "function."
                 )
+        elif service == "firehose":
+            from moto.firehose import (  # pylint: disable=import-outside-toplevel
+                firehose_backends,
+            )
+
+            firehose = firehose_backends[self.region_name].lookup_name_from_arn(
+                destination_arn
+            )
+            if not firehose:
+                raise InvalidParameterException(
+                    "Could not deliver test message to specified Firehose "
+                    "stream. Check if the given Firehose stream is in ACTIVE "
+                    "state."
+                )
         else:
+            # TODO: support Kinesis stream destinations
             raise InvalidParameterException(
                 f"Service '{service}' has not implemented for "
                 f"put_subscription_filter()"
             )
diff --git a/tests/test_firehose/test_firehose.py b/tests/test_firehose/test_firehose.py
index 4733fd636..bfb751dc0 100644
--- a/tests/test_firehose/test_firehose.py
+++ b/tests/test_firehose/test_firehose.py
@@ -493,3 +493,30 @@ def test_update_destination():
         "Changing the destination type to or from HttpEndpoint is not "
         "supported at this time"
     ) in err["Message"]
+
+
+@mock_firehose
+def test_lookup_name_from_arn():
+    """Test that a delivery stream instance can be retrieved given its ARN.
+
+    This won't work in TEST_SERVER_MODE as this script won't have access
+    to 'firehose_backends'.
+    """
+    if settings.TEST_SERVER_MODE:
+        raise SkipTest("Can't access firehose_backends in server mode")
+
+    client = boto3.client("firehose", region_name=TEST_REGION)
+    s3_dest_config = sample_s3_dest_config()
+    stream_name = "test_lookup"
+
+    arn = client.create_delivery_stream(
+        DeliveryStreamName=stream_name, S3DestinationConfiguration=s3_dest_config,
+    )["DeliveryStreamARN"]
+
+    from moto.firehose.models import (  # pylint: disable=import-outside-toplevel
+        firehose_backends,
+    )
+
+    delivery_stream = firehose_backends[TEST_REGION].lookup_name_from_arn(arn)
+    assert delivery_stream.delivery_stream_arn == arn
+    assert delivery_stream.delivery_stream_name == stream_name
diff --git a/tests/test_logs/test_integration.py b/tests/test_logs/test_integration.py
index 590ff29a9..bdcffe3f6 100644
--- a/tests/test_logs/test_integration.py
+++ b/tests/test_logs/test_integration.py
@@ -1,15 +1,15 @@
 import base64
-import boto3
+from io import BytesIO
 import json
-import sure  # noqa
 import time
+from zipfile import ZipFile, ZIP_DEFLATED
 import zlib
+import boto3
 from botocore.exceptions import ClientError
-from io import BytesIO
-from moto import mock_logs, mock_lambda, mock_iam
+from moto import mock_logs, mock_lambda, mock_iam, mock_firehose, mock_s3
 import pytest
-from zipfile import ZipFile, ZIP_DEFLATED
+import sure  # noqa
 
 
 @mock_lambda
@@ -177,6 +177,95 @@ def test_put_subscription_filter_with_lambda():
     log_events[1]["timestamp"].should.equal(0)
 
 
+@mock_s3
+@mock_firehose
+@mock_logs
+@pytest.mark.network
+def test_put_subscription_filter_with_firehose():
+    # given
+    region_name = "us-east-1"
+    client_firehose = boto3.client("firehose", region_name)
+    client_logs = boto3.client("logs", region_name)
+
+    log_group_name = "/firehose-test"
+    log_stream_name = "delivery-stream"
+    client_logs.create_log_group(logGroupName=log_group_name)
+    client_logs.create_log_stream(
+        logGroupName=log_group_name, logStreamName=log_stream_name
+    )
+
+    # Create an S3 bucket.
+    bucket_name = "firehosetestbucket"
+    s3_client = boto3.client("s3", region_name=region_name)
+    s3_client.create_bucket(
+        Bucket=bucket_name,
+        CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
+    )
+
+    # Create the Firehose delivery stream that uses that S3 bucket as
+    # the destination.
+    delivery_stream_name = "firehose_log_test"
+    firehose_arn = client_firehose.create_delivery_stream(
+        DeliveryStreamName=delivery_stream_name,
+        ExtendedS3DestinationConfiguration={
+            "RoleARN": _get_role_name(region_name),
+            "BucketARN": f"arn:aws:s3:::{bucket_name}",
+        },
+    )["DeliveryStreamARN"]
+
+    # when
+    client_logs.put_subscription_filter(
+        logGroupName=log_group_name,
+        filterName="firehose-test",
+        filterPattern="",
+        destinationArn=firehose_arn,
+    )
+
+    # then
+    response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
+    response["subscriptionFilters"].should.have.length_of(1)
+    filter = response["subscriptionFilters"][0]
+    filter["creationTime"].should.be.a(int)
+    filter["destinationArn"].should.equal(firehose_arn)
+    filter["distribution"].should.equal("ByLogStream")
+    filter["logGroupName"].should.equal("/firehose-test")
+    filter["filterName"].should.equal("firehose-test")
+    filter["filterPattern"].should.equal("")
+
+    # when
+    client_logs.put_log_events(
+        logGroupName=log_group_name,
+        logStreamName=log_stream_name,
+        logEvents=[
+            {"timestamp": 0, "message": "test"},
+            {"timestamp": 0, "message": "test 2"},
+        ],
+    )
+
+    # then
+    bucket_objects = s3_client.list_objects_v2(Bucket=bucket_name)
+    message = s3_client.get_object(
+        Bucket=bucket_name, Key=bucket_objects["Contents"][0]["Key"]
+    )
+    response = json.loads(
+        zlib.decompress(message["Body"].read(), 16 + zlib.MAX_WBITS).decode("utf-8")
+    )
+
+    response["messageType"].should.equal("DATA_MESSAGE")
+    response["owner"].should.equal("123456789012")
+    response["logGroup"].should.equal("/firehose-test")
+    response["logStream"].should.equal("delivery-stream")
+    response["subscriptionFilters"].should.equal(["firehose-test"])
+    log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"])
+    log_events.should.have.length_of(2)
+    log_events[0]["id"].should.be.a(int)
+    log_events[0]["message"].should.equal("test")
+    log_events[0]["timestamp"].should.equal(0)
+    log_events[1]["id"].should.be.a(int)
+    log_events[1]["message"].should.equal("test 2")
+    log_events[1]["timestamp"].should.equal(0)
+
+
 @mock_lambda
 @mock_logs
 def test_delete_subscription_filter():
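
For reference, the end-to-end flow this patch enables can be sketched as a
small standalone moto script. This is a minimal illustration, not part of the
patch: the bucket, role, log group, and stream names below are made up, and it
assumes a moto version that includes this change.

    import gzip
    import json

    import boto3
    from moto import mock_firehose, mock_logs, mock_s3


    @mock_s3
    @mock_firehose
    @mock_logs
    def cloudwatch_logs_to_firehose_demo():
        region = "us-east-1"
        s3 = boto3.client("s3", region_name=region)
        firehose = boto3.client("firehose", region_name=region)
        logs = boto3.client("logs", region_name=region)

        # Destination bucket for the delivery stream (name is illustrative).
        s3.create_bucket(Bucket="example-firehose-bucket")

        # Delivery stream whose S3 destination receives the log events.
        stream_arn = firehose.create_delivery_stream(
            DeliveryStreamName="example-stream",
            ExtendedS3DestinationConfiguration={
                "RoleARN": "arn:aws:iam::123456789012:role/example-role",
                "BucketARN": "arn:aws:s3:::example-firehose-bucket",
            },
        )["DeliveryStreamARN"]

        # Subscribe the log group to the delivery stream, then publish events.
        logs.create_log_group(logGroupName="example-group")
        logs.create_log_stream(logGroupName="example-group", logStreamName="example")
        logs.put_subscription_filter(
            logGroupName="example-group",
            filterName="example-filter",
            filterPattern="",
            destinationArn=stream_arn,
        )
        logs.put_log_events(
            logGroupName="example-group",
            logStreamName="example",
            logEvents=[{"timestamp": 0, "message": "hello"}],
        )

        # The events land in S3 as a gzipped CloudWatch Logs JSON payload.
        key = s3.list_objects_v2(Bucket="example-firehose-bucket")["Contents"][0]["Key"]
        body = s3.get_object(Bucket="example-firehose-bucket", Key=key)["Body"].read()
        payload = json.loads(gzip.decompress(body).decode("utf-8"))
        assert payload["messageType"] == "DATA_MESSAGE"
        assert payload["logEvents"][0]["message"] == "hello"


    cloudwatch_logs_to_firehose_demo()

As in the patch's send_log_event(), the object written to S3 mirrors the
format real CloudWatch Logs subscriptions deliver to Firehose: a gzipped JSON
document with messageType "DATA_MESSAGE", the owner account, the log group and
stream, the subscription filter names, and the formatted log events.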