Using Enum for s3 notification events (#7238)
This commit is contained in:
		
							parent
							
								
									1c3c4953cf
								
							
						
					
					
						commit
						494eac54aa
					
				| @ -2,6 +2,8 @@ from typing import TYPE_CHECKING, Any, Optional, Union | ||||
| 
 | ||||
| from moto.core.exceptions import RESTError | ||||
| 
 | ||||
| from .notifications import S3NotificationEvent | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from moto.s3.models import FakeDeleteMarker | ||||
| 
 | ||||
| @ -281,10 +283,13 @@ class InvalidNotificationDestination(S3ClientError): | ||||
| class InvalidNotificationEvent(S3ClientError): | ||||
|     code = 400 | ||||
| 
 | ||||
|     def __init__(self) -> None: | ||||
|     def __init__(self, event_name: str) -> None: | ||||
|         super().__init__( | ||||
|             "InvalidArgument", | ||||
|             "The event is not supported for notifications", | ||||
|             ( | ||||
|                 f"The event '{event_name}' is not supported for notifications. " | ||||
|                 f"Supported events are as follows: {S3NotificationEvent.events()}" | ||||
|             ), | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
|  | ||||
| @ -2088,7 +2088,10 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider): | ||||
|         bucket.keys.setlist(key_name, keys) | ||||
| 
 | ||||
|         notifications.send_event( | ||||
|             self.account_id, notifications.S3_OBJECT_CREATE_PUT, bucket, new_key | ||||
|             self.account_id, | ||||
|             notifications.S3NotificationEvent.OBJECT_CREATED_PUT_EVENT, | ||||
|             bucket, | ||||
|             new_key, | ||||
|         ) | ||||
| 
 | ||||
|         return new_key | ||||
| @ -2640,7 +2643,10 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider): | ||||
| 
 | ||||
|         # Send notifications that an object was copied | ||||
|         notifications.send_event( | ||||
|             self.account_id, notifications.S3_OBJECT_CREATE_COPY, bucket, new_key | ||||
|             self.account_id, | ||||
|             notifications.S3NotificationEvent.OBJECT_CREATED_COPY_EVENT, | ||||
|             bucket, | ||||
|             new_key, | ||||
|         ) | ||||
| 
 | ||||
|     def put_bucket_acl(self, bucket_name: str, acl: Optional[FakeAcl]) -> None: | ||||
|  | ||||
| @ -1,11 +1,53 @@ | ||||
| import json | ||||
| from datetime import datetime | ||||
| from enum import Enum | ||||
| from typing import Any, Dict, List | ||||
| 
 | ||||
| _EVENT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" | ||||
| 
 | ||||
| S3_OBJECT_CREATE_COPY = "s3:ObjectCreated:Copy" | ||||
| S3_OBJECT_CREATE_PUT = "s3:ObjectCreated:Put" | ||||
| 
 | ||||
| class S3NotificationEvent(str, Enum): | ||||
|     REDUCED_REDUNDANCY_LOST_OBJECT_EVENT = "s3:ReducedRedundancyLostObject" | ||||
|     OBJCET_CREATED_EVENT = "s3:ObjectCreated:*" | ||||
|     OBJECT_CREATED_PUT_EVENT = "s3:ObjectCreated:Put" | ||||
|     OBJECT_CREATED_POST_EVENT = "s3:ObjectCreated:Post" | ||||
|     OBJECT_CREATED_COPY_EVENT = "s3:ObjectCreated:Copy" | ||||
|     OBJECT_CREATED_COMPLETE_MULTIPART_UPLOAD_EVENT = ( | ||||
|         "s3:ObjectCreated:CompleteMultipartUpload" | ||||
|     ) | ||||
|     OBJECT_REMOVED_EVENT = "s3:ObjectRemoved:*" | ||||
|     OBJECTREMOVED_DELETE_EVENT = "s3:ObjectRemoved:Delete" | ||||
|     OBJECTREMOVED_DELETE_MARKER_CREATED_EVENT = "s3:ObjectRemoved:DeleteMarkerCreated" | ||||
|     OBJECT_RESTORE_EVENT = "s3:ObjectRestore:*" | ||||
|     OBJECT_RESTORE_POST_EVENT = "s3:ObjectRestore:Post" | ||||
|     OBJECT_RESTORE_COMPLETED_EVENT = "s3:ObjectRestore:Completed" | ||||
|     REPLICATION_EVENT = "s3:Replication:*" | ||||
|     REPLICATION_OPERATION_FAILED_REPLICATION_EVENT = ( | ||||
|         "s3:Replication:OperationFailedReplication" | ||||
|     ) | ||||
|     REPLICATION_OPERATION_NOT_TRACKED_EVENT = "s3:Replication:OperationNotTracked" | ||||
|     REPLICATION_OPERATION_MISSED_THRESHOLD_EVENT = ( | ||||
|         "s3:Replication:OperationMissedThreshold" | ||||
|     ) | ||||
|     REPLICATION_OPERATION_REPLICATED_AFTER_THRESHOLD_EVENT = ( | ||||
|         "s3:Replication:OperationReplicatedAfterThreshold" | ||||
|     ) | ||||
|     OBJECT_RESTORE_DELETE_EVENT = "s3:ObjectRestore:Delete" | ||||
|     LIFECYCLE_TRANSITION_EVENT = "s3:LifecycleTransition" | ||||
|     INTELLIGENT_TIERING_EVENT = "s3:IntelligentTiering" | ||||
|     OBJECT_ACL_EVENT = "s3:ObjectAcl:Put" | ||||
|     LIFECYCLE_EXPIRATION_EVENT = "s3:LifecycleExpiration:*" | ||||
|     LIFECYCLEEXPIRATION_DELETE_EVENT = "s3:LifecycleExpiration:Delete" | ||||
|     LIFECYCLE_EXPIRATION_DELETE_MARKER_CREATED_EVENT = ( | ||||
|         "s3:LifecycleExpiration:DeleteMarkerCreated" | ||||
|     ) | ||||
|     OBJECT_TAGGING_EVENT = "s3:ObjectTagging:*" | ||||
|     OBJECT_TAGGING_PUT_EVENT = "s3:ObjectTagging:Put" | ||||
|     OBJECTTAGGING_DELETE_EVENT = "s3:ObjectTagging:Delete" | ||||
| 
 | ||||
