diff --git a/docs/docs/services/events.rst b/docs/docs/services/events.rst index 84a723f14..096b0f70d 100644 --- a/docs/docs/services/events.rst +++ b/docs/docs/services/events.rst @@ -140,6 +140,15 @@ events - [X] list_tags_for_resource - [X] list_targets_by_rule - [X] put_events + + The following targets are supported at the moment: + + - CloudWatch Log Group + - EventBridge Archive + - SQS Queue + FIFO Queue + - Cross-region/account EventBus + + - [ ] put_partner_events - [X] put_permission - [X] put_rule diff --git a/moto/events/models.py b/moto/events/models.py index 305c3de5b..dad78538a 100644 --- a/moto/events/models.py +++ b/moto/events/models.py @@ -36,7 +36,9 @@ UNDEFINED = object() class Rule(CloudFormationModel): - Arn = namedtuple("Arn", ["service", "resource_type", "resource_id"]) + Arn = namedtuple( + "Arn", ["account", "region", "service", "resource_type", "resource_id"] + ) def __init__( self, @@ -122,6 +124,7 @@ class Rule(CloudFormationModel): # - CloudWatch Log Group # - EventBridge Archive # - SQS Queue + FIFO Queue + # - Cross-region/account EventBus for target in self.targets: arn = self._parse_arn(target["Arn"]) @@ -135,17 +138,25 @@ class Rule(CloudFormationModel): elif arn.service == "sqs": group_id = target.get("SqsParameters", {}).get("MessageGroupId") self._send_to_sqs_queue(arn.resource_id, event, group_id) + elif arn.service == "events" and arn.resource_type == "event-bus": + cross_account_backend: EventsBackend = events_backends[arn.account][ + arn.region + ] + new_event = { + "Source": event["source"], + "DetailType": event["detail-type"], + "Detail": json.dumps(event["detail"]), + "EventBusName": arn.resource_id, + } + cross_account_backend.put_events([new_event]) else: raise NotImplementedError(f"Expr not defined for {type(self)}") - def _parse_arn(self, arn): + def _parse_arn(self, arn: str) -> Arn: # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html # this method needs probably some more fine tuning, # when also other targets are supported - elements = arn.split(":", 5) - - service = elements[2] - resource = elements[5] + _, _, service, region, account, resource = arn.split(":", 5) if ":" in resource and "/" in resource: if resource.index(":") < resource.index("/"): @@ -161,7 +172,11 @@ class Rule(CloudFormationModel): resource_id = resource return self.Arn( - service=service, resource_type=resource_type, resource_id=resource_id + account=account, + region=region, + service=service, + resource_type=resource_type, + resource_id=resource_id, ) def _send_to_cw_log_group(self, name, event): @@ -925,11 +940,18 @@ class EventPatternParser: class EventsBackend(BaseBackend): """ - When a event occurs, the appropriate targets are triggered for a subset of usecases. + Some Moto services are configured to generate events and send them to EventBridge. See the AWS documentation here: + https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-service-event.html - Supported events: S3:CreateBucket + Events that currently supported - Supported targets: AWSLambda functions + - S3:CreateBucket + + Targets that are currently supported + + - AWSLambda functions + + Please let us know if you want support for an event/target that is not yet listed here. """ ACCOUNT_ID = re.compile(r"^(\d{1,12}|\*)$") @@ -1069,7 +1091,7 @@ class EventsBackend(BaseBackend): self.tagger.delete_all_tags_for_resource(arn) self.rules.pop(name) - def describe_rule(self, name): + def describe_rule(self, name: str) -> Rule: rule = self.rules.get(name) if not rule: raise ResourceNotFoundException(f"Rule {name} does not exist.") @@ -1174,6 +1196,14 @@ class EventsBackend(BaseBackend): rule.put_targets(targets) def put_events(self, events): + """ + The following targets are supported at the moment: + + - CloudWatch Log Group + - EventBridge Archive + - SQS Queue + FIFO Queue + - Cross-region/account EventBus + """ num_events = len(events) if num_events > 10: diff --git a/tests/test_events/test_events_integration.py b/tests/test_events/test_events_integration.py index 1ba9550c1..e75756414 100644 --- a/tests/test_events/test_events_integration.py +++ b/tests/test_events/test_events_integration.py @@ -1,10 +1,13 @@ import json from datetime import datetime +from unittest import SkipTest, mock import boto3 +import os + import sure # noqa # pylint: disable=unused-import -from moto import mock_events, mock_sqs, mock_logs +from moto import mock_events, mock_sqs, mock_logs, settings from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.core.utils import iso_8601_datetime_without_milliseconds @@ -316,3 +319,96 @@ def test_moto_matches_none_value_with_exists_filter(): {"foo": None, "bar": "123"}, ], ) + + +@mock_events +@mock_sqs +def test_put_events_event_bus_forwarding_rules(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Cross-account test - easiest to just test in DecoratorMode") + + # EventBus1 --> EventBus2 --> SQS + account1 = ACCOUNT_ID + account2 = "222222222222" + event_bus_name1 = "asdf" + event_bus_name2 = "erty" + events_client = boto3.client("events", "eu-central-1") + sqs_client = boto3.client("sqs", region_name="eu-central-1") + + pattern = { + "source": ["source1"], + "detail-type": ["test-detail-type"], + "detail": { + "test": [{"exists": True}], + }, + } + + with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": account2}): + # Setup SQS rule in account 2 + queue_url = sqs_client.create_queue(QueueName="test-queue")["QueueUrl"] + queue_arn = sqs_client.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=["QueueArn"] + )["Attributes"]["QueueArn"] + + event_bus_arn2 = events_client.create_event_bus(Name=event_bus_name2)[ + "EventBusArn" + ] + + events_client.put_rule( + Name="event_bus_2_rule", + EventPattern=json.dumps(pattern), + State="ENABLED", + EventBusName=event_bus_name2, + ) + + events_client.put_targets( + Rule="event_bus_2_rule", + EventBusName=event_bus_name2, + Targets=[{"Id": "sqs-dedup-fifo", "Arn": queue_arn}], + ) + + # Setup EventBus1 + events_client.create_event_bus(Name=event_bus_name1)["EventBusArn"] + + events_client.put_rule( + Name="event_bus_1_rule", + RoleArn=f"arn:aws:iam::{account1}:role/Administrator", + EventPattern=json.dumps(pattern), + State="ENABLED", + EventBusName=event_bus_name1, + ) + + events_client.put_targets( + Rule="event_bus_1_rule", + EventBusName=event_bus_name1, + Targets=[ + { + "Id": "event_bus_2", + "Arn": event_bus_arn2, + "RoleArn": "arn:aws:iam::123456789012:role/Administrator", + }, + ], + ) + + test_events = [ + { + "Source": "source1", + "DetailType": "test-detail-type", + "Detail": json.dumps({"test": "true"}), + "EventBusName": event_bus_name1, + } + ] + + events_client.put_events(Entries=test_events) + + with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": account2}): + # Verify SQS messages were received in account 2 + + response = sqs_client.receive_message(QueueUrl=queue_url) + + response["Messages"].should.have.length_of(1) + + message = json.loads(response["Messages"][0]["Body"]) + message["source"].should.equal("source1") + message["detail-type"].should.equal("test-detail-type") + message["detail"].should.equal({"test": "true"})