S3: Adding tests of EventBridge notification ObjectCreated:Copy (#7407)

Author: Akira Noda
Date:   2024-02-29 19:35:42 +09:00 (committed by GitHub)
Commit: c404f0877e
Parent: ce447bfc2a
2 changed files with 86 additions and 66 deletions


@@ -2048,7 +2048,9 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
         lock_legal_status: Optional[str] = None,
         lock_until: Optional[str] = None,
         checksum_value: Optional[str] = None,
+        # arguments to handle notification
         request_method: Optional[str] = "PUT",
+        disable_notification: Optional[bool] = False,
     ) -> FakeKey:
         if storage is not None and storage not in STORAGE_CLASS:
             raise InvalidStorageClass(storage=storage)
@@ -2097,6 +2099,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
         keys = [new_key]
         bucket.keys.setlist(key_name, keys)

+        if not disable_notification:
             # Send event notification
             if request_method == "POST":
                 notify_event_name = (
@@ -2706,6 +2709,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
             lock_mode=lock_mode,
             lock_legal_status=lock_legal_status,
             lock_until=lock_until,
+            disable_notification=True,  # avoid sending PutObject events here
         )
         self.tagger.copy_tags(src_key.arn, new_key.arn)
         if mdirective != "REPLACE":
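In the hunks above, put_object gains an opt-out flag and the internal put_object call made while copying passes it, so a copy no longer emits an ObjectCreated:Put event in addition to its own copy notification. A minimal sketch of the call pattern, assuming a backend object with the put_object signature shown above; the wrapper function and its arguments are illustrative and not part of this diff:

# Sketch only: put_object and disable_notification come from the diff above;
# the wrapper function and its arguments are hypothetical.
def copy_key_without_put_event(backend, src_value: bytes, dest_bucket: str, dest_key: str):
    # Create the destination key but suppress the ObjectCreated:Put notification;
    # the copy operation is expected to emit its own event instead.
    return backend.put_object(
        bucket_name=dest_bucket,
        key_name=dest_key,
        value=src_value,
        disable_notification=True,  # avoid sending PutObject events here
    )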


@@ -1,5 +1,6 @@
 import json
 from io import BytesIO
+from typing import Any, Dict, List
 from unittest import SkipTest
 from uuid import uuid4
@@ -14,18 +15,31 @@ REGION_NAME = "us-east-1"
 REDUCED_PART_SIZE = 256


-@mock_aws
-def test_put_object_notification_ObjectCreated_PUT():
-    s3_res = boto3.resource("s3", region_name=REGION_NAME)
-    s3_client = boto3.client("s3", region_name=REGION_NAME)
-    events_client = boto3.client("events", region_name=REGION_NAME)
-    logs_client = boto3.client("logs", region_name=REGION_NAME)
-
-    rule_name = "test-rule"
+def _seteup_bucket_notification_eventbridge(
+    bucket_name: str = str(uuid4()),
+    rule_name: str = "test-rule",
+    log_group_name: str = "/test-group",
+) -> Dict[str, str]:
+    """Setups S3, EventBridge and CloudWatchLogs"""
+    # Setup S3
+    s3_res = boto3.resource("s3", region_name=REGION_NAME)
+    s3_res.create_bucket(Bucket=bucket_name)
+
+    # Put bucket notification event bridge
+    s3_client = boto3.client("s3", region_name=REGION_NAME)
+    s3_client.put_bucket_notification_configuration(
+        Bucket=bucket_name,
+        NotificationConfiguration={"EventBridgeConfiguration": {}},
+    )
+
+    # Setup EventBridge Rule
+    events_client = boto3.client("events", region_name=REGION_NAME)
     events_client.put_rule(
         Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})
     )
-    log_group_name = "/test-group"
+
+    # Create a log group and attach it to the events target.
+    logs_client = boto3.client("logs", region_name=REGION_NAME)
     logs_client.create_log_group(logGroupName=log_group_name)
     events_client.put_targets(
         Rule=rule_name,
@@ -37,23 +51,31 @@ def test_put_object_notification_ObjectCreated_PUT():
         ],
     )

-    # Create S3 bucket
-    bucket_name = str(uuid4())
-    s3_res.create_bucket(Bucket=bucket_name)
-
-    # Put Notification
-    s3_client.put_bucket_notification_configuration(
-        Bucket=bucket_name,
-        NotificationConfiguration={"EventBridgeConfiguration": {}},
-    )
+    return {
+        "bucket_name": bucket_name,
+        "event_rule_name": rule_name,
+        "log_group_name": log_group_name,
+    }
+
+
+def _get_send_events(log_group_name: str = "/test-group") -> List[Dict[str, Any]]:
+    logs_client = boto3.client("logs", region_name=REGION_NAME)
+    return sorted(
+        logs_client.filter_log_events(logGroupName=log_group_name)["events"],
+        key=lambda item: item["timestamp"],
+    )
+
+
+@mock_aws
+def test_put_object_notification_ObjectCreated_PUT():
+    resource_names = _seteup_bucket_notification_eventbridge()
+    bucket_name = resource_names["bucket_name"]
+    s3_client = boto3.client("s3", region_name=REGION_NAME)

     # Put Object
     s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")

-    events = sorted(
-        logs_client.filter_log_events(logGroupName=log_group_name)["events"],
-        key=lambda item: item["eventId"],
-    )
+    events = _get_send_events()
     assert len(events) == 1
     event_message = json.loads(events[0]["message"])
     assert event_message["detail-type"] == "Object Created"
@@ -70,36 +92,8 @@ def test_put_object_notification_ObjectCreated_POST():
     if not settings.TEST_DECORATOR_MODE:
         raise SkipTest(("Doesn't quite work right with the Proxy or Server"))

-    s3_res = boto3.resource("s3", region_name=REGION_NAME)
-    s3_client = boto3.client("s3", region_name=REGION_NAME)
-    events_client = boto3.client("events", region_name=REGION_NAME)
-    logs_client = boto3.client("logs", region_name=REGION_NAME)
-
-    rule_name = "test-rule"
-    events_client.put_rule(
-        Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})
-    )
-    log_group_name = "/test-group"
-    logs_client.create_log_group(logGroupName=log_group_name)
-    events_client.put_targets(
-        Rule=rule_name,
-        Targets=[
-            {
-                "Id": "test",
-                "Arn": f"arn:aws:logs:{REGION_NAME}:{ACCOUNT_ID}:log-group:{log_group_name}",
-            }
-        ],
-    )
-
-    # Create S3 bucket
-    bucket_name = str(uuid4())
-    s3_res.create_bucket(Bucket=bucket_name)
-
-    # Put bucket notification event bridge
-    s3_client.put_bucket_notification_configuration(
-        Bucket=bucket_name,
-        NotificationConfiguration={"EventBridgeConfiguration": {}},
-    )
+    resource_names = _seteup_bucket_notification_eventbridge()
+    bucket_name = resource_names["bucket_name"]

     ###
     # multipart/formdata POST request (this request is processed in S3Response._bucket_response_post)
@@ -113,10 +107,7 @@ def test_put_object_notification_ObjectCreated_POST():
         files={"file": ("tmp.txt", BytesIO(content))},
     )

-    events = sorted(
-        logs_client.filter_log_events(logGroupName=log_group_name)["events"],
-        key=lambda item: item["eventId"],
-    )
+    events = _get_send_events()
     assert len(events) == 1
     event_message = json.loads(events[0]["message"])
     assert event_message["detail-type"] == "Object Created"
@@ -126,3 +117,28 @@ def test_put_object_notification_ObjectCreated_POST():
     assert event_message["detail"]["bucket"]["name"] == bucket_name
     assert event_message["detail"]["object"]["key"] == object_key
     assert event_message["detail"]["reason"] == "ObjectCreated"
+
+
+@mock_aws
+def test_copy_object_notification():
+    resource_names = _seteup_bucket_notification_eventbridge()
+    bucket_name = resource_names["bucket_name"]
+    s3_client = boto3.client("s3", region_name=REGION_NAME)
+
+    # Copy object (send two events; PutObject and CopyObject)
+    s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")
+    object_key = "key2"
+    s3_client.copy_object(
+        Bucket=bucket_name, CopySource=f"{bucket_name}/keyname", Key="key2"
+    )
+
+    events = _get_send_events()
+    assert len(events) == 2  # [PutObject event, CopyObject event]
+    event_message = json.loads(events[-1]["message"])
+    assert event_message["detail-type"] == "Object Created"
+    assert event_message["source"] == "aws.s3"
+    assert event_message["account"] == ACCOUNT_ID
+    assert event_message["region"] == REGION_NAME
+    assert event_message["detail"]["bucket"]["name"] == bucket_name
+    assert event_message["detail"]["object"]["key"] == object_key
+    assert event_message["detail"]["reason"] == "ObjectCreated"
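
For reference, each event_message decoded in these tests is the JSON body that moto delivers to the CloudWatch Logs target. Restricted to the fields the assertions check, it looks roughly like the sketch below; moto's default account ID is assumed, and the extra fields a real S3 EventBridge event carries (time, version, id, resources, and additional detail keys) are omitted:

# Approximate shape of the decoded event, limited to the fields asserted above.
expected_copy_event = {
    "detail-type": "Object Created",
    "source": "aws.s3",
    "account": "123456789012",  # ACCOUNT_ID (moto's default account)
    "region": "us-east-1",  # REGION_NAME
    "detail": {
        "bucket": {"name": "<the uuid4() bucket created by the setup helper>"},
        "object": {"key": "key2"},  # object_key in test_copy_object_notification
        "reason": "ObjectCreated",
    },
}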