From be17a7d8e28388dd72e6007976102935eff14e2f Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Wed, 5 Apr 2023 09:36:37 +0000 Subject: [PATCH] Logs: put_subscription_filter() now supports KinesisStream destinations (#6176) --- moto/kinesis/models.py | 37 +++++++++++++++ moto/logs/models.py | 21 +++++++++ tests/test_logs/test_integration.py | 70 +++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 77689eb62..2a060e6b1 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -1,5 +1,9 @@ +from base64 import b64encode from collections import OrderedDict +from gzip import GzipFile import datetime +import io +import json import re import itertools @@ -8,6 +12,7 @@ from typing import Any, Dict, List, Optional, Tuple, Iterable from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel from moto.core.utils import unix_time +from moto.moto_api._internal import mock_random as random from moto.utilities.paginator import paginate from moto.utilities.utils import md5_hash from .exceptions import ( @@ -958,5 +963,37 @@ class KinesisBackend(BaseBackend): stream = self._find_stream_by_arn(stream_arn) stream.stream_mode = stream_mode + """Send log events to a Stream after encoding and gzipping it.""" + + def send_log_event( + self, + delivery_stream_arn: str, + filter_name: str, + log_group_name: str, + log_stream_name: str, + log_events: List[Dict[str, Any]], + ) -> None: + data = { + "logEvents": log_events, + "logGroup": log_group_name, + "logStream": log_stream_name, + "messageType": "DATA_MESSAGE", + "owner": self.account_id, + "subscriptionFilters": [filter_name], + } + + output = io.BytesIO() + with GzipFile(fileobj=output, mode="w") as fhandle: + fhandle.write(json.dumps(data, separators=(",", ":")).encode("utf-8")) + gzipped_payload = b64encode(output.getvalue()).decode("UTF-8") + + stream = self.describe_stream(stream_arn=delivery_stream_arn, stream_name=None) + random_partition_key = random.get_random_string(length=32, lower_case=True) + stream.put_record( + partition_key=random_partition_key, + data=gzipped_payload, + explicit_hash_key="", + ) + kinesis_backends = BackendDict(KinesisBackend, "kinesis") diff --git a/moto/logs/models.py b/moto/logs/models.py index 633cb2704..09ed64f1a 100644 --- a/moto/logs/models.py +++ b/moto/logs/models.py @@ -156,6 +156,17 @@ class LogStream(BaseModel): log_stream_name, formatted_log_events, # type: ignore ) + elif service == "kinesis": + from moto.kinesis import kinesis_backends + + kinesis = kinesis_backends[self.account_id][self.region] + kinesis.send_log_event( + self.destination_arn, + self.filter_name, + log_group_name, + log_stream_name, + formatted_log_events, # type: ignore + ) return f"{self.upload_sequence_token:056d}" @@ -986,6 +997,16 @@ class LogsBackend(BaseBackend): "stream. Check if the given Firehose stream is in ACTIVE " "state." ) + elif service == "kinesis": + from moto.kinesis import kinesis_backends + + kinesis = kinesis_backends[self.account_id][self.region_name] + try: + kinesis.describe_stream(stream_arn=destination_arn, stream_name=None) + except Exception: + raise InvalidParameterException( + "Could not deliver test message to specified Kinesis stream. Verify the stream exists " + ) else: # TODO: support Kinesis stream destinations raise InvalidParameterException( diff --git a/tests/test_logs/test_integration.py b/tests/test_logs/test_integration.py index 7365bf127..c9fed1816 100644 --- a/tests/test_logs/test_integration.py +++ b/tests/test_logs/test_integration.py @@ -10,6 +10,7 @@ import boto3 from botocore.exceptions import ClientError from datetime import datetime from moto import mock_logs, mock_lambda, mock_iam, mock_firehose, mock_s3 +from moto import mock_kinesis from moto.core.utils import unix_time_millis import pytest @@ -347,6 +348,62 @@ def test_put_subscription_filter_with_firehose(): log_events[1]["timestamp"].should.equal(ts_1) +@mock_iam +@mock_logs +@mock_kinesis +def test_put_subscription_filter_with_kinesis(): + logs = boto3.client("logs", "ap-southeast-2") + logs.create_log_group(logGroupName="lg1") + logs.create_log_stream(logGroupName="lg1", logStreamName="ls1") + + # Create a DataStream + kinesis = boto3.client("kinesis", "ap-southeast-2") + kinesis.create_stream( + StreamName="test-stream", + ShardCount=1, + StreamModeDetails={"StreamMode": "ON_DEMAND"}, + ) + kinesis_datastream = kinesis.describe_stream(StreamName="test-stream")[ + "StreamDescription" + ] + kinesis_datastream_arn = kinesis_datastream["StreamARN"] + + # Subscribe to new log events + logs.put_subscription_filter( + logGroupName="lg1", + filterName="kinesis_erica_core_components_v2", + filterPattern='- "cwlogs.push.publisher"', + destinationArn=kinesis_datastream_arn, + distribution="ByLogStream", + ) + + # Create new log events + ts_0 = int(unix_time_millis(datetime.utcnow())) + ts_1 = int(unix_time_millis(datetime.utcnow())) + logs.put_log_events( + logGroupName="lg1", + logStreamName="ls1", + logEvents=[ + {"timestamp": ts_0, "message": "test"}, + {"timestamp": ts_1, "message": "test 2"}, + ], + ) + + # Verify that Kinesis Stream has this data + nr_of_records = 0 + # We don't know to which shard it was send, so check all of them + for shard in kinesis_datastream["Shards"]: + shard_id = shard["ShardId"] + + shard_iterator = kinesis.get_shard_iterator( + StreamName="test-stream", ShardId=shard_id, ShardIteratorType="TRIM_HORIZON" + )["ShardIterator"] + + resp = kinesis.get_records(ShardIterator=shard_iterator) + nr_of_records = nr_of_records + len(resp["Records"]) + assert nr_of_records == 1 + + @mock_lambda @mock_logs def test_delete_subscription_filter(): @@ -512,6 +569,19 @@ def test_put_subscription_filter_errors(): "Make sure you have given CloudWatch Logs permission to execute your function." ) + # when we pass an unknown kinesis ARN + with pytest.raises(ClientError) as e: + client.put_subscription_filter( + logGroupName="/test", + filterName="test", + filterPattern="", + destinationArn="arn:aws:kinesis:us-east-1:123456789012:stream/unknown-stream", + ) + + # then + err = e.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + def _get_role_name(region_name): with mock_iam():