Events: Support CrossAccount events (#5866)
This commit is contained in:
parent
931bb6d960
commit
b0ee64f24a
@ -140,6 +140,15 @@ events
|
|||||||
- [X] list_tags_for_resource
|
- [X] list_tags_for_resource
|
||||||
- [X] list_targets_by_rule
|
- [X] list_targets_by_rule
|
||||||
- [X] put_events
|
- [X] put_events
|
||||||
|
|
||||||
|
The following targets are supported at the moment:
|
||||||
|
|
||||||
|
- CloudWatch Log Group
|
||||||
|
- EventBridge Archive
|
||||||
|
- SQS Queue + FIFO Queue
|
||||||
|
- Cross-region/account EventBus
|
||||||
|
|
||||||
|
|
||||||
- [ ] put_partner_events
|
- [ ] put_partner_events
|
||||||
- [X] put_permission
|
- [X] put_permission
|
||||||
- [X] put_rule
|
- [X] put_rule
|
||||||
|
@ -36,7 +36,9 @@ UNDEFINED = object()
|
|||||||
|
|
||||||
|
|
||||||
class Rule(CloudFormationModel):
|
class Rule(CloudFormationModel):
|
||||||
Arn = namedtuple("Arn", ["service", "resource_type", "resource_id"])
|
Arn = namedtuple(
|
||||||
|
"Arn", ["account", "region", "service", "resource_type", "resource_id"]
|
||||||
|
)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -122,6 +124,7 @@ class Rule(CloudFormationModel):
|
|||||||
# - CloudWatch Log Group
|
# - CloudWatch Log Group
|
||||||
# - EventBridge Archive
|
# - EventBridge Archive
|
||||||
# - SQS Queue + FIFO Queue
|
# - SQS Queue + FIFO Queue
|
||||||
|
# - Cross-region/account EventBus
|
||||||
for target in self.targets:
|
for target in self.targets:
|
||||||
arn = self._parse_arn(target["Arn"])
|
arn = self._parse_arn(target["Arn"])
|
||||||
|
|
||||||
@ -135,17 +138,25 @@ class Rule(CloudFormationModel):
|
|||||||
elif arn.service == "sqs":
|
elif arn.service == "sqs":
|
||||||
group_id = target.get("SqsParameters", {}).get("MessageGroupId")
|
group_id = target.get("SqsParameters", {}).get("MessageGroupId")
|
||||||
self._send_to_sqs_queue(arn.resource_id, event, group_id)
|
self._send_to_sqs_queue(arn.resource_id, event, group_id)
|
||||||
|
elif arn.service == "events" and arn.resource_type == "event-bus":
|
||||||
|
cross_account_backend: EventsBackend = events_backends[arn.account][
|
||||||
|
arn.region
|
||||||
|
]
|
||||||
|
new_event = {
|
||||||
|
"Source": event["source"],
|
||||||
|
"DetailType": event["detail-type"],
|
||||||
|
"Detail": json.dumps(event["detail"]),
|
||||||
|
"EventBusName": arn.resource_id,
|
||||||
|
}
|
||||||
|
cross_account_backend.put_events([new_event])
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(f"Expr not defined for {type(self)}")
|
raise NotImplementedError(f"Expr not defined for {type(self)}")
|
||||||
|
|
||||||
def _parse_arn(self, arn):
|
def _parse_arn(self, arn: str) -> Arn:
|
||||||
# http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
|
# http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
|
||||||
# this method needs probably some more fine tuning,
|
# this method needs probably some more fine tuning,
|
||||||
# when also other targets are supported
|
# when also other targets are supported
|
||||||
elements = arn.split(":", 5)
|
_, _, service, region, account, resource = arn.split(":", 5)
|
||||||
|
|
||||||
service = elements[2]
|
|
||||||
resource = elements[5]
|
|
||||||
|
|
||||||
if ":" in resource and "/" in resource:
|
if ":" in resource and "/" in resource:
|
||||||
if resource.index(":") < resource.index("/"):
|
if resource.index(":") < resource.index("/"):
|
||||||
@ -161,7 +172,11 @@ class Rule(CloudFormationModel):
|
|||||||
resource_id = resource
|
resource_id = resource
|
||||||
|
|
||||||
return self.Arn(
|
return self.Arn(
|
||||||
service=service, resource_type=resource_type, resource_id=resource_id
|
account=account,
|
||||||
|
region=region,
|
||||||
|
service=service,
|
||||||
|
resource_type=resource_type,
|
||||||
|
resource_id=resource_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _send_to_cw_log_group(self, name, event):
|
def _send_to_cw_log_group(self, name, event):
|
||||||
@ -925,11 +940,18 @@ class EventPatternParser:
|
|||||||
|
|
||||||
class EventsBackend(BaseBackend):
|
class EventsBackend(BaseBackend):
|
||||||
"""
|
"""
|
||||||
When a event occurs, the appropriate targets are triggered for a subset of usecases.
|
Some Moto services are configured to generate events and send them to EventBridge. See the AWS documentation here:
|
||||||
|
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-service-event.html
|
||||||
|
|
||||||
Supported events: S3:CreateBucket
|
Events that currently supported
|
||||||
|
|
||||||
Supported targets: AWSLambda functions
|
- S3:CreateBucket
|
||||||
|
|
||||||
|
Targets that are currently supported
|
||||||
|
|
||||||
|
- AWSLambda functions
|
||||||
|
|
||||||
|
Please let us know if you want support for an event/target that is not yet listed here.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
ACCOUNT_ID = re.compile(r"^(\d{1,12}|\*)$")
|
ACCOUNT_ID = re.compile(r"^(\d{1,12}|\*)$")
|
||||||
@ -1069,7 +1091,7 @@ class EventsBackend(BaseBackend):
|
|||||||
self.tagger.delete_all_tags_for_resource(arn)
|
self.tagger.delete_all_tags_for_resource(arn)
|
||||||
self.rules.pop(name)
|
self.rules.pop(name)
|
||||||
|
|
||||||
def describe_rule(self, name):
|
def describe_rule(self, name: str) -> Rule:
|
||||||
rule = self.rules.get(name)
|
rule = self.rules.get(name)
|
||||||
if not rule:
|
if not rule:
|
||||||
raise ResourceNotFoundException(f"Rule {name} does not exist.")
|
raise ResourceNotFoundException(f"Rule {name} does not exist.")
|
||||||
@ -1174,6 +1196,14 @@ class EventsBackend(BaseBackend):
|
|||||||
rule.put_targets(targets)
|
rule.put_targets(targets)
|
||||||
|
|
||||||
def put_events(self, events):
|
def put_events(self, events):
|
||||||
|
"""
|
||||||
|
The following targets are supported at the moment:
|
||||||
|
|
||||||
|
- CloudWatch Log Group
|
||||||
|
- EventBridge Archive
|
||||||
|
- SQS Queue + FIFO Queue
|
||||||
|
- Cross-region/account EventBus
|
||||||
|
"""
|
||||||
num_events = len(events)
|
num_events = len(events)
|
||||||
|
|
||||||
if num_events > 10:
|
if num_events > 10:
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
import json
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from unittest import SkipTest, mock
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
|
import os
|
||||||
|
|
||||||
import sure # noqa # pylint: disable=unused-import
|
import sure # noqa # pylint: disable=unused-import
|
||||||
|
|
||||||
from moto import mock_events, mock_sqs, mock_logs
|
from moto import mock_events, mock_sqs, mock_logs, settings
|
||||||
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
|
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
|
||||||
from moto.core.utils import iso_8601_datetime_without_milliseconds
|
from moto.core.utils import iso_8601_datetime_without_milliseconds
|
||||||
|
|
||||||
@ -316,3 +319,96 @@ def test_moto_matches_none_value_with_exists_filter():
|
|||||||
{"foo": None, "bar": "123"},
|
{"foo": None, "bar": "123"},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_events
|
||||||
|
@mock_sqs
|
||||||
|
def test_put_events_event_bus_forwarding_rules():
|
||||||
|
if settings.TEST_SERVER_MODE:
|
||||||
|
raise SkipTest("Cross-account test - easiest to just test in DecoratorMode")
|
||||||
|
|
||||||
|
# EventBus1 --> EventBus2 --> SQS
|
||||||
|
account1 = ACCOUNT_ID
|
||||||
|
account2 = "222222222222"
|
||||||
|
event_bus_name1 = "asdf"
|
||||||
|
event_bus_name2 = "erty"
|
||||||
|
events_client = boto3.client("events", "eu-central-1")
|
||||||
|
sqs_client = boto3.client("sqs", region_name="eu-central-1")
|
||||||
|
|
||||||
|
pattern = {
|
||||||
|
"source": ["source1"],
|
||||||
|
"detail-type": ["test-detail-type"],
|
||||||
|
"detail": {
|
||||||
|
"test": [{"exists": True}],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": account2}):
|
||||||
|
# Setup SQS rule in account 2
|
||||||
|
queue_url = sqs_client.create_queue(QueueName="test-queue")["QueueUrl"]
|
||||||
|
queue_arn = sqs_client.get_queue_attributes(
|
||||||
|
QueueUrl=queue_url, AttributeNames=["QueueArn"]
|
||||||
|
)["Attributes"]["QueueArn"]
|
||||||
|
|
||||||
|
event_bus_arn2 = events_client.create_event_bus(Name=event_bus_name2)[
|
||||||
|
"EventBusArn"
|
||||||
|
]
|
||||||
|
|
||||||
|
events_client.put_rule(
|
||||||
|
Name="event_bus_2_rule",
|
||||||
|
EventPattern=json.dumps(pattern),
|
||||||
|
State="ENABLED",
|
||||||
|
EventBusName=event_bus_name2,
|
||||||
|
)
|
||||||
|
|
||||||
|
events_client.put_targets(
|
||||||
|
Rule="event_bus_2_rule",
|
||||||
|
EventBusName=event_bus_name2,
|
||||||
|
Targets=[{"Id": "sqs-dedup-fifo", "Arn": queue_arn}],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Setup EventBus1
|
||||||
|
events_client.create_event_bus(Name=event_bus_name1)["EventBusArn"]
|
||||||
|
|
||||||
|
events_client.put_rule(
|
||||||
|
Name="event_bus_1_rule",
|
||||||
|
RoleArn=f"arn:aws:iam::{account1}:role/Administrator",
|
||||||
|
EventPattern=json.dumps(pattern),
|
||||||
|
State="ENABLED",
|
||||||
|
EventBusName=event_bus_name1,
|
||||||
|
)
|
||||||
|
|
||||||
|
events_client.put_targets(
|
||||||
|
Rule="event_bus_1_rule",
|
||||||
|
EventBusName=event_bus_name1,
|
||||||
|
Targets=[
|
||||||
|
{
|
||||||
|
"Id": "event_bus_2",
|
||||||
|
"Arn": event_bus_arn2,
|
||||||
|
"RoleArn": "arn:aws:iam::123456789012:role/Administrator",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
test_events = [
|
||||||
|
{
|
||||||
|
"Source": "source1",
|
||||||
|
"DetailType": "test-detail-type",
|
||||||
|
"Detail": json.dumps({"test": "true"}),
|
||||||
|
"EventBusName": event_bus_name1,
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
events_client.put_events(Entries=test_events)
|
||||||
|
|
||||||
|
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": account2}):
|
||||||
|
# Verify SQS messages were received in account 2
|
||||||
|
|
||||||
|
response = sqs_client.receive_message(QueueUrl=queue_url)
|
||||||
|
|
||||||
|
response["Messages"].should.have.length_of(1)
|
||||||
|
|
||||||
|
message = json.loads(response["Messages"][0]["Body"])
|
||||||
|
message["source"].should.equal("source1")
|
||||||
|
message["detail-type"].should.equal("test-detail-type")
|
||||||
|
message["detail"].should.equal({"test": "true"})
|
||||||
|
Loading…
Reference in New Issue
Block a user