diff --git a/docs/docs/services/s3.rst b/docs/docs/services/s3.rst
index aa7351335..963946db5 100644
--- a/docs/docs/services/s3.rst
+++ b/docs/docs/services/s3.rst
@@ -113,6 +113,7 @@ s3
         The configuration can be persisted, but at the moment we only send notifications to the following targets:
 
          - AWSLambda
+         - SNS
          - SQS
 
         For the following events:
diff --git a/moto/s3/models.py b/moto/s3/models.py
index b88cced02..ac73475f0 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -2202,6 +2202,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
         The configuration can be persisted, but at the moment we only send notifications to the following targets:
 
          - AWSLambda
+         - SNS
          - SQS
 
         For the following events:
diff --git a/moto/s3/notifications.py b/moto/s3/notifications.py
index 5258b4f11..b43ab1908 100644
--- a/moto/s3/notifications.py
+++ b/moto/s3/notifications.py
@@ -60,6 +60,14 @@ def send_event(account_id: str, event_name: str, bucket: Any, key: Any) -> None:
 
             _send_sqs_message(account_id, event_body, queue_name, region_name)
 
+    for notification in bucket.notification_configuration.topic:
+        if notification.matches(event_name, key.name):
+            event_body = _get_s3_event(event_name, bucket, key, notification.id)
+            region_name = _get_region_from_arn(notification.arn)
+            topic_arn = notification.arn
+
+            _send_sns_message(account_id, event_body, topic_arn, region_name)
+
 
 def _send_sqs_message(
     account_id: str, event_body: Any, queue_name: str, region_name: str
@@ -79,6 +87,22 @@ def _send_sqs_message(
         pass
 
 
+def _send_sns_message(
+    account_id: str, event_body: Any, topic_arn: str, region_name: str
+) -> None:
+    try:
+        from moto.sns.models import sns_backends
+
+        sns_backend = sns_backends[account_id][region_name]
+        sns_backend.publish(arn=topic_arn, message=json.dumps(event_body))
+    except:  # noqa
+        # This is an async action in AWS.
+        # Even if this part fails, the calling function should pass, so catch all errors
+        # Possible exceptions that could be thrown:
+        # - Topic does not exist
+        pass
+
+
 def _invoke_awslambda(
     account_id: str, event_body: Any, fn_arn: str, region_name: str
 ) -> None:
@@ -113,3 +137,9 @@ def send_test_event(account_id: str, bucket: Any) -> None:
         queue_name = arn.split(":")[-1]
         message_body = _get_test_event(bucket.name)
         _send_sqs_message(account_id, message_body, queue_name, region_name)
+
+    arns = [n.arn for n in bucket.notification_configuration.topic]
+    for arn in set(arns):
+        region_name = _get_region_from_arn(arn)
+        message_body = _get_test_event(bucket.name)
+        _send_sns_message(account_id, message_body, arn, region_name)
diff --git a/tests/test_s3/test_s3_lambda_integration.py b/tests/test_s3/test_s3_lambda_integration.py
index 3df1a2fea..301af5f49 100644
--- a/tests/test_s3/test_s3_lambda_integration.py
+++ b/tests/test_s3/test_s3_lambda_integration.py
@@ -4,7 +4,7 @@ from uuid import uuid4
 
 import boto3
 import pytest
-from moto import mock_lambda, mock_logs, mock_s3, mock_sqs
+from moto import mock_lambda, mock_logs, mock_s3, mock_sns, mock_sqs
 from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
 from tests.markers import requires_docker
 from tests.test_awslambda.utilities import (
@@ -285,3 +285,101 @@ def test_object_put__sends_to_queue__using_filter():
     messages = queue.receive_messages()
     assert not messages
     _ = [m.delete() for m in messages]
+
+
+@mock_s3
+@mock_sns
+@mock_sqs
+def test_put_bucket_notification_sns_sqs():
+    s3_client = boto3.client("s3", region_name=REGION_NAME)
+    s3_client.create_bucket(Bucket="bucket")
+
+    sqs_client = boto3.client("sqs", region_name=REGION_NAME)
+    sqs_queue = sqs_client.create_queue(QueueName="queue")
+    sqs_queue_arn = sqs_client.get_queue_attributes(
+        QueueUrl=sqs_queue["QueueUrl"], AttributeNames=["QueueArn"]
+    )
+
+    sns_client = boto3.client("sns", region_name=REGION_NAME)
+    sns_topic = sns_client.create_topic(Name="topic")
+
+    # Subscribe SQS queue to SNS topic
+    sns_client.subscribe(
+        TopicArn=sns_topic["TopicArn"],
+        Protocol="sqs",
+        Endpoint=sqs_queue_arn["Attributes"]["QueueArn"],
+    )
+
+    # Set S3 to send ObjectCreated to SNS
+    s3_client.put_bucket_notification_configuration(
+        Bucket="bucket",
+        NotificationConfiguration={
+            "TopicConfigurations": [
+                {
+                    "Id": "SomeID",
+                    "TopicArn": sns_topic["TopicArn"],
+                    "Events": ["s3:ObjectCreated:*"],
+                }
+            ]
+        },
+    )
+
+    # We should receive a test message
+    messages = sqs_client.receive_message(
+        QueueUrl=sqs_queue["QueueUrl"], MaxNumberOfMessages=10
+    )
+    assert len(messages["Messages"]) == 1
+
+    sqs_client.delete_message(
+        QueueUrl=sqs_queue["QueueUrl"],
+        ReceiptHandle=messages["Messages"][0]["ReceiptHandle"],
+    )
+
+    message_body = messages["Messages"][0]["Body"]
+    sns_message = json.loads(message_body)
+    assert sns_message["Type"] == "Notification"
+
+    # Get S3 notification from SNS message
+    s3_message_body = json.loads(sns_message["Message"])
+    assert s3_message_body["Event"] == "s3:TestEvent"
+
+    # Upload file to trigger notification
+    s3_client.put_object(Bucket="bucket", Key="myfile", Body=b"asdf1324")
+
+    # Verify queue not empty
+    messages = sqs_client.receive_message(
+        QueueUrl=sqs_queue["QueueUrl"], MaxNumberOfMessages=10
+    )
+    assert len(messages["Messages"]) == 1
+
+    # Get SNS message from SQS
+    message_body = messages["Messages"][0]["Body"]
+    sns_message = json.loads(message_body)
+    assert sns_message["Type"] == "Notification"
+
+    # Get S3 notification from SNS message
+    s3_message_body = json.loads(sns_message["Message"])
+    assert s3_message_body["Records"][0]["eventName"] == "ObjectCreated:Put"
+
+
+@mock_s3
+def test_put_bucket_notification_sns_error():
+    s3_client = boto3.client("s3", region_name=REGION_NAME)
+    s3_client.create_bucket(Bucket="bucket")
+
+    # Set S3 to send ObjectCreated to SNS
+    s3_client.put_bucket_notification_configuration(
+        Bucket="bucket",
+        NotificationConfiguration={
+            "TopicConfigurations": [
+                {
+                    "Id": "SomeID",
+                    "TopicArn": "arn:aws:sns:us-east-1:012345678910:notexistingtopic",
+                    "Events": ["s3:ObjectCreated:*"],
+                }
+            ]
+        },
+    )
+
+    # This should not throw an exception
+    s3_client.put_object(Bucket="bucket", Key="myfile", Body=b"asdf1324")