From ac6d88518d2a5d40bbb13170bfebb8ca0ec38d46 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Wed, 6 Apr 2022 21:10:32 +0000 Subject: [PATCH] S3 - initial Notifications implementation (#5007) --- docs/docs/services/s3.rst | 12 + moto/s3/models.py | 48 ++++ moto/s3/notifications.py | 108 ++++++++ tests/test_awslambda/utilities.py | 14 +- tests/test_s3/test_s3_lambda_integration.py | 291 ++++++++++++++++++++ 5 files changed, 471 insertions(+), 2 deletions(-) create mode 100644 moto/s3/notifications.py create mode 100644 tests/test_s3/test_s3_lambda_integration.py diff --git a/docs/docs/services/s3.rst b/docs/docs/services/s3.rst index 5d84d2b7e..3d0e39d98 100644 --- a/docs/docs/services/s3.rst +++ b/docs/docs/services/s3.rst @@ -105,6 +105,18 @@ s3 - [ ] put_bucket_metrics_configuration - [ ] put_bucket_notification - [X] put_bucket_notification_configuration + + The configuration can be persisted, but at the moment we only send notifications to the following targets: + + - AWSLambda + - SQS + + For the following events: + + - 's3:ObjectCreated:Copy' + - 's3:ObjectCreated:Put' + + - [ ] put_bucket_ownership_controls - [X] put_bucket_policy - [X] put_bucket_replication diff --git a/moto/s3/models.py b/moto/s3/models.py index c95d133fc..90f3c49f8 100644 --- a/moto/s3/models.py +++ b/moto/s3/models.py @@ -56,6 +56,7 @@ from moto.s3.exceptions import ( InvalidTagError, ) from .cloud_formation import cfn_to_api_encryption, is_replacement_update +from . import notifications from .utils import clean_key_name, _VersionedKeyStore, undo_clean_key_name from ..settings import get_s3_default_key_buffer_size, S3_UPLOAD_PART_MIN_SIZE @@ -694,6 +695,33 @@ class Notification(BaseModel): self.events = events self.filters = filters if filters else {} + def _event_matches(self, event_name): + if event_name in self.events: + return True + # s3:ObjectCreated:Put --> s3:ObjectCreated:* + wildcard = ":".join(event_name.rsplit(":")[0:2]) + ":*" + if wildcard in self.events: + return True + return False + + def _key_matches(self, key_name): + if "S3Key" not in self.filters: + return True + _filters = {f["Name"]: f["Value"] for f in self.filters["S3Key"]["FilterRule"]} + prefix_matches = "prefix" not in _filters or key_name.startswith( + _filters["prefix"] + ) + suffix_matches = "suffix" not in _filters or key_name.endswith( + _filters["suffix"] + ) + return prefix_matches and suffix_matches + + def matches(self, event_name, key_name): + if self._event_matches(event_name): + if self._key_matches(key_name): + return True + return False + def to_config_dict(self): data = {} @@ -1101,6 +1129,9 @@ class FakeBucket(CloudFormationModel): if region != self.region_name: raise InvalidNotificationDestination() + # Send test events so the user can verify these notifications were set correctly + notifications.send_test_event(bucket=self) + def set_accelerate_configuration(self, accelerate_config): if self.accelerate_configuration is None and accelerate_config == "Suspended": # Cannot "suspend" a not active acceleration. 
Leaves it undefined @@ -1635,6 +1666,8 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider): ] + [new_key] bucket.keys.setlist(key_name, keys) + notifications.send_event(notifications.S3_OBJECT_CREATE_PUT, bucket, new_key) + return new_key def put_object_acl(self, bucket_name, key_name, acl): @@ -1769,6 +1802,17 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider): bucket.public_access_block = None def put_bucket_notification_configuration(self, bucket_name, notification_config): + """ + The configuration can be persisted, but at the moment we only send notifications to the following targets: + + - AWSLambda + - SQS + + For the following events: + + - 's3:ObjectCreated:Copy' + - 's3:ObjectCreated:Put' + """ bucket = self.get_bucket(bucket_name) bucket.set_notification_configuration(notification_config) @@ -2047,6 +2091,10 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider): # Object copied from Glacier object should not have expiry new_key.set_expiry(None) + # Send notifications that an object was copied + bucket = self.get_bucket(dest_bucket_name) + notifications.send_event(notifications.S3_OBJECT_CREATE_COPY, bucket, new_key) + def put_bucket_acl(self, bucket_name, acl): bucket = self.get_bucket(bucket_name) bucket.set_acl(acl) diff --git a/moto/s3/notifications.py b/moto/s3/notifications.py new file mode 100644 index 000000000..0de29b852 --- /dev/null +++ b/moto/s3/notifications.py @@ -0,0 +1,108 @@ +import json +from datetime import datetime + +_EVENT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" + +S3_OBJECT_CREATE_COPY = "s3:ObjectCreated:Copy" +S3_OBJECT_CREATE_PUT = "s3:ObjectCreated:Put" + + +def _get_s3_event(event_name, bucket, key, notification_id): + etag = key.etag.replace('"', "") + # s3:ObjectCreated:Put --> ObjectCreated:Put + event_name = event_name[3:] + event_time = datetime.now().strftime(_EVENT_TIME_FORMAT) + return { + "Records": [ + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": bucket.region_name, + "eventTime": event_time, + "eventName": event_name, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": notification_id, + "bucket": { + "name": bucket.name, + "arn": f"arn:aws:s3:::{bucket.name}", + }, + "object": {"key": key.name, "size": key.size, "eTag": etag}, + }, + } + ] + } + + +def _get_region_from_arn(arn): + return arn.split(":")[3] + + +def send_event(event_name, bucket, key): + if bucket.notification_configuration is None: + return + + for notification in bucket.notification_configuration.cloud_function: + if notification.matches(event_name, key.name): + event_body = _get_s3_event(event_name, bucket, key, notification.id) + region_name = _get_region_from_arn(notification.arn) + + _invoke_awslambda(event_body, notification.arn, region_name) + + for notification in bucket.notification_configuration.queue: + if notification.matches(event_name, key.name): + event_body = _get_s3_event(event_name, bucket, key, notification.id) + region_name = _get_region_from_arn(notification.arn) + queue_name = notification.arn.split(":")[-1] + + _send_sqs_message(event_body, queue_name, region_name) + + +def _send_sqs_message(event_body, queue_name, region_name): + try: + from moto.sqs.models import sqs_backends + + sqs_backend = sqs_backends[region_name] + sqs_backend.send_message( + queue_name=queue_name, message_body=json.dumps(event_body) + ) + except: # noqa + # This is an async action in AWS. 
+ # Even if this part fails, the calling function should pass, so catch all errors + # Possible exceptions that could be thrown: + # - Queue does not exist + pass + + +def _invoke_awslambda(event_body, fn_arn, region_name): + try: + from moto.awslambda.models import lambda_backends + + lambda_backend = lambda_backends[region_name] + func = lambda_backend.get_function(fn_arn) + func.invoke(json.dumps(event_body), dict(), dict()) + except: # noqa + # This is an async action in AWS. + # Even if this part fails, the calling function should pass, so catch all errors + # Possible exceptions that could be thrown: + # - Function does not exist + pass + + +def _get_test_event(bucket_name): + event_time = datetime.now().strftime(_EVENT_TIME_FORMAT) + return { + "Service": "Amazon S3", + "Event": "s3:TestEvent", + "Time": event_time, + "Bucket": bucket_name, + } + + +def send_test_event(bucket): + arns = [n.arn for n in bucket.notification_configuration.queue] + for arn in set(arns): + region_name = _get_region_from_arn(arn) + queue_name = arn.split(":")[-1] + message_body = _get_test_event(bucket.name) + _send_sqs_message(message_body, queue_name, region_name) diff --git a/tests/test_awslambda/utilities.py b/tests/test_awslambda/utilities.py index 170caf945..124af4805 100644 --- a/tests/test_awslambda/utilities.py +++ b/tests/test_awslambda/utilities.py @@ -134,6 +134,16 @@ def util_function(): return zip_output.read() +def get_test_zip_file_print_event(): + pfunc = """ +def lambda_handler(event, context): + print(event) + print("FINISHED_PRINTING_EVENT") + return event +""" + return _process_lambda(pfunc) + + def create_invalid_lambda(role): conn = boto3.client("lambda", _lambda_region) zip_content = get_test_zip_file1() @@ -166,11 +176,11 @@ def get_role_name(): )["Role"]["Arn"] -def wait_for_log_msg(expected_msg, log_group): +def wait_for_log_msg(expected_msg, log_group, wait_time=30): logs_conn = boto3.client("logs", region_name="us-east-1") received_messages = [] start = time.time() - while (time.time() - start) < 30: + while (time.time() - start) < wait_time: try: result = logs_conn.describe_log_streams(logGroupName=log_group) log_streams = result.get("logStreams") diff --git a/tests/test_s3/test_s3_lambda_integration.py b/tests/test_s3/test_s3_lambda_integration.py new file mode 100644 index 000000000..5514aa288 --- /dev/null +++ b/tests/test_s3/test_s3_lambda_integration.py @@ -0,0 +1,291 @@ +import boto3 +import json +import pytest +from moto import mock_lambda, mock_logs, mock_s3, mock_sqs +from moto.core import ACCOUNT_ID +from tests.test_awslambda.utilities import ( + get_test_zip_file_print_event, + get_role_name, + wait_for_log_msg, +) +from uuid import uuid4 + + +REGION_NAME = "us-east-1" + + +@mock_lambda +@mock_logs +@mock_s3 +@pytest.mark.parametrize( + "match_events,actual_event", + [ + (["s3:ObjectCreated:Put"], "ObjectCreated:Put"), + (["s3:ObjectCreated:*"], "ObjectCreated:Put"), + (["s3:ObjectCreated:Post"], None), + (["s3:ObjectCreated:Post", "s3:ObjectCreated:*"], "ObjectCreated:Put"), + ], +) +def test_objectcreated_put__invokes_lambda(match_events, actual_event): + s3_res = boto3.resource("s3", region_name=REGION_NAME) + s3_client = boto3.client("s3", region_name=REGION_NAME) + lambda_client = boto3.client("lambda", REGION_NAME) + + # Create S3 bucket + bucket_name = str(uuid4()) + s3_res.create_bucket(Bucket=bucket_name) + + # Create AWSLambda function + function_name = str(uuid4())[0:6] + fn_arn = lambda_client.create_function( + FunctionName=function_name, + 
Runtime="python3.7", + Role=get_role_name(), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": get_test_zip_file_print_event()}, + )["FunctionArn"] + + # Put Notification + s3_client.put_bucket_notification_configuration( + Bucket=bucket_name, + NotificationConfiguration={ + "LambdaFunctionConfigurations": [ + { + "Id": "unrelated", + "LambdaFunctionArn": f"arn:aws:lambda:us-east-1:{ACCOUNT_ID}:function:n/a", + "Events": ["s3:ReducedRedundancyLostObject"], + }, + { + "Id": "s3eventtriggerslambda", + "LambdaFunctionArn": fn_arn, + "Events": match_events, + }, + ] + }, + ) + + # Put Object + s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject") + + # Find the output of AWSLambda + expected_msg = "FINISHED_PRINTING_EVENT" + log_group = f"/aws/lambda/{function_name}" + msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group, wait_time=10) + + if actual_event is None: + # The event should not be fired on POST, as we've only PUT an event for now + assert not msg_showed_up + return + + # If we do have an actual event, verify the Lambda was invoked with the correct event + assert msg_showed_up, ( + expected_msg + + " was not found after sending an SQS message. All logs: " + + str(all_logs) + ) + + records = [l for l in all_logs if l.startswith("{'Records'")][0] + records = json.loads(records.replace("'", '"'))["Records"] + + records.should.have.length_of(1) + records[0].should.have.key("awsRegion").equals(REGION_NAME) + records[0].should.have.key("eventName").equals(actual_event) + records[0].should.have.key("eventSource").equals("aws:s3") + records[0].should.have.key("eventTime") + records[0].should.have.key("s3") + records[0]["s3"].should.have.key("bucket") + records[0]["s3"]["bucket"].should.have.key("arn").equals( + f"arn:aws:s3:::{bucket_name}" + ) + records[0]["s3"]["bucket"].should.have.key("name").equals(bucket_name) + records[0]["s3"].should.have.key("configurationId").equals("s3eventtriggerslambda") + records[0]["s3"].should.have.key("object") + records[0]["s3"]["object"].should.have.key("eTag").equals( + "61ea96c3c8d2c76fc5a42bfccb6affd9" + ) + records[0]["s3"]["object"].should.have.key("key").equals("keyname") + records[0]["s3"]["object"].should.have.key("size").equals(15) + + +@mock_logs +@mock_s3 +def test_objectcreated_put__unknown_lambda_is_handled_gracefully(): + s3_res = boto3.resource("s3", region_name=REGION_NAME) + s3_client = boto3.client("s3", region_name=REGION_NAME) + + # Create S3 bucket + bucket_name = str(uuid4()) + s3_res.create_bucket(Bucket=bucket_name) + + # Put Notification + s3_client.put_bucket_notification_configuration( + Bucket=bucket_name, + NotificationConfiguration={ + "LambdaFunctionConfigurations": [ + { + "Id": "unrelated", + "LambdaFunctionArn": f"arn:aws:lambda:us-east-1:{ACCOUNT_ID}:function:n/a", + "Events": ["s3:ObjectCreated:Put"], + } + ] + }, + ) + + # Put Object + s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject") + + # The object was persisted successfully + resp = s3_client.get_object(Bucket=bucket_name, Key="keyname") + resp.should.have.key("ContentLength").equal(15) + resp["Body"].read().should.equal(b"bodyofnewobject") + + +@mock_s3 +@mock_sqs +def test_object_copy__sends_to_queue(): + s3_res = boto3.resource("s3", region_name=REGION_NAME) + s3_client = boto3.client("s3", region_name=REGION_NAME) + sqs_client = boto3.client("sqs", region_name=REGION_NAME) + + # Create S3 bucket + bucket_name = str(uuid4()) + s3_res.create_bucket(Bucket=bucket_name) + + # Create 
SQS queue + queue_url = sqs_client.create_queue(QueueName=str(uuid4())[0:6])["QueueUrl"] + queue_arn = sqs_client.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=["QueueArn"] + )["Attributes"]["QueueArn"] + + # Put Notification + s3_client.put_bucket_notification_configuration( + Bucket=bucket_name, + NotificationConfiguration={ + "QueueConfigurations": [ + { + "Id": "queue_config", + "QueueArn": queue_arn, + "Events": ["s3:ObjectCreated:Copy"], + } + ] + }, + ) + + # We should have received a test event now + messages = sqs_client.receive_message(QueueUrl=queue_url)["Messages"] + messages.should.have.length_of(1) + message = json.loads(messages[0]["Body"]) + message.should.have.key("Service").equals("Amazon S3") + message.should.have.key("Event").equals("s3:TestEvent") + message.should.have.key("Time") + message.should.have.key("Bucket").equals(bucket_name) + + # Copy an Object + s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject") + s3_client.copy_object( + Bucket=bucket_name, CopySource=f"{bucket_name}/keyname", Key="key2" + ) + + # Read SQS messages - we should have the Copy-event here + resp = sqs_client.receive_message(QueueUrl=queue_url) + resp.should.have.key("Messages").length_of(1) + records = json.loads(resp["Messages"][0]["Body"])["Records"] + + records.should.have.length_of(1) + records[0].should.have.key("awsRegion").equals(REGION_NAME) + records[0].should.have.key("eventName").equals("ObjectCreated:Copy") + records[0].should.have.key("eventSource").equals("aws:s3") + records[0].should.have.key("eventTime") + records[0].should.have.key("s3") + records[0]["s3"].should.have.key("bucket") + records[0]["s3"]["bucket"].should.have.key("arn").equals( + f"arn:aws:s3:::{bucket_name}" + ) + records[0]["s3"]["bucket"].should.have.key("name").equals(bucket_name) + records[0]["s3"].should.have.key("configurationId").equals("queue_config") + records[0]["s3"].should.have.key("object") + records[0]["s3"]["object"].should.have.key("eTag").equals( + "61ea96c3c8d2c76fc5a42bfccb6affd9" + ) + records[0]["s3"]["object"].should.have.key("key").equals("key2") + records[0]["s3"]["object"].should.have.key("size").equals(15) + + +@mock_s3 +@mock_sqs +def test_object_put__sends_to_queue__using_filter(): + s3_res = boto3.resource("s3", region_name=REGION_NAME) + s3_client = boto3.client("s3", region_name=REGION_NAME) + sqs = boto3.resource("sqs", region_name=REGION_NAME) + + # Create S3 bucket + bucket_name = str(uuid4()) + s3_res.create_bucket(Bucket=bucket_name) + + # Create SQS queue + queue = sqs.create_queue(QueueName=f"{str(uuid4())[0:6]}") + queue_arn = queue.attributes["QueueArn"] + + # Put Notification + s3_client.put_bucket_notification_configuration( + Bucket=bucket_name, + NotificationConfiguration={ + "QueueConfigurations": [ + { + "Id": "prefixed", + "QueueArn": queue_arn, + "Events": ["s3:ObjectCreated:Put"], + "Filter": { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "aa"}]} + }, + }, + { + "Id": "images_only", + "QueueArn": queue_arn, + "Events": ["s3:ObjectCreated:Put"], + "Filter": { + "Key": { + "FilterRules": [ + {"Name": "prefix", "Value": "image/"}, + {"Name": "suffix", "Value": "jpg"}, + ] + } + }, + }, + ] + }, + ) + + # Read the test-event + resp = queue.receive_messages() + [m.delete() for m in resp] + + # Create an Object that does not meet any filter + s3_client.put_object(Bucket=bucket_name, Key="bb", Body="sth") + messages = queue.receive_messages() + messages.should.have.length_of(0) + [m.delete() for m in messages] + + # 
Create an Object that does meet the filter - using the prefix only + s3_client.put_object(Bucket=bucket_name, Key="aafilter", Body="sth") + messages = queue.receive_messages() + messages.should.have.length_of(1) + [m.delete() for m in messages] + + # Create an Object that does meet the filter - using the prefix + suffix + s3_client.put_object(Bucket=bucket_name, Key="image/yes.jpg", Body="img") + messages = queue.receive_messages() + messages.should.have.length_of(1) + [m.delete() for m in messages] + + # Create an Object that does not meet the filter - only the prefix + s3_client.put_object(Bucket=bucket_name, Key="image/no.gif", Body="img") + messages = queue.receive_messages() + messages.should.have.length_of(0) + [m.delete() for m in messages] + + # Create an Object that does not meet the filter - only the suffix + s3_client.put_object(Bucket=bucket_name, Key="nonimages/yes.jpg", Body="img") + messages = queue.receive_messages() + messages.should.have.length_of(0) + [m.delete() for m in messages]
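
For anyone trying this branch out: below is a minimal, self-contained sketch of the new Put-notification flow, mirroring the client calls used in the tests above. The bucket/queue names and the `demo_put_notification` function are illustrative only (they are not part of this patch), and, as documented above, only 's3:ObjectCreated:Put' and 's3:ObjectCreated:Copy' are wired up so far.

```python
import json

import boto3
from moto import mock_s3, mock_sqs


@mock_s3
@mock_sqs
def demo_put_notification():
    s3 = boto3.client("s3", region_name="us-east-1")
    sqs = boto3.client("sqs", region_name="us-east-1")

    s3.create_bucket(Bucket="demo-bucket")
    queue_url = sqs.create_queue(QueueName="demo-queue")["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    # The wildcard is expanded by Notification._event_matches, so a plain
    # s3:ObjectCreated:Put event is matched by s3:ObjectCreated:*
    s3.put_bucket_notification_configuration(
        Bucket="demo-bucket",
        NotificationConfiguration={
            "QueueConfigurations": [
                {
                    "Id": "demo",
                    "QueueArn": queue_arn,
                    "Events": ["s3:ObjectCreated:*"],
                }
            ]
        },
    )

    # Setting the configuration triggers send_test_event - drain that first
    test_msg = sqs.receive_message(QueueUrl=queue_url)["Messages"][0]
    assert json.loads(test_msg["Body"])["Event"] == "s3:TestEvent"
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=test_msg["ReceiptHandle"])

    # A PUT now lands on the queue as an ObjectCreated:Put record
    s3.put_object(Bucket="demo-bucket", Key="greeting.txt", Body="hello")
    message = sqs.receive_message(QueueUrl=queue_url)["Messages"][0]
    record = json.loads(message["Body"])["Records"][0]
    assert record["eventName"] == "ObjectCreated:Put"
    assert record["s3"]["object"]["key"] == "greeting.txt"
```

Calling `demo_put_notification()` directly (or collecting it with pytest after renaming it to `test_...`) should pass against this branch.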