Techdebt: Use typing for event message (#7315)

This commit is contained in:
Akira Noda 2024-02-08 05:02:51 +09:00 committed by GitHub
parent 652eabda51
commit 9a5f0a065e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 23 deletions

View File

@ -34,7 +34,7 @@ from moto.utilities.arns import parse_arn
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService
from .utils import PAGINATION_MODEL
from .utils import _BASE_EVENT_MESSAGE, PAGINATION_MODEL, EventMessageType
# Sentinel to signal the absence of a field for `Exists` pattern matching
UNDEFINED = object()
@ -113,7 +113,7 @@ class Rule(CloudFormationModel):
if index is not None:
self.targets.pop(index)
def send_to_targets(self, event: Dict[str, Any]) -> None:
def send_to_targets(self, event: EventMessageType) -> None:
if not self.event_pattern.matches_event(event):
return
@ -166,7 +166,7 @@ class Rule(CloudFormationModel):
else:
raise NotImplementedError(f"Expr not defined for {type(self)}")
def _send_to_cw_log_group(self, name: str, event: Dict[str, Any]) -> None:
def _send_to_cw_log_group(self, name: str, event: EventMessageType) -> None:
from moto.logs import logs_backends
event_copy = copy.deepcopy(event)
@ -183,7 +183,9 @@ class Rule(CloudFormationModel):
log_backend.create_log_stream(name, log_stream_name)
log_backend.put_log_events(name, log_stream_name, log_events)
def _send_to_events_archive(self, resource_id: str, event: Dict[str, Any]) -> None:
def _send_to_events_archive(
self, resource_id: str, event: EventMessageType
) -> None:
archive_name, archive_uuid = resource_id.split(":")
archive = events_backends[self.account_id][self.region_name].archives.get(
archive_name
@ -197,7 +199,7 @@ class Rule(CloudFormationModel):
return backend.destinations[destination_name]
def _send_to_sqs_queue(
self, resource_id: str, event: Dict[str, Any], group_id: Optional[str] = None
self, resource_id: str, event: EventMessageType, group_id: Optional[str] = None
) -> None:
from moto.sqs import sqs_backends
@ -557,7 +559,7 @@ class Archive(CloudFormationModel):
self.state = "ENABLED"
self.uuid = str(random.uuid4())
self.events: List[str] = []
self.events: List[EventMessageType] = []
self.event_bus_name = source_arn.split("/")[-1]
def describe_short(self) -> Dict[str, Any]:
@ -873,16 +875,18 @@ class EventPattern:
def get_pattern(self) -> Dict[str, Any]:
return self._pattern
def matches_event(self, event: Dict[str, Any]) -> bool:
def matches_event(self, event: EventMessageType) -> bool:
if not self._pattern:
return True
event = json.loads(json.dumps(event))
return self._does_event_match(event, self._pattern)
def _does_event_match(self, event: Dict[str, Any], pattern: Dict[str, Any]) -> bool:
def _does_event_match(
self, event: EventMessageType, pattern: Dict[str, Any]
) -> bool:
items_and_filters = [(event.get(k, UNDEFINED), v) for k, v in pattern.items()]
nested_filter_matches = [
self._does_event_match(item, nested_filter)
self._does_event_match(item, nested_filter) # type: ignore
for item, nested_filter in items_and_filters
if isinstance(nested_filter, dict)
]
@ -1352,19 +1356,16 @@ class EventsBackend(BaseBackend):
event_bus = self.describe_event_bus(event_bus_name)
for rule in event_bus.rules.values():
rule.send_to_targets(
{
"version": "0",
"id": event_id,
"detail-type": event["DetailType"],
"source": event["Source"],
"account": self.account_id,
"time": event.get("Time", unix_time()),
"region": self.region_name,
"resources": event.get("Resources", []),
"detail": json.loads(event["Detail"]),
},
)
event_msg = copy.deepcopy(_BASE_EVENT_MESSAGE)
event_msg["id"] = event_id
event_msg["detail-type"] = event["DetailType"]
event_msg["source"] = event["Source"]
event_msg["account"] = self.account_id
event_msg["time"] = event.get("Time", unix_time())
event_msg["region"] = self.region_name
event_msg["resources"] = event.get("Resources", [])
event_msg["detail"] = json.loads(event["Detail"])
rule.send_to_targets(event_msg)
return entries

View File

@ -13,11 +13,14 @@ EventMessageType = TypedDict(
"detail-type": "Required[Union[str, List[str]]]",
"source": "Required[Union[str, List[str]]]",
"account": str,
"time": str,
# support float type for internal use of moto.
"time": "Union[str, float]",
"replay-name": str,
"region": str,
"resources": List[str],
"detail": "Required[Dict[str, Any]]",
},
total=False,
)
PAGINATION_MODEL = {
@ -36,3 +39,15 @@ PAGINATION_MODEL = {
"fail_on_invalid_token": False,
},
}
_BASE_EVENT_MESSAGE: EventMessageType = {
"version": "0",
"id": "17793124-05d4-b198-2fde-7ededc63b103",
"detail-type": "",
"source": "",
"account": "",
"time": "",
"region": "",
"resources": [],
"detail": {},
}