Logs: put_subscription_filter() now supports KinesisStream destinations (#6176)

This commit is contained in:
Bert Blommers 2023-04-05 09:36:37 +00:00 committed by GitHub
parent 9f91ac0eb9
commit be17a7d8e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 128 additions and 0 deletions

View File

@ -1,5 +1,9 @@
from base64 import b64encode
from collections import OrderedDict
from gzip import GzipFile
import datetime
import io
import json
import re
import itertools
@ -8,6 +12,7 @@ from typing import Any, Dict, List, Optional, Tuple, Iterable
from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel
from moto.core.utils import unix_time
from moto.moto_api._internal import mock_random as random
from moto.utilities.paginator import paginate
from moto.utilities.utils import md5_hash
from .exceptions import (
@ -958,5 +963,37 @@ class KinesisBackend(BaseBackend):
stream = self._find_stream_by_arn(stream_arn)
stream.stream_mode = stream_mode
"""Send log events to a Stream after encoding and gzipping it."""
def send_log_event(
self,
delivery_stream_arn: str,
filter_name: str,
log_group_name: str,
log_stream_name: str,
log_events: List[Dict[str, Any]],
) -> None:
data = {
"logEvents": log_events,
"logGroup": log_group_name,
"logStream": log_stream_name,
"messageType": "DATA_MESSAGE",
"owner": self.account_id,
"subscriptionFilters": [filter_name],
}
output = io.BytesIO()
with GzipFile(fileobj=output, mode="w") as fhandle:
fhandle.write(json.dumps(data, separators=(",", ":")).encode("utf-8"))
gzipped_payload = b64encode(output.getvalue()).decode("UTF-8")
stream = self.describe_stream(stream_arn=delivery_stream_arn, stream_name=None)
random_partition_key = random.get_random_string(length=32, lower_case=True)
stream.put_record(
partition_key=random_partition_key,
data=gzipped_payload,
explicit_hash_key="",
)
kinesis_backends = BackendDict(KinesisBackend, "kinesis")

View File

@ -156,6 +156,17 @@ class LogStream(BaseModel):
log_stream_name,
formatted_log_events, # type: ignore
)
elif service == "kinesis":
from moto.kinesis import kinesis_backends
kinesis = kinesis_backends[self.account_id][self.region]
kinesis.send_log_event(
self.destination_arn,
self.filter_name,
log_group_name,
log_stream_name,
formatted_log_events, # type: ignore
)
return f"{self.upload_sequence_token:056d}"
@ -986,6 +997,16 @@ class LogsBackend(BaseBackend):
"stream. Check if the given Firehose stream is in ACTIVE "
"state."
)
elif service == "kinesis":
from moto.kinesis import kinesis_backends
kinesis = kinesis_backends[self.account_id][self.region_name]
try:
kinesis.describe_stream(stream_arn=destination_arn, stream_name=None)
except Exception:
raise InvalidParameterException(
"Could not deliver test message to specified Kinesis stream. Verify the stream exists "
)
else:
# TODO: support Kinesis stream destinations
raise InvalidParameterException(

View File

@ -10,6 +10,7 @@ import boto3
from botocore.exceptions import ClientError
from datetime import datetime
from moto import mock_logs, mock_lambda, mock_iam, mock_firehose, mock_s3
from moto import mock_kinesis
from moto.core.utils import unix_time_millis
import pytest
@ -347,6 +348,62 @@ def test_put_subscription_filter_with_firehose():
log_events[1]["timestamp"].should.equal(ts_1)
@mock_iam
@mock_logs
@mock_kinesis
def test_put_subscription_filter_with_kinesis():
logs = boto3.client("logs", "ap-southeast-2")
logs.create_log_group(logGroupName="lg1")
logs.create_log_stream(logGroupName="lg1", logStreamName="ls1")
# Create a DataStream
kinesis = boto3.client("kinesis", "ap-southeast-2")
kinesis.create_stream(
StreamName="test-stream",
ShardCount=1,
StreamModeDetails={"StreamMode": "ON_DEMAND"},
)
kinesis_datastream = kinesis.describe_stream(StreamName="test-stream")[
"StreamDescription"
]
kinesis_datastream_arn = kinesis_datastream["StreamARN"]
# Subscribe to new log events
logs.put_subscription_filter(
logGroupName="lg1",
filterName="kinesis_erica_core_components_v2",
filterPattern='- "cwlogs.push.publisher"',
destinationArn=kinesis_datastream_arn,
distribution="ByLogStream",
)
# Create new log events
ts_0 = int(unix_time_millis(datetime.utcnow()))
ts_1 = int(unix_time_millis(datetime.utcnow()))
logs.put_log_events(
logGroupName="lg1",
logStreamName="ls1",
logEvents=[
{"timestamp": ts_0, "message": "test"},
{"timestamp": ts_1, "message": "test 2"},
],
)
# Verify that Kinesis Stream has this data
nr_of_records = 0
# We don't know to which shard it was send, so check all of them
for shard in kinesis_datastream["Shards"]:
shard_id = shard["ShardId"]
shard_iterator = kinesis.get_shard_iterator(
StreamName="test-stream", ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)["ShardIterator"]
resp = kinesis.get_records(ShardIterator=shard_iterator)
nr_of_records = nr_of_records + len(resp["Records"])
assert nr_of_records == 1
@mock_lambda
@mock_logs
def test_delete_subscription_filter():
@ -512,6 +569,19 @@ def test_put_subscription_filter_errors():
"Make sure you have given CloudWatch Logs permission to execute your function."
)
# when we pass an unknown kinesis ARN
with pytest.raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:kinesis:us-east-1:123456789012:stream/unknown-stream",
)
# then
err = e.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
def _get_role_name(region_name):
with mock_iam():