Feature: s3 sns notification (#6838)
This commit is contained in:
parent
0ae51188dd
commit
f878aaf8cd
@ -113,6 +113,7 @@ s3
|
|||||||
The configuration can be persisted, but at the moment we only send notifications to the following targets:
|
The configuration can be persisted, but at the moment we only send notifications to the following targets:
|
||||||
|
|
||||||
- AWSLambda
|
- AWSLambda
|
||||||
|
- SNS
|
||||||
- SQS
|
- SQS
|
||||||
|
|
||||||
For the following events:
|
For the following events:
|
||||||
|
@ -2202,6 +2202,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
|
|||||||
The configuration can be persisted, but at the moment we only send notifications to the following targets:
|
The configuration can be persisted, but at the moment we only send notifications to the following targets:
|
||||||
|
|
||||||
- AWSLambda
|
- AWSLambda
|
||||||
|
- SNS
|
||||||
- SQS
|
- SQS
|
||||||
|
|
||||||
For the following events:
|
For the following events:
|
||||||
|
@ -60,6 +60,14 @@ def send_event(account_id: str, event_name: str, bucket: Any, key: Any) -> None:
|
|||||||
|
|
||||||
_send_sqs_message(account_id, event_body, queue_name, region_name)
|
_send_sqs_message(account_id, event_body, queue_name, region_name)
|
||||||
|
|
||||||
|
for notification in bucket.notification_configuration.topic:
|
||||||
|
if notification.matches(event_name, key.name):
|
||||||
|
event_body = _get_s3_event(event_name, bucket, key, notification.id)
|
||||||
|
region_name = _get_region_from_arn(notification.arn)
|
||||||
|
topic_arn = notification.arn
|
||||||
|
|
||||||
|
_send_sns_message(account_id, event_body, topic_arn, region_name)
|
||||||
|
|
||||||
|
|
||||||
def _send_sqs_message(
|
def _send_sqs_message(
|
||||||
account_id: str, event_body: Any, queue_name: str, region_name: str
|
account_id: str, event_body: Any, queue_name: str, region_name: str
|
||||||
@ -79,6 +87,22 @@ def _send_sqs_message(
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _send_sns_message(
|
||||||
|
account_id: str, event_body: Any, topic_arn: str, region_name: str
|
||||||
|
) -> None:
|
||||||
|
try:
|
||||||
|
from moto.sns.models import sns_backends
|
||||||
|
|
||||||
|
sns_backend = sns_backends[account_id][region_name]
|
||||||
|
sns_backend.publish(arn=topic_arn, message=json.dumps(event_body))
|
||||||
|
except: # noqa
|
||||||
|
# This is an async action in AWS.
|
||||||
|
# Even if this part fails, the calling function should pass, so catch all errors
|
||||||
|
# Possible exceptions that could be thrown:
|
||||||
|
# - Topic does not exist
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _invoke_awslambda(
|
def _invoke_awslambda(
|
||||||
account_id: str, event_body: Any, fn_arn: str, region_name: str
|
account_id: str, event_body: Any, fn_arn: str, region_name: str
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -113,3 +137,9 @@ def send_test_event(account_id: str, bucket: Any) -> None:
|
|||||||
queue_name = arn.split(":")[-1]
|
queue_name = arn.split(":")[-1]
|
||||||
message_body = _get_test_event(bucket.name)
|
message_body = _get_test_event(bucket.name)
|
||||||
_send_sqs_message(account_id, message_body, queue_name, region_name)
|
_send_sqs_message(account_id, message_body, queue_name, region_name)
|
||||||
|
|
||||||
|
arns = [n.arn for n in bucket.notification_configuration.topic]
|
||||||
|
for arn in set(arns):
|
||||||
|
region_name = _get_region_from_arn(arn)
|
||||||
|
message_body = _get_test_event(bucket.name)
|
||||||
|
_send_sns_message(account_id, message_body, arn, region_name)
|
||||||
|
@ -4,7 +4,7 @@ from uuid import uuid4
|
|||||||
import boto3
|
import boto3
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from moto import mock_lambda, mock_logs, mock_s3, mock_sqs
|
from moto import mock_lambda, mock_logs, mock_s3, mock_sns, mock_sqs
|
||||||
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
|
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
|
||||||
from tests.markers import requires_docker
|
from tests.markers import requires_docker
|
||||||
from tests.test_awslambda.utilities import (
|
from tests.test_awslambda.utilities import (
|
||||||
@ -285,3 +285,101 @@ def test_object_put__sends_to_queue__using_filter():
|
|||||||
messages = queue.receive_messages()
|
messages = queue.receive_messages()
|
||||||
assert not messages
|
assert not messages
|
||||||
_ = [m.delete() for m in messages]
|
_ = [m.delete() for m in messages]
|
||||||
|
|
||||||
|
|
||||||
|
@mock_s3
|
||||||
|
@mock_sns
|
||||||
|
@mock_sqs
|
||||||
|
def test_put_bucket_notification_sns_sqs():
|
||||||
|
s3_client = boto3.client("s3", region_name=REGION_NAME)
|
||||||
|
s3_client.create_bucket(Bucket="bucket")
|
||||||
|
|
||||||
|
sqs_client = boto3.client("sqs", region_name=REGION_NAME)
|
||||||
|
sqs_queue = sqs_client.create_queue(QueueName="queue")
|
||||||
|
sqs_queue_arn = sqs_client.get_queue_attributes(
|
||||||
|
QueueUrl=sqs_queue["QueueUrl"], AttributeNames=["QueueArn"]
|
||||||
|
)
|
||||||
|
|
||||||
|
sns_client = boto3.client("sns", region_name=REGION_NAME)
|
||||||
|
sns_topic = sns_client.create_topic(Name="topic")
|
||||||
|
|
||||||
|
# Subscribe SQS queue to SNS topic
|
||||||
|
sns_client.subscribe(
|
||||||
|
TopicArn=sns_topic["TopicArn"],
|
||||||
|
Protocol="sqs",
|
||||||
|
Endpoint=sqs_queue_arn["Attributes"]["QueueArn"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set S3 to send ObjectCreated to SNS
|
||||||
|
s3_client.put_bucket_notification_configuration(
|
||||||
|
Bucket="bucket",
|
||||||
|
NotificationConfiguration={
|
||||||
|
"TopicConfigurations": [
|
||||||
|
{
|
||||||
|
"Id": "SomeID",
|
||||||
|
"TopicArn": sns_topic["TopicArn"],
|
||||||
|
"Events": ["s3:ObjectCreated:*"],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# We should receive a test message
|
||||||
|
messages = sqs_client.receive_message(
|
||||||
|
QueueUrl=sqs_queue["QueueUrl"], MaxNumberOfMessages=10
|
||||||
|
)
|
||||||
|
assert len(messages["Messages"]) == 1
|
||||||
|
|
||||||
|
sqs_client.delete_message(
|
||||||
|
QueueUrl=sqs_queue["QueueUrl"],
|
||||||
|
ReceiptHandle=messages["Messages"][0]["ReceiptHandle"],
|
||||||
|
)
|
||||||
|
|
||||||
|
message_body = messages["Messages"][0]["Body"]
|
||||||
|
sns_message = json.loads(message_body)
|
||||||
|
assert sns_message["Type"] == "Notification"
|
||||||
|
|
||||||
|
# Get S3 notification from SNS message
|
||||||
|
s3_message_body = json.loads(sns_message["Message"])
|
||||||
|
assert s3_message_body["Event"] == "s3:TestEvent"
|
||||||
|
|
||||||
|
# Upload file to trigger notification
|
||||||
|
s3_client.put_object(Bucket="bucket", Key="myfile", Body=b"asdf1324")
|
||||||
|
|
||||||
|
# Verify queue not empty
|
||||||
|
messages = sqs_client.receive_message(
|
||||||
|
QueueUrl=sqs_queue["QueueUrl"], MaxNumberOfMessages=10
|
||||||
|
)
|
||||||
|
assert len(messages["Messages"]) == 1
|
||||||
|
|
||||||
|
# Get SNS message from SQS
|
||||||
|
message_body = messages["Messages"][0]["Body"]
|
||||||
|
sns_message = json.loads(message_body)
|
||||||
|
assert sns_message["Type"] == "Notification"
|
||||||
|
|
||||||
|
# Get S3 notification from SNS message
|
||||||
|
s3_message_body = json.loads(sns_message["Message"])
|
||||||
|
assert s3_message_body["Records"][0]["eventName"] == "ObjectCreated:Put"
|
||||||
|
|
||||||
|
|
||||||
|
@mock_s3
|
||||||
|
def test_put_bucket_notification_sns_error():
|
||||||
|
s3_client = boto3.client("s3", region_name=REGION_NAME)
|
||||||
|
s3_client.create_bucket(Bucket="bucket")
|
||||||
|
|
||||||
|
# Set S3 to send ObjectCreated to SNS
|
||||||
|
s3_client.put_bucket_notification_configuration(
|
||||||
|
Bucket="bucket",
|
||||||
|
NotificationConfiguration={
|
||||||
|
"TopicConfigurations": [
|
||||||
|
{
|
||||||
|
"Id": "SomeID",
|
||||||
|
"TopicArn": "arn:aws:sns:us-east-1:012345678910:notexistingtopic",
|
||||||
|
"Events": ["s3:ObjectCreated:*"],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# This should not throw an exception
|
||||||
|
s3_client.put_object(Bucket="bucket", Key="myfile", Body=b"asdf1324")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user