From 9a5f0a065e0cd8e58b6ee11a0134ebf49b5514f5 Mon Sep 17 00:00:00 2001 From: Akira Noda <61897166+tsugumi-sys@users.noreply.github.com> Date: Thu, 8 Feb 2024 05:02:51 +0900 Subject: [PATCH] Techdebt: Use typing for event message (#7315) --- moto/events/models.py | 45 ++++++++++++++++++++++--------------------- moto/events/utils.py | 17 +++++++++++++++- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/moto/events/models.py b/moto/events/models.py index 11d7c62bd..ad78fac7f 100644 --- a/moto/events/models.py +++ b/moto/events/models.py @@ -34,7 +34,7 @@ from moto.utilities.arns import parse_arn from moto.utilities.paginator import paginate from moto.utilities.tagging_service import TaggingService -from .utils import PAGINATION_MODEL +from .utils import _BASE_EVENT_MESSAGE, PAGINATION_MODEL, EventMessageType # Sentinel to signal the absence of a field for `Exists` pattern matching UNDEFINED = object() @@ -113,7 +113,7 @@ class Rule(CloudFormationModel): if index is not None: self.targets.pop(index) - def send_to_targets(self, event: Dict[str, Any]) -> None: + def send_to_targets(self, event: EventMessageType) -> None: if not self.event_pattern.matches_event(event): return @@ -166,7 +166,7 @@ class Rule(CloudFormationModel): else: raise NotImplementedError(f"Expr not defined for {type(self)}") - def _send_to_cw_log_group(self, name: str, event: Dict[str, Any]) -> None: + def _send_to_cw_log_group(self, name: str, event: EventMessageType) -> None: from moto.logs import logs_backends event_copy = copy.deepcopy(event) @@ -183,7 +183,9 @@ class Rule(CloudFormationModel): log_backend.create_log_stream(name, log_stream_name) log_backend.put_log_events(name, log_stream_name, log_events) - def _send_to_events_archive(self, resource_id: str, event: Dict[str, Any]) -> None: + def _send_to_events_archive( + self, resource_id: str, event: EventMessageType + ) -> None: archive_name, archive_uuid = resource_id.split(":") archive = events_backends[self.account_id][self.region_name].archives.get( archive_name @@ -197,7 +199,7 @@ class Rule(CloudFormationModel): return backend.destinations[destination_name] def _send_to_sqs_queue( - self, resource_id: str, event: Dict[str, Any], group_id: Optional[str] = None + self, resource_id: str, event: EventMessageType, group_id: Optional[str] = None ) -> None: from moto.sqs import sqs_backends @@ -557,7 +559,7 @@ class Archive(CloudFormationModel): self.state = "ENABLED" self.uuid = str(random.uuid4()) - self.events: List[str] = [] + self.events: List[EventMessageType] = [] self.event_bus_name = source_arn.split("/")[-1] def describe_short(self) -> Dict[str, Any]: @@ -873,16 +875,18 @@ class EventPattern: def get_pattern(self) -> Dict[str, Any]: return self._pattern - def matches_event(self, event: Dict[str, Any]) -> bool: + def matches_event(self, event: EventMessageType) -> bool: if not self._pattern: return True event = json.loads(json.dumps(event)) return self._does_event_match(event, self._pattern) - def _does_event_match(self, event: Dict[str, Any], pattern: Dict[str, Any]) -> bool: + def _does_event_match( + self, event: EventMessageType, pattern: Dict[str, Any] + ) -> bool: items_and_filters = [(event.get(k, UNDEFINED), v) for k, v in pattern.items()] nested_filter_matches = [ - self._does_event_match(item, nested_filter) + self._does_event_match(item, nested_filter) # type: ignore for item, nested_filter in items_and_filters if isinstance(nested_filter, dict) ] @@ -1352,19 +1356,16 @@ class EventsBackend(BaseBackend): event_bus = self.describe_event_bus(event_bus_name) for rule in event_bus.rules.values(): - rule.send_to_targets( - { - "version": "0", - "id": event_id, - "detail-type": event["DetailType"], - "source": event["Source"], - "account": self.account_id, - "time": event.get("Time", unix_time()), - "region": self.region_name, - "resources": event.get("Resources", []), - "detail": json.loads(event["Detail"]), - }, - ) + event_msg = copy.deepcopy(_BASE_EVENT_MESSAGE) + event_msg["id"] = event_id + event_msg["detail-type"] = event["DetailType"] + event_msg["source"] = event["Source"] + event_msg["account"] = self.account_id + event_msg["time"] = event.get("Time", unix_time()) + event_msg["region"] = self.region_name + event_msg["resources"] = event.get("Resources", []) + event_msg["detail"] = json.loads(event["Detail"]) + rule.send_to_targets(event_msg) return entries diff --git a/moto/events/utils.py b/moto/events/utils.py index 416182880..7d7e226fa 100644 --- a/moto/events/utils.py +++ b/moto/events/utils.py @@ -13,11 +13,14 @@ EventMessageType = TypedDict( "detail-type": "Required[Union[str, List[str]]]", "source": "Required[Union[str, List[str]]]", "account": str, - "time": str, + # support float type for internal use of moto. + "time": "Union[str, float]", + "replay-name": str, "region": str, "resources": List[str], "detail": "Required[Dict[str, Any]]", }, + total=False, ) PAGINATION_MODEL = { @@ -36,3 +39,15 @@ PAGINATION_MODEL = { "fail_on_invalid_token": False, }, } + +_BASE_EVENT_MESSAGE: EventMessageType = { + "version": "0", + "id": "17793124-05d4-b198-2fde-7ededc63b103", + "detail-type": "", + "source": "", + "account": "", + "time": "", + "region": "", + "resources": [], + "detail": {}, +}