|     @classmethod | ||||
|     def events(self) -> List[str]: | ||||
|         return sorted([item.value for item in S3NotificationEvent]) | ||||
| 
 | ||||
| 
 | ||||
| def _get_s3_event( | ||||
| @ -41,7 +83,9 @@ def _get_region_from_arn(arn: str) -> str: | ||||
|     return arn.split(":")[3] | ||||
| 
 | ||||
| 
 | ||||
| def send_event(account_id: str, event_name: str, bucket: Any, key: Any) -> None: | ||||
| def send_event( | ||||
|     account_id: str, event_name: S3NotificationEvent, bucket: Any, key: Any | ||||
| ) -> None: | ||||
|     if bucket.notification_configuration is None: | ||||
|         return | ||||
| 
 | ||||
|  | ||||
| @ -63,6 +63,7 @@ from .models import ( | ||||
|     get_canned_acl, | ||||
|     s3_backends, | ||||
| ) | ||||
| from .notifications import S3NotificationEvent | ||||
| from .select_object_content import serialize_select | ||||
| from .utils import ( | ||||
|     ARCHIVE_STORAGE_CLASSES, | ||||
| @ -2113,18 +2114,6 @@ class S3Response(BaseResponse): | ||||
|             ("CloudFunction", "lambda"), | ||||
|         ] | ||||
| 
 | ||||
|         event_names = [ | ||||
|             "s3:ReducedRedundancyLostObject", | ||||
|             "s3:ObjectCreated:*", | ||||
|             "s3:ObjectCreated:Put", | ||||
|             "s3:ObjectCreated:Post", | ||||
|             "s3:ObjectCreated:Copy", | ||||
|             "s3:ObjectCreated:CompleteMultipartUpload", | ||||
|             "s3:ObjectRemoved:*", | ||||
|             "s3:ObjectRemoved:Delete", | ||||
|             "s3:ObjectRemoved:DeleteMarkerCreated", | ||||
|         ] | ||||
| 
 | ||||
|         found_notifications = ( | ||||
|             0  # Tripwire -- if this is not ever set, then there were no notifications | ||||
|         ) | ||||
| @ -2151,8 +2140,8 @@ class S3Response(BaseResponse): | ||||
|                         n["Event"] = [n["Event"]] | ||||
| 
 | ||||
|                     for event in n["Event"]: | ||||
|                         if event not in event_names: | ||||
|                             raise InvalidNotificationEvent() | ||||
|                         if event not in S3NotificationEvent.events(): | ||||
|                             raise InvalidNotificationEvent(event) | ||||
| 
 | ||||
|                     # Parse out the filters: | ||||
|                     if n.get("Filter"): | ||||
|  | ||||
| @ -2336,7 +2336,7 @@ def test_put_bucket_notification_errors(): | ||||
|     assert err.value.response["Error"]["Code"] == "InvalidArgument" | ||||
|     assert ( | ||||
|         err.value.response["Error"]["Message"] | ||||
|         == "The event is not supported for notifications" | ||||
|         == "The event 'notarealeventname' is not supported for notifications. Supported events are as follows: ['s3:IntelligentTiering', 's3:LifecycleExpiration:*', 's3:LifecycleExpiration:Delete', 's3:LifecycleExpiration:DeleteMarkerCreated', 's3:LifecycleTransition', 's3:ObjectAcl:Put', 's3:ObjectCreated:*', 's3:ObjectCreated:CompleteMultipartUpload', 's3:ObjectCreated:Copy', 's3:ObjectCreated:Post', 's3:ObjectCreated:Put', 's3:ObjectRemoved:*', 's3:ObjectRemoved:Delete', 's3:ObjectRemoved:DeleteMarkerCreated', 's3:ObjectRestore:*', 's3:ObjectRestore:Completed', 's3:ObjectRestore:Delete', 's3:ObjectRestore:Post', 's3:ObjectTagging:*', 's3:ObjectTagging:Delete', 's3:ObjectTagging:Put', 's3:ReducedRedundancyLostObject', 's3:Replication:*', 's3:Replication:OperationFailedReplication', 's3:Replication:OperationMissedThreshold', 's3:Replication:OperationNotTracked', 's3:Replication:OperationReplicatedAfterThreshold']" | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user