S3 - initial Notifications implementation (#5007)

commit ac6d88518d (parent ab06e62172)
Author: Bert Blommers, 2022-04-06 21:10:32 +00:00, committed by GitHub
5 changed files with 471 additions and 2 deletions


@@ -105,6 +105,18 @@ s3
- [ ] put_bucket_metrics_configuration
- [ ] put_bucket_notification
- [X] put_bucket_notification_configuration
  The configuration can be persisted, but at the moment we only send notifications to the following targets:
   - AWSLambda
   - SQS
  For the following events:
   - 's3:ObjectCreated:Copy'
   - 's3:ObjectCreated:Put'
- [ ] put_bucket_ownership_controls
- [X] put_bucket_policy
- [X] put_bucket_replication

moto/s3/models.py

@@ -56,6 +56,7 @@ from moto.s3.exceptions import (
    InvalidTagError,
)
from .cloud_formation import cfn_to_api_encryption, is_replacement_update
from . import notifications
from .utils import clean_key_name, _VersionedKeyStore, undo_clean_key_name
from ..settings import get_s3_default_key_buffer_size, S3_UPLOAD_PART_MIN_SIZE
@@ -694,6 +695,33 @@ class Notification(BaseModel):
        self.events = events
        self.filters = filters if filters else {}

    def _event_matches(self, event_name):
        if event_name in self.events:
            return True
        # s3:ObjectCreated:Put --> s3:ObjectCreated:*
        wildcard = ":".join(event_name.split(":")[0:2]) + ":*"
        if wildcard in self.events:
            return True
        return False

    def _key_matches(self, key_name):
        if "S3Key" not in self.filters:
            return True
        _filters = {f["Name"]: f["Value"] for f in self.filters["S3Key"]["FilterRule"]}
        prefix_matches = "prefix" not in _filters or key_name.startswith(
            _filters["prefix"]
        )
        suffix_matches = "suffix" not in _filters or key_name.endswith(
            _filters["suffix"]
        )
        return prefix_matches and suffix_matches

    def matches(self, event_name, key_name):
        return self._event_matches(event_name) and self._key_matches(key_name)

    def to_config_dict(self):
        data = {}
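To make the matching rules concrete, here is a minimal standalone sketch: plain functions and dicts instead of the Notification class, so it runs without moto, but the event and filter shapes mirror the methods above.

# Standalone sketch of the matching semantics in Notification.matches
def matches(events, filters, event_name, key_name):
    # Exact match, or wildcard: s3:ObjectCreated:Put also matches s3:ObjectCreated:*
    wildcard = ":".join(event_name.split(":")[0:2]) + ":*"
    event_ok = event_name in events or wildcard in events
    rules = {f["Name"]: f["Value"] for f in filters.get("S3Key", {}).get("FilterRule", [])}
    prefix_ok = "prefix" not in rules or key_name.startswith(rules["prefix"])
    suffix_ok = "suffix" not in rules or key_name.endswith(rules["suffix"])
    return event_ok and prefix_ok and suffix_ok

assert matches(["s3:ObjectCreated:*"], {}, "s3:ObjectCreated:Put", "any-key")
assert not matches(["s3:ObjectCreated:Post"], {}, "s3:ObjectCreated:Put", "any-key")
assert matches(
    ["s3:ObjectCreated:Put"],
    {"S3Key": {"FilterRule": [{"Name": "prefix", "Value": "image/"},
                              {"Name": "suffix", "Value": "jpg"}]}},
    "s3:ObjectCreated:Put",
    "image/photo.jpg",
)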
@@ -1101,6 +1129,9 @@ class FakeBucket(CloudFormationModel):
            if region != self.region_name:
                raise InvalidNotificationDestination()

        # Send test events so the user can verify these notifications were set correctly
        notifications.send_test_event(bucket=self)

    def set_accelerate_configuration(self, accelerate_config):
        if self.accelerate_configuration is None and accelerate_config == "Suspended":
            # Cannot "suspend" an acceleration config that was never active; leave it undefined
@@ -1635,6 +1666,8 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
        ] + [new_key]
        bucket.keys.setlist(key_name, keys)

        notifications.send_event(notifications.S3_OBJECT_CREATE_PUT, bucket, new_key)

        return new_key

    def put_object_acl(self, bucket_name, key_name, acl):
@@ -1769,6 +1802,17 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
        bucket.public_access_block = None

    def put_bucket_notification_configuration(self, bucket_name, notification_config):
        """
        The configuration can be persisted, but at the moment we only send notifications to the following targets:
         - AWSLambda
         - SQS
        For the following events:
         - 's3:ObjectCreated:Copy'
         - 's3:ObjectCreated:Put'
        """
        bucket = self.get_bucket(bucket_name)
        bucket.set_notification_configuration(notification_config)
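For context, a minimal sketch of how this surfaces through boto3, assuming this version of moto's mock_s3/mock_sqs decorators (the bucket and queue names are illustrative). Note that configuring a queue target immediately delivers the s3:TestEvent sent via notifications.send_test_event above:

import json

import boto3
from moto import mock_s3, mock_sqs

@mock_s3
@mock_sqs
def demo():
    s3 = boto3.client("s3", region_name="us-east-1")
    sqs = boto3.client("sqs", region_name="us-east-1")
    s3.create_bucket(Bucket="my-bucket")
    queue_url = sqs.create_queue(QueueName="my-queue")["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    # Persist the notification configuration
    s3.put_bucket_notification_configuration(
        Bucket="my-bucket",
        NotificationConfiguration={
            "QueueConfigurations": [
                {
                    "Id": "my-config",
                    "QueueArn": queue_arn,
                    "Events": ["s3:ObjectCreated:Put"],
                }
            ]
        },
    )

    # The queue immediately receives a test event
    body = sqs.receive_message(QueueUrl=queue_url)["Messages"][0]["Body"]
    assert json.loads(body)["Event"] == "s3:TestEvent"

demo()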
@@ -2047,6 +2091,10 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
            # An object copied from a Glacier object should not have an expiry
            new_key.set_expiry(None)

        # Send notifications that an object was copied
        bucket = self.get_bucket(dest_bucket_name)
        notifications.send_event(notifications.S3_OBJECT_CREATE_COPY, bucket, new_key)

    def put_bucket_acl(self, bucket_name, acl):
        bucket = self.get_bucket(bucket_name)
        bucket.set_acl(acl)
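With a queue configured for 's3:ObjectCreated:Copy', the copy path above means a plain copy_object call now lands an ObjectCreated:Copy record on the queue; test_object_copy__sends_to_queue below exercises exactly this.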

moto/s3/notifications.py (new file, 108 lines)

@@ -0,0 +1,108 @@
import json
from datetime import datetime

_EVENT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"

S3_OBJECT_CREATE_COPY = "s3:ObjectCreated:Copy"
S3_OBJECT_CREATE_PUT = "s3:ObjectCreated:Put"


def _get_s3_event(event_name, bucket, key, notification_id):
    etag = key.etag.replace('"', "")
    # s3:ObjectCreated:Put --> ObjectCreated:Put
    event_name = event_name[3:]
    event_time = datetime.now().strftime(_EVENT_TIME_FORMAT)
    return {
        "Records": [
            {
                "eventVersion": "2.1",
                "eventSource": "aws:s3",
                "awsRegion": bucket.region_name,
                "eventTime": event_time,
                "eventName": event_name,
                "s3": {
                    "s3SchemaVersion": "1.0",
                    "configurationId": notification_id,
                    "bucket": {
                        "name": bucket.name,
                        "arn": f"arn:aws:s3:::{bucket.name}",
                    },
                    "object": {"key": key.name, "size": key.size, "eTag": etag},
                },
            }
        ]
    }


def _get_region_from_arn(arn):
    # e.g. arn:aws:sqs:us-east-1:123456789012:queue-name --> us-east-1
    return arn.split(":")[3]


def send_event(event_name, bucket, key):
    if bucket.notification_configuration is None:
        return

    for notification in bucket.notification_configuration.cloud_function:
        if notification.matches(event_name, key.name):
            event_body = _get_s3_event(event_name, bucket, key, notification.id)
            region_name = _get_region_from_arn(notification.arn)
            _invoke_awslambda(event_body, notification.arn, region_name)

    for notification in bucket.notification_configuration.queue:
        if notification.matches(event_name, key.name):
            event_body = _get_s3_event(event_name, bucket, key, notification.id)
            region_name = _get_region_from_arn(notification.arn)
            queue_name = notification.arn.split(":")[-1]
            _send_sqs_message(event_body, queue_name, region_name)


def _send_sqs_message(event_body, queue_name, region_name):
    try:
        from moto.sqs.models import sqs_backends

        sqs_backend = sqs_backends[region_name]
        sqs_backend.send_message(
            queue_name=queue_name, message_body=json.dumps(event_body)
        )
    except Exception:  # noqa
        # This is an async action in AWS, so even if this part fails,
        # the calling function should still pass. Catch all errors.
        # Possible exceptions that could be thrown:
        # - Queue does not exist
        pass


def _invoke_awslambda(event_body, fn_arn, region_name):
    try:
        from moto.awslambda.models import lambda_backends

        lambda_backend = lambda_backends[region_name]
        func = lambda_backend.get_function(fn_arn)
        func.invoke(json.dumps(event_body), dict(), dict())
    except Exception:  # noqa
        # This is an async action in AWS, so even if this part fails,
        # the calling function should still pass. Catch all errors.
        # Possible exceptions that could be thrown:
        # - Function does not exist
        pass


def _get_test_event(bucket_name):
    event_time = datetime.now().strftime(_EVENT_TIME_FORMAT)
    return {
        "Service": "Amazon S3",
        "Event": "s3:TestEvent",
        "Time": event_time,
        "Bucket": bucket_name,
    }


def send_test_event(bucket):
    arns = [n.arn for n in bucket.notification_configuration.queue]
    for arn in set(arns):
        region_name = _get_region_from_arn(arn)
        queue_name = arn.split(":")[-1]
        message_body = _get_test_event(bucket.name)
        _send_sqs_message(message_body, queue_name, region_name)
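Whether the event arrives via SQS or a Lambda invocation, consumers see the same Records payload built by _get_s3_event. A minimal sketch of reading one notification off a queue (queue_url as in the earlier example; assumes a real event, not the initial s3:TestEvent, is at the head of the queue):

import json

def read_s3_event(sqs_client, queue_url):
    body = sqs_client.receive_message(QueueUrl=queue_url)["Messages"][0]["Body"]
    record = json.loads(body)["Records"][0]
    # e.g. ("ObjectCreated:Put", "my-bucket", "keyname")
    return (
        record["eventName"],
        record["s3"]["bucket"]["name"],
        record["s3"]["object"]["key"],
    )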

tests/test_awslambda/utilities.py

@@ -134,6 +134,16 @@ def util_function():
    return zip_output.read()


def get_test_zip_file_print_event():
    pfunc = """
def lambda_handler(event, context):
    print(event)
    print("FINISHED_PRINTING_EVENT")
    return event
"""
    return _process_lambda(pfunc)


def create_invalid_lambda(role):
    conn = boto3.client("lambda", _lambda_region)
    zip_content = get_test_zip_file1()
@@ -166,11 +176,11 @@ def get_role_name():
    )["Role"]["Arn"]


-def wait_for_log_msg(expected_msg, log_group):
+def wait_for_log_msg(expected_msg, log_group, wait_time=30):
    logs_conn = boto3.client("logs", region_name="us-east-1")
    received_messages = []
    start = time.time()
-    while (time.time() - start) < 30:
+    while (time.time() - start) < wait_time:
        try:
            result = logs_conn.describe_log_streams(logGroupName=log_group)
            log_streams = result.get("logStreams")
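The new wait_time parameter keeps the default 30-second polling window but lets callers shorten it; the S3 notification tests below call wait_for_log_msg(expected_msg, log_group, wait_time=10) so the negative case, where no Lambda invocation is expected, fails fast.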


@@ -0,0 +1,291 @@
import boto3
import json
import pytest

from moto import mock_lambda, mock_logs, mock_s3, mock_sqs
from moto.core import ACCOUNT_ID
from tests.test_awslambda.utilities import (
    get_test_zip_file_print_event,
    get_role_name,
    wait_for_log_msg,
)
from uuid import uuid4

REGION_NAME = "us-east-1"
@mock_lambda
@mock_logs
@mock_s3
@pytest.mark.parametrize(
    "match_events,actual_event",
    [
        (["s3:ObjectCreated:Put"], "ObjectCreated:Put"),
        (["s3:ObjectCreated:*"], "ObjectCreated:Put"),
        (["s3:ObjectCreated:Post"], None),
        (["s3:ObjectCreated:Post", "s3:ObjectCreated:*"], "ObjectCreated:Put"),
    ],
)
def test_objectcreated_put__invokes_lambda(match_events, actual_event):
    s3_res = boto3.resource("s3", region_name=REGION_NAME)
    s3_client = boto3.client("s3", region_name=REGION_NAME)
    lambda_client = boto3.client("lambda", REGION_NAME)

    # Create S3 bucket
    bucket_name = str(uuid4())
    s3_res.create_bucket(Bucket=bucket_name)

    # Create AWSLambda function
    function_name = str(uuid4())[0:6]
    fn_arn = lambda_client.create_function(
        FunctionName=function_name,
        Runtime="python3.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file_print_event()},
    )["FunctionArn"]

    # Put Notification
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration={
            "LambdaFunctionConfigurations": [
                {
                    "Id": "unrelated",
                    "LambdaFunctionArn": f"arn:aws:lambda:us-east-1:{ACCOUNT_ID}:function:n/a",
                    "Events": ["s3:ReducedRedundancyLostObject"],
                },
                {
                    "Id": "s3eventtriggerslambda",
                    "LambdaFunctionArn": fn_arn,
                    "Events": match_events,
                },
            ]
        },
    )

    # Put Object
    s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")

    # Find the output of AWSLambda
    expected_msg = "FINISHED_PRINTING_EVENT"
    log_group = f"/aws/lambda/{function_name}"
    msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group, wait_time=10)

    if actual_event is None:
        # The notification config only matches POST events, but we have only PUT an object,
        # so the Lambda should not have fired
        assert not msg_showed_up
        return

    # If we do have an actual event, verify the Lambda was invoked with the correct event
    assert msg_showed_up, (
        expected_msg
        + " was not found after invoking the Lambda function. All logs: "
        + str(all_logs)
    )

    records = [line for line in all_logs if line.startswith("{'Records'")][0]
    records = json.loads(records.replace("'", '"'))["Records"]
    records.should.have.length_of(1)
    records[0].should.have.key("awsRegion").equals(REGION_NAME)
    records[0].should.have.key("eventName").equals(actual_event)
    records[0].should.have.key("eventSource").equals("aws:s3")
    records[0].should.have.key("eventTime")
    records[0].should.have.key("s3")
    records[0]["s3"].should.have.key("bucket")
    records[0]["s3"]["bucket"].should.have.key("arn").equals(
        f"arn:aws:s3:::{bucket_name}"
    )
    records[0]["s3"]["bucket"].should.have.key("name").equals(bucket_name)
    records[0]["s3"].should.have.key("configurationId").equals("s3eventtriggerslambda")
    records[0]["s3"].should.have.key("object")
    records[0]["s3"]["object"].should.have.key("eTag").equals(
        "61ea96c3c8d2c76fc5a42bfccb6affd9"
    )
    records[0]["s3"]["object"].should.have.key("key").equals("keyname")
    records[0]["s3"]["object"].should.have.key("size").equals(15)
@mock_logs
@mock_s3
def test_objectcreated_put__unknown_lambda_is_handled_gracefully():
    s3_res = boto3.resource("s3", region_name=REGION_NAME)
    s3_client = boto3.client("s3", region_name=REGION_NAME)

    # Create S3 bucket
    bucket_name = str(uuid4())
    s3_res.create_bucket(Bucket=bucket_name)

    # Put Notification
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration={
            "LambdaFunctionConfigurations": [
                {
                    "Id": "unrelated",
                    "LambdaFunctionArn": f"arn:aws:lambda:us-east-1:{ACCOUNT_ID}:function:n/a",
                    "Events": ["s3:ObjectCreated:Put"],
                }
            ]
        },
    )

    # Put Object
    s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")

    # The object was persisted successfully
    resp = s3_client.get_object(Bucket=bucket_name, Key="keyname")
    resp.should.have.key("ContentLength").equal(15)
    resp["Body"].read().should.equal(b"bodyofnewobject")
@mock_s3
@mock_sqs
def test_object_copy__sends_to_queue():
    s3_res = boto3.resource("s3", region_name=REGION_NAME)
    s3_client = boto3.client("s3", region_name=REGION_NAME)
    sqs_client = boto3.client("sqs", region_name=REGION_NAME)

    # Create S3 bucket
    bucket_name = str(uuid4())
    s3_res.create_bucket(Bucket=bucket_name)

    # Create SQS queue
    queue_url = sqs_client.create_queue(QueueName=str(uuid4())[0:6])["QueueUrl"]
    queue_arn = sqs_client.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    # Put Notification
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration={
            "QueueConfigurations": [
                {
                    "Id": "queue_config",
                    "QueueArn": queue_arn,
                    "Events": ["s3:ObjectCreated:Copy"],
                }
            ]
        },
    )

    # We should have received a test event now
    messages = sqs_client.receive_message(QueueUrl=queue_url)["Messages"]
    messages.should.have.length_of(1)
    message = json.loads(messages[0]["Body"])
    message.should.have.key("Service").equals("Amazon S3")
    message.should.have.key("Event").equals("s3:TestEvent")
    message.should.have.key("Time")
    message.should.have.key("Bucket").equals(bucket_name)

    # Copy an Object
    s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")
    s3_client.copy_object(
        Bucket=bucket_name, CopySource=f"{bucket_name}/keyname", Key="key2"
    )

    # Read SQS messages - we should have the Copy-event here
    resp = sqs_client.receive_message(QueueUrl=queue_url)
    resp.should.have.key("Messages").length_of(1)
    records = json.loads(resp["Messages"][0]["Body"])["Records"]
    records.should.have.length_of(1)
    records[0].should.have.key("awsRegion").equals(REGION_NAME)
    records[0].should.have.key("eventName").equals("ObjectCreated:Copy")
    records[0].should.have.key("eventSource").equals("aws:s3")
    records[0].should.have.key("eventTime")
    records[0].should.have.key("s3")
    records[0]["s3"].should.have.key("bucket")
    records[0]["s3"]["bucket"].should.have.key("arn").equals(
        f"arn:aws:s3:::{bucket_name}"
    )
    records[0]["s3"]["bucket"].should.have.key("name").equals(bucket_name)
    records[0]["s3"].should.have.key("configurationId").equals("queue_config")
    records[0]["s3"].should.have.key("object")
    records[0]["s3"]["object"].should.have.key("eTag").equals(
        "61ea96c3c8d2c76fc5a42bfccb6affd9"
    )
    records[0]["s3"]["object"].should.have.key("key").equals("key2")
    records[0]["s3"]["object"].should.have.key("size").equals(15)
@mock_s3
@mock_sqs
def test_object_put__sends_to_queue__using_filter():
    s3_res = boto3.resource("s3", region_name=REGION_NAME)
    s3_client = boto3.client("s3", region_name=REGION_NAME)
    sqs = boto3.resource("sqs", region_name=REGION_NAME)

    # Create S3 bucket
    bucket_name = str(uuid4())
    s3_res.create_bucket(Bucket=bucket_name)

    # Create SQS queue
    queue = sqs.create_queue(QueueName=str(uuid4())[0:6])
    queue_arn = queue.attributes["QueueArn"]

    # Put Notification
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration={
            "QueueConfigurations": [
                {
                    "Id": "prefixed",
                    "QueueArn": queue_arn,
                    "Events": ["s3:ObjectCreated:Put"],
                    "Filter": {
                        "Key": {"FilterRules": [{"Name": "prefix", "Value": "aa"}]}
                    },
                },
                {
                    "Id": "images_only",
                    "QueueArn": queue_arn,
                    "Events": ["s3:ObjectCreated:Put"],
                    "Filter": {
                        "Key": {
                            "FilterRules": [
                                {"Name": "prefix", "Value": "image/"},
                                {"Name": "suffix", "Value": "jpg"},
                            ]
                        }
                    },
                },
            ]
        },
    )

    # Read and discard the initial test-event
    resp = queue.receive_messages()
    for message in resp:
        message.delete()

    # Create an Object that does not meet any filter
    s3_client.put_object(Bucket=bucket_name, Key="bb", Body="sth")
    messages = queue.receive_messages()
    messages.should.have.length_of(0)

    # Create an Object that does meet the filter - using the prefix only
    s3_client.put_object(Bucket=bucket_name, Key="aafilter", Body="sth")
    messages = queue.receive_messages()
    messages.should.have.length_of(1)
    for message in messages:
        message.delete()

    # Create an Object that does meet the filter - using the prefix + suffix
    s3_client.put_object(Bucket=bucket_name, Key="image/yes.jpg", Body="img")
    messages = queue.receive_messages()
    messages.should.have.length_of(1)
    for message in messages:
        message.delete()

    # Create an Object that matches the prefix of the second filter, but not the suffix
    s3_client.put_object(Bucket=bucket_name, Key="image/no.gif", Body="img")
    messages = queue.receive_messages()
    messages.should.have.length_of(0)

    # Create an Object that matches the suffix of the second filter, but not the prefix
    s3_client.put_object(Bucket=bucket_name, Key="nonimages/yes.jpg", Body="img")
    messages = queue.receive_messages()
    messages.should.have.length_of(0)