From e73a69421952eb65da583fadf86af4efa6dd0c93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Gr=C3=BCbel?= <33207684+gruebel@users.noreply.github.com> Date: Tue, 12 May 2020 14:34:10 +0200 Subject: [PATCH] Add CloudWatch logs subscription filters (#2982) * Add logs.describe_subscription_filters * Add logs.put_subscription_filter * Add logs.delete_subscription_filter * Change to usage of ACCOUNT_ID --- moto/awslambda/models.py | 24 ++ moto/logs/exceptions.py | 12 +- moto/logs/models.py | 109 ++++++++- moto/logs/responses.py | 30 +++ tests/test_logs/test_logs.py | 414 ++++++++++++++++++++++++++++++++++- 5 files changed, 585 insertions(+), 4 deletions(-) diff --git a/moto/awslambda/models.py b/moto/awslambda/models.py index 589a790ae..7641ce067 100644 --- a/moto/awslambda/models.py +++ b/moto/awslambda/models.py @@ -5,6 +5,8 @@ import time from collections import defaultdict import copy import datetime +from gzip import GzipFile + import docker import docker.errors import hashlib @@ -983,6 +985,28 @@ class LambdaBackend(BaseBackend): func = self._lambdas.get_arn(function_arn) return func.invoke(json.dumps(event), {}, {}) + def send_log_event( + self, function_arn, filter_name, log_group_name, log_stream_name, log_events + ): + data = { + "messageType": "DATA_MESSAGE", + "owner": ACCOUNT_ID, + "logGroup": log_group_name, + "logStream": log_stream_name, + "subscriptionFilters": [filter_name], + "logEvents": log_events, + } + + output = io.BytesIO() + with GzipFile(fileobj=output, mode="w") as f: + f.write(json.dumps(data, separators=(",", ":")).encode("utf-8")) + payload_gz_encoded = base64.b64encode(output.getvalue()).decode("utf-8") + + event = {"awslogs": {"data": payload_gz_encoded}} + + func = self._lambdas.get_arn(function_arn) + return func.invoke(json.dumps(event), {}, {}) + def list_tags(self, resource): return self.get_function_by_arn(resource).tags diff --git a/moto/logs/exceptions.py b/moto/logs/exceptions.py index 9f6628b0f..022b3a411 100644 --- a/moto/logs/exceptions.py +++ b/moto/logs/exceptions.py @@ -7,10 +7,10 @@ class LogsClientError(JsonRESTError): class ResourceNotFoundException(LogsClientError): - def __init__(self): + def __init__(self, msg=None): self.code = 400 super(ResourceNotFoundException, self).__init__( - "ResourceNotFoundException", "The specified resource does not exist" + "ResourceNotFoundException", msg or "The specified log group does not exist" ) @@ -28,3 +28,11 @@ class ResourceAlreadyExistsException(LogsClientError): super(ResourceAlreadyExistsException, self).__init__( "ResourceAlreadyExistsException", "The specified log group already exists" ) + + +class LimitExceededException(LogsClientError): + def __init__(self): + self.code = 400 + super(LimitExceededException, self).__init__( + "LimitExceededException", "Resource limit exceeded." + ) diff --git a/moto/logs/models.py b/moto/logs/models.py index 755605734..dcc0e85e1 100644 --- a/moto/logs/models.py +++ b/moto/logs/models.py @@ -6,6 +6,7 @@ from .exceptions import ( ResourceNotFoundException, ResourceAlreadyExistsException, InvalidParameterException, + LimitExceededException, ) @@ -57,6 +58,8 @@ class LogStream: 0 # I'm guessing this is token needed for sequenceToken by put_events ) self.events = [] + self.destination_arn = None + self.filter_name = None self.__class__._log_ids += 1 @@ -97,11 +100,32 @@ class LogStream: self.lastIngestionTime = int(unix_time_millis()) # TODO: make this match AWS if possible self.storedBytes += sum([len(log_event["message"]) for log_event in log_events]) - self.events += [ + events = [ LogEvent(self.lastIngestionTime, log_event) for log_event in log_events ] + self.events += events self.uploadSequenceToken += 1 + if self.destination_arn and self.destination_arn.split(":")[2] == "lambda": + from moto.awslambda import lambda_backends # due to circular dependency + + lambda_log_events = [ + { + "id": event.eventId, + "timestamp": event.timestamp, + "message": event.message, + } + for event in events + ] + + lambda_backends[self.region].send_log_event( + self.destination_arn, + self.filter_name, + log_group_name, + log_stream_name, + lambda_log_events, + ) + return "{:056d}".format(self.uploadSequenceToken) def get_log_events( @@ -227,6 +251,7 @@ class LogGroup: self.retention_in_days = kwargs.get( "RetentionInDays" ) # AWS defaults to Never Expire for log group retention + self.subscription_filters = [] def create_log_stream(self, log_stream_name): if log_stream_name in self.streams: @@ -386,6 +411,48 @@ class LogGroup: k: v for (k, v) in self.tags.items() if k not in tags_to_remove } + def describe_subscription_filters(self): + return self.subscription_filters + + def put_subscription_filter( + self, filter_name, filter_pattern, destination_arn, role_arn + ): + creation_time = int(unix_time_millis()) + + # only one subscription filter can be associated with a log group + if self.subscription_filters: + if self.subscription_filters[0]["filterName"] == filter_name: + creation_time = self.subscription_filters[0]["creationTime"] + else: + raise LimitExceededException + + for stream in self.streams.values(): + stream.destination_arn = destination_arn + stream.filter_name = filter_name + + self.subscription_filters = [ + { + "filterName": filter_name, + "logGroupName": self.name, + "filterPattern": filter_pattern, + "destinationArn": destination_arn, + "roleArn": role_arn, + "distribution": "ByLogStream", + "creationTime": creation_time, + } + ] + + def delete_subscription_filter(self, filter_name): + if ( + not self.subscription_filters + or self.subscription_filters[0]["filterName"] != filter_name + ): + raise ResourceNotFoundException( + "The specified subscription filter does not exist." + ) + + self.subscription_filters = [] + class LogsBackend(BaseBackend): def __init__(self, region_name): @@ -557,6 +624,46 @@ class LogsBackend(BaseBackend): log_group = self.groups[log_group_name] log_group.untag(tags) + def describe_subscription_filters(self, log_group_name): + log_group = self.groups.get(log_group_name) + + if not log_group: + raise ResourceNotFoundException() + + return log_group.describe_subscription_filters() + + def put_subscription_filter( + self, log_group_name, filter_name, filter_pattern, destination_arn, role_arn + ): + # TODO: support other destinations like Kinesis stream + from moto.awslambda import lambda_backends # due to circular dependency + + log_group = self.groups.get(log_group_name) + + if not log_group: + raise ResourceNotFoundException() + + lambda_func = lambda_backends[self.region_name].get_function(destination_arn) + + # no specific permission check implemented + if not lambda_func: + raise InvalidParameterException( + "Could not execute the lambda function. " + "Make sure you have given CloudWatch Logs permission to execute your function." + ) + + log_group.put_subscription_filter( + filter_name, filter_pattern, destination_arn, role_arn + ) + + def delete_subscription_filter(self, log_group_name, filter_name): + log_group = self.groups.get(log_group_name) + + if not log_group: + raise ResourceNotFoundException() + + log_group.delete_subscription_filter(filter_name) + logs_backends = {} for region in Session().get_available_regions("logs"): diff --git a/moto/logs/responses.py b/moto/logs/responses.py index 4631da2f9..9e6886a42 100644 --- a/moto/logs/responses.py +++ b/moto/logs/responses.py @@ -178,3 +178,33 @@ class LogsResponse(BaseResponse): tags = self._get_param("tags") self.logs_backend.untag_log_group(log_group_name, tags) return "" + + def describe_subscription_filters(self): + log_group_name = self._get_param("logGroupName") + + subscription_filters = self.logs_backend.describe_subscription_filters( + log_group_name + ) + + return json.dumps({"subscriptionFilters": subscription_filters}) + + def put_subscription_filter(self): + log_group_name = self._get_param("logGroupName") + filter_name = self._get_param("filterName") + filter_pattern = self._get_param("filterPattern") + destination_arn = self._get_param("destinationArn") + role_arn = self._get_param("roleArn") + + self.logs_backend.put_subscription_filter( + log_group_name, filter_name, filter_pattern, destination_arn, role_arn + ) + + return "" + + def delete_subscription_filter(self): + log_group_name = self._get_param("logGroupName") + filter_name = self._get_param("filterName") + + self.logs_backend.delete_subscription_filter(log_group_name, filter_name) + + return "" diff --git a/tests/test_logs/test_logs.py b/tests/test_logs/test_logs.py index 2429d7e93..675948150 100644 --- a/tests/test_logs/test_logs.py +++ b/tests/test_logs/test_logs.py @@ -1,10 +1,17 @@ +import base64 +import json +import time +import zlib +from io import BytesIO +from zipfile import ZipFile, ZIP_DEFLATED + import boto3 import os import sure # noqa import six from botocore.exceptions import ClientError -from moto import mock_logs, settings +from moto import mock_logs, settings, mock_lambda, mock_iam from nose.tools import assert_raises from nose import SkipTest @@ -425,3 +432,408 @@ def test_untag_log_group(): assert response["tags"] == remaining_tags response = conn.delete_log_group(logGroupName=log_group_name) + + +@mock_logs +def test_describe_subscription_filters(): + # given + client = boto3.client("logs", "us-east-1") + log_group_name = "/test" + client.create_log_group(logGroupName=log_group_name) + + # when + response = client.describe_subscription_filters(logGroupName=log_group_name) + + # then + response["subscriptionFilters"].should.have.length_of(0) + + +@mock_logs +def test_describe_subscription_filters_errors(): + # given + client = boto3.client("logs", "us-east-1") + + # when + with assert_raises(ClientError) as e: + client.describe_subscription_filters(logGroupName="not-existing-log-group",) + + # then + ex = e.exception + ex.operation_name.should.equal("DescribeSubscriptionFilters") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "The specified log group does not exist" + ) + + +@mock_lambda +@mock_logs +def test_put_subscription_filter_update(): + # given + region_name = "us-east-1" + client_lambda = boto3.client("lambda", region_name) + client_logs = boto3.client("logs", region_name) + log_group_name = "/test" + log_stream_name = "stream" + client_logs.create_log_group(logGroupName=log_group_name) + client_logs.create_log_stream( + logGroupName=log_group_name, logStreamName=log_stream_name + ) + function_arn = client_lambda.create_function( + FunctionName="test", + Runtime="python3.8", + Role=_get_role_name(region_name), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": _get_test_zip_file()}, + Description="test lambda function", + Timeout=3, + MemorySize=128, + Publish=True, + )["FunctionArn"] + + # when + client_logs.put_subscription_filter( + logGroupName=log_group_name, + filterName="test", + filterPattern="", + destinationArn=function_arn, + ) + + # then + response = client_logs.describe_subscription_filters(logGroupName=log_group_name) + response["subscriptionFilters"].should.have.length_of(1) + filter = response["subscriptionFilters"][0] + creation_time = filter["creationTime"] + creation_time.should.be.a(int) + filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test" + filter["distribution"] = "ByLogStream" + filter["logGroupName"] = "/test" + filter["filterName"] = "test" + filter["filterPattern"] = "" + + # when + # to update an existing subscription filter the 'filerName' must be identical + client_logs.put_subscription_filter( + logGroupName=log_group_name, + filterName="test", + filterPattern="[]", + destinationArn=function_arn, + ) + + # then + response = client_logs.describe_subscription_filters(logGroupName=log_group_name) + response["subscriptionFilters"].should.have.length_of(1) + filter = response["subscriptionFilters"][0] + filter["creationTime"].should.equal(creation_time) + filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test" + filter["distribution"] = "ByLogStream" + filter["logGroupName"] = "/test" + filter["filterName"] = "test" + filter["filterPattern"] = "[]" + + # when + # only one subscription filter can be associated with a log group + with assert_raises(ClientError) as e: + client_logs.put_subscription_filter( + logGroupName=log_group_name, + filterName="test-2", + filterPattern="", + destinationArn=function_arn, + ) + + # then + ex = e.exception + ex.operation_name.should.equal("PutSubscriptionFilter") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("LimitExceededException") + ex.response["Error"]["Message"].should.equal("Resource limit exceeded.") + + +@mock_lambda +@mock_logs +def test_put_subscription_filter_with_lambda(): + # given + region_name = "us-east-1" + client_lambda = boto3.client("lambda", region_name) + client_logs = boto3.client("logs", region_name) + log_group_name = "/test" + log_stream_name = "stream" + client_logs.create_log_group(logGroupName=log_group_name) + client_logs.create_log_stream( + logGroupName=log_group_name, logStreamName=log_stream_name + ) + function_arn = client_lambda.create_function( + FunctionName="test", + Runtime="python3.8", + Role=_get_role_name(region_name), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": _get_test_zip_file()}, + Description="test lambda function", + Timeout=3, + MemorySize=128, + Publish=True, + )["FunctionArn"] + + # when + client_logs.put_subscription_filter( + logGroupName=log_group_name, + filterName="test", + filterPattern="", + destinationArn=function_arn, + ) + + # then + response = client_logs.describe_subscription_filters(logGroupName=log_group_name) + response["subscriptionFilters"].should.have.length_of(1) + filter = response["subscriptionFilters"][0] + filter["creationTime"].should.be.a(int) + filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test" + filter["distribution"] = "ByLogStream" + filter["logGroupName"] = "/test" + filter["filterName"] = "test" + filter["filterPattern"] = "" + + # when + client_logs.put_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + logEvents=[ + {"timestamp": 0, "message": "test"}, + {"timestamp": 0, "message": "test 2"}, + ], + ) + + # then + msg_showed_up, received_message = _wait_for_log_msg( + client_logs, "/aws/lambda/test", "awslogs" + ) + assert msg_showed_up, "CloudWatch log event was not found. All logs: {}".format( + received_message + ) + + data = json.loads(received_message)["awslogs"]["data"] + response = json.loads( + zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS).decode("utf-8") + ) + response["messageType"].should.equal("DATA_MESSAGE") + response["owner"].should.equal("123456789012") + response["logGroup"].should.equal("/test") + response["logStream"].should.equal("stream") + response["subscriptionFilters"].should.equal(["test"]) + log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"]) + log_events.should.have.length_of(2) + log_events[0]["id"].should.be.a(int) + log_events[0]["message"].should.equal("test") + log_events[0]["timestamp"].should.equal(0) + log_events[1]["id"].should.be.a(int) + log_events[1]["message"].should.equal("test 2") + log_events[1]["timestamp"].should.equal(0) + + +@mock_logs +def test_put_subscription_filter_errors(): + # given + client = boto3.client("logs", "us-east-1") + log_group_name = "/test" + client.create_log_group(logGroupName=log_group_name) + + # when + with assert_raises(ClientError) as e: + client.put_subscription_filter( + logGroupName="not-existing-log-group", + filterName="test", + filterPattern="", + destinationArn="arn:aws:lambda:us-east-1:123456789012:function:test", + ) + + # then + ex = e.exception + ex.operation_name.should.equal("PutSubscriptionFilter") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "The specified log group does not exist" + ) + + # when + with assert_raises(ClientError) as e: + client.put_subscription_filter( + logGroupName="/test", + filterName="test", + filterPattern="", + destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing", + ) + + # then + ex = e.exception + ex.operation_name.should.equal("PutSubscriptionFilter") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("InvalidParameterException") + ex.response["Error"]["Message"].should.equal( + "Could not execute the lambda function. " + "Make sure you have given CloudWatch Logs permission to execute your function." + ) + + # when + with assert_raises(ClientError) as e: + client.put_subscription_filter( + logGroupName="/test", + filterName="test", + filterPattern="", + destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing", + ) + + # then + ex = e.exception + ex.operation_name.should.equal("PutSubscriptionFilter") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("InvalidParameterException") + ex.response["Error"]["Message"].should.equal( + "Could not execute the lambda function. " + "Make sure you have given CloudWatch Logs permission to execute your function." + ) + + +@mock_lambda +@mock_logs +def test_delete_subscription_filter_errors(): + # given + region_name = "us-east-1" + client_lambda = boto3.client("lambda", region_name) + client_logs = boto3.client("logs", region_name) + log_group_name = "/test" + client_logs.create_log_group(logGroupName=log_group_name) + function_arn = client_lambda.create_function( + FunctionName="test", + Runtime="python3.8", + Role=_get_role_name(region_name), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": _get_test_zip_file()}, + Description="test lambda function", + Timeout=3, + MemorySize=128, + Publish=True, + )["FunctionArn"] + client_logs.put_subscription_filter( + logGroupName=log_group_name, + filterName="test", + filterPattern="", + destinationArn=function_arn, + ) + + # when + client_logs.delete_subscription_filter( + logGroupName="/test", filterName="test", + ) + + # then + response = client_logs.describe_subscription_filters(logGroupName=log_group_name) + response["subscriptionFilters"].should.have.length_of(0) + + +@mock_lambda +@mock_logs +def test_delete_subscription_filter_errors(): + # given + region_name = "us-east-1" + client_lambda = boto3.client("lambda", region_name) + client_logs = boto3.client("logs", region_name) + log_group_name = "/test" + client_logs.create_log_group(logGroupName=log_group_name) + function_arn = client_lambda.create_function( + FunctionName="test", + Runtime="python3.8", + Role=_get_role_name(region_name), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": _get_test_zip_file()}, + Description="test lambda function", + Timeout=3, + MemorySize=128, + Publish=True, + )["FunctionArn"] + client_logs.put_subscription_filter( + logGroupName=log_group_name, + filterName="test", + filterPattern="", + destinationArn=function_arn, + ) + + # when + with assert_raises(ClientError) as e: + client_logs.delete_subscription_filter( + logGroupName="not-existing-log-group", filterName="test", + ) + + # then + ex = e.exception + ex.operation_name.should.equal("DeleteSubscriptionFilter") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "The specified log group does not exist" + ) + + # when + with assert_raises(ClientError) as e: + client_logs.delete_subscription_filter( + logGroupName="/test", filterName="wrong-filter-name", + ) + + # then + ex = e.exception + ex.operation_name.should.equal("DeleteSubscriptionFilter") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "The specified subscription filter does not exist." + ) + + +def _get_role_name(region_name): + with mock_iam(): + iam = boto3.client("iam", region_name=region_name) + try: + return iam.get_role(RoleName="test-role")["Role"]["Arn"] + except ClientError: + return iam.create_role( + RoleName="test-role", AssumeRolePolicyDocument="test policy", Path="/", + )["Role"]["Arn"] + + +def _get_test_zip_file(): + func_str = """ +def lambda_handler(event, context): + return event +""" + + zip_output = BytesIO() + zip_file = ZipFile(zip_output, "w", ZIP_DEFLATED) + zip_file.writestr("lambda_function.py", func_str) + zip_file.close() + zip_output.seek(0) + return zip_output.read() + + +def _wait_for_log_msg(client, log_group_name, expected_msg_part): + received_messages = [] + start = time.time() + while (time.time() - start) < 10: + result = client.describe_log_streams(logGroupName=log_group_name) + log_streams = result.get("logStreams") + if not log_streams: + time.sleep(1) + continue + + for log_stream in log_streams: + result = client.get_log_events( + logGroupName=log_group_name, logStreamName=log_stream["logStreamName"], + ) + received_messages.extend( + [event["message"] for event in result.get("events")] + ) + for message in received_messages: + if expected_msg_part in message: + return True, message + time.sleep(1) + return False, received_messages