From 4f92ee8a1cc087ea8945775476b84aa228226a00 Mon Sep 17 00:00:00 2001 From: Sam Watson Date: Tue, 22 Mar 2022 16:34:36 -0600 Subject: [PATCH] bugfix: apply cloudwatch subscription filters to newly created streams (#4962) --- moto/logs/models.py | 10 ++-- tests/test_logs/test_integration.py | 71 +++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/moto/logs/models.py b/moto/logs/models.py index 079697f32..0658b4d0f 100644 --- a/moto/logs/models.py +++ b/moto/logs/models.py @@ -297,9 +297,13 @@ class LogGroup(CloudFormationModel): def create_log_stream(self, log_stream_name): if log_stream_name in self.streams: raise ResourceAlreadyExistsException() - self.streams[log_stream_name] = LogStream( - self.region, self.name, log_stream_name - ) + stream = LogStream(self.region, self.name, log_stream_name) + filters = self.describe_subscription_filters() + + if filters: + stream.destination_arn = filters[0]["destinationArn"] + stream.filter_name = filters[0]["filterName"] + self.streams[log_stream_name] = stream def delete_log_stream(self, log_stream_name): if log_stream_name not in self.streams: diff --git a/tests/test_logs/test_integration.py b/tests/test_logs/test_integration.py index d64a04bd6..9026bcbe4 100644 --- a/tests/test_logs/test_integration.py +++ b/tests/test_logs/test_integration.py @@ -181,6 +181,77 @@ def test_put_subscription_filter_with_lambda(): log_events[1]["timestamp"].should.equal(ts_1) +@mock_lambda +@mock_logs +@pytest.mark.network +def test_subscription_filter_applies_to_new_streams(): + # given + region_name = "us-east-1" + client_lambda = boto3.client("lambda", region_name) + client_logs = boto3.client("logs", region_name) + log_group_name = "/test" + log_stream_name = "stream" + client_logs.create_log_group(logGroupName=log_group_name) + function_arn = client_lambda.create_function( + FunctionName="test", + Runtime="python3.8", + Role=_get_role_name(region_name), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": _get_test_zip_file()}, + Description="test lambda function", + Timeout=3, + MemorySize=128, + Publish=True, + )["FunctionArn"] + + # when + client_logs.put_subscription_filter( + logGroupName=log_group_name, + filterName="test", + filterPattern="", + destinationArn=function_arn, + ) + client_logs.create_log_stream( # create log stream after subscription filter applied + logGroupName=log_group_name, logStreamName=log_stream_name + ) + ts_0 = int(unix_time_millis(datetime.utcnow())) + ts_1 = int(unix_time_millis(datetime.utcnow())) + 10 + client_logs.put_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + logEvents=[ + {"timestamp": ts_0, "message": "test"}, + {"timestamp": ts_1, "message": "test 2"}, + ], + ) + + # then + msg_showed_up, received_message = _wait_for_log_msg( + client_logs, "/aws/lambda/test", "awslogs" + ) + assert msg_showed_up, "CloudWatch log event was not found. All logs: {}".format( + received_message + ) + + data = json.loads(received_message)["awslogs"]["data"] + response = json.loads( + zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS).decode("utf-8") + ) + response["messageType"].should.equal("DATA_MESSAGE") + response["owner"].should.equal("123456789012") + response["logGroup"].should.equal("/test") + response["logStream"].should.equal("stream") + response["subscriptionFilters"].should.equal(["test"]) + log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"]) + log_events.should.have.length_of(2) + log_events[0]["id"].should.be.a(int) + log_events[0]["message"].should.equal("test") + log_events[0]["timestamp"].should.equal(ts_0) + log_events[1]["id"].should.be.a(int) + log_events[1]["message"].should.equal("test 2") + log_events[1]["timestamp"].should.equal(ts_1) + + @mock_s3 @mock_firehose @mock_logs