Events: Support for partner event sources (#6804)
This commit is contained in:
parent
5e8b457bc9
commit
b7efcdde6f
@ -3077,7 +3077,7 @@
|
||||
|
||||
## events
|
||||
<details>
|
||||
<summary>71% implemented</summary>
|
||||
<summary>80% implemented</summary>
|
||||
|
||||
- [ ] activate_event_source
|
||||
- [X] cancel_replay
|
||||
@ -3086,7 +3086,7 @@
|
||||
- [X] create_connection
|
||||
- [ ] create_endpoint
|
||||
- [X] create_event_bus
|
||||
- [ ] create_partner_event_source
|
||||
- [X] create_partner_event_source
|
||||
- [ ] deactivate_event_source
|
||||
- [ ] deauthorize_connection
|
||||
- [X] delete_api_destination
|
||||
@ -3094,15 +3094,15 @@
|
||||
- [X] delete_connection
|
||||
- [ ] delete_endpoint
|
||||
- [X] delete_event_bus
|
||||
- [ ] delete_partner_event_source
|
||||
- [X] delete_partner_event_source
|
||||
- [X] delete_rule
|
||||
- [X] describe_api_destination
|
||||
- [X] describe_archive
|
||||
- [X] describe_connection
|
||||
- [ ] describe_endpoint
|
||||
- [X] describe_event_bus
|
||||
- [ ] describe_event_source
|
||||
- [ ] describe_partner_event_source
|
||||
- [X] describe_event_source
|
||||
- [X] describe_partner_event_source
|
||||
- [X] describe_replay
|
||||
- [X] describe_rule
|
||||
- [X] disable_rule
|
||||
@ -3121,7 +3121,7 @@
|
||||
- [X] list_tags_for_resource
|
||||
- [X] list_targets_by_rule
|
||||
- [X] put_events
|
||||
- [ ] put_partner_events
|
||||
- [X] put_partner_events
|
||||
- [X] put_permission
|
||||
- [X] put_rule
|
||||
- [X] put_targets
|
||||
|
@ -34,7 +34,7 @@ events
|
||||
- [X] create_connection
|
||||
- [ ] create_endpoint
|
||||
- [X] create_event_bus
|
||||
- [ ] create_partner_event_source
|
||||
- [X] create_partner_event_source
|
||||
- [ ] deactivate_event_source
|
||||
- [ ] deauthorize_connection
|
||||
- [X] delete_api_destination
|
||||
@ -42,15 +42,15 @@ events
|
||||
- [X] delete_connection
|
||||
- [ ] delete_endpoint
|
||||
- [X] delete_event_bus
|
||||
- [ ] delete_partner_event_source
|
||||
- [X] delete_partner_event_source
|
||||
- [X] delete_rule
|
||||
- [X] describe_api_destination
|
||||
- [X] describe_archive
|
||||
- [X] describe_connection
|
||||
- [ ] describe_endpoint
|
||||
- [X] describe_event_bus
|
||||
- [ ] describe_event_source
|
||||
- [ ] describe_partner_event_source
|
||||
- [X] describe_event_source
|
||||
- [X] describe_partner_event_source
|
||||
- [X] describe_replay
|
||||
- [X] describe_rule
|
||||
- [X] disable_rule
|
||||
@ -76,9 +76,14 @@ events
|
||||
- EventBridge Archive
|
||||
- SQS Queue + FIFO Queue
|
||||
- Cross-region/account EventBus
|
||||
- HTTP requests (only enabled when MOTO_EVENTS_INVOKE_HTTP=true)
|
||||
|
||||
|
||||
- [X] put_partner_events
|
||||
|
||||
Validation of the entries is not yet implemented.
|
||||
|
||||
|
||||
- [ ] put_partner_events
|
||||
- [X] put_permission
|
||||
- [X] put_rule
|
||||
- [X] put_targets
|
||||
|
@ -966,6 +966,24 @@ class EventPatternParser:
|
||||
raise InvalidEventPatternException(reason="Invalid JSON")
|
||||
|
||||
|
||||
class PartnerEventSource(BaseModel):
|
||||
def __init__(self, region: str, name: str):
|
||||
self.name = name
|
||||
self.arn = f"arn:aws:events:{region}::event-source/aws.partner/{name}"
|
||||
self.created_on = unix_time()
|
||||
self.accounts: List[str] = []
|
||||
self.state = "ACTIVE"
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"Arn": self.arn,
|
||||
"CreatedBy": self.name.split("/")[0],
|
||||
"CreatedOn": self.created_on,
|
||||
"Name": self.name,
|
||||
"State": self.state,
|
||||
}
|
||||
|
||||
|
||||
class EventsBackend(BaseBackend):
|
||||
"""
|
||||
Some Moto services are configured to generate events and send them to EventBridge. See the AWS documentation here:
|
||||
@ -991,7 +1009,7 @@ class EventsBackend(BaseBackend):
|
||||
super().__init__(region_name, account_id)
|
||||
self.next_tokens: Dict[str, int] = {}
|
||||
self.event_buses: Dict[str, EventBus] = {}
|
||||
self.event_sources: Dict[str, str] = {}
|
||||
self.event_sources: Dict[str, PartnerEventSource] = {}
|
||||
self.archives: Dict[str, Archive] = {}
|
||||
self.replays: Dict[str, Replay] = {}
|
||||
self.tagger = TaggingService()
|
||||
@ -999,6 +1017,8 @@ class EventsBackend(BaseBackend):
|
||||
self._add_default_event_bus()
|
||||
self.connections: Dict[str, Connection] = {}
|
||||
self.destinations: Dict[str, Destination] = {}
|
||||
self.partner_event_sources: Dict[str, PartnerEventSource] = {}
|
||||
self.approved_parent_event_bus_names: List[str] = []
|
||||
|
||||
@staticmethod
|
||||
def default_vpc_endpoint_service(
|
||||
@ -1041,7 +1061,7 @@ class EventsBackend(BaseBackend):
|
||||
return start_index, end_index, new_next_token
|
||||
|
||||
def _get_event_bus(self, name: str) -> EventBus:
|
||||
event_bus_name = name.split("/")[-1]
|
||||
event_bus_name = name.split(f"{self.account_id}:event-bus/")[-1]
|
||||
|
||||
event_bus = self.event_buses.get(event_bus_name)
|
||||
if not event_bus:
|
||||
@ -1901,5 +1921,36 @@ class EventsBackend(BaseBackend):
|
||||
f"An api-destination '{name}' does not exist."
|
||||
)
|
||||
|
||||
def create_partner_event_source(self, name: str, account_id: str) -> None:
|
||||
# https://docs.aws.amazon.com/eventbridge/latest/onboarding/amazon_eventbridge_partner_onboarding_guide.html
|
||||
if name not in self.partner_event_sources:
|
||||
self.partner_event_sources[name] = PartnerEventSource(
|
||||
region=self.region_name, name=name
|
||||
)
|
||||
self.partner_event_sources[name].accounts.append(account_id)
|
||||
client_backend = events_backends[account_id][self.region_name]
|
||||
client_backend.event_sources[name] = self.partner_event_sources[name]
|
||||
|
||||
def describe_event_source(self, name: str) -> PartnerEventSource:
|
||||
return self.event_sources[name]
|
||||
|
||||
def describe_partner_event_source(self, name: str) -> PartnerEventSource:
|
||||
return self.partner_event_sources[name]
|
||||
|
||||
def delete_partner_event_source(self, name: str, account_id: str) -> None:
|
||||
client_backend = events_backends[account_id][self.region_name]
|
||||
client_backend.event_sources[name].state = "DELETED"
|
||||
|
||||
def put_partner_events(self, entries: List[Dict[str, Any]]) -> None:
|
||||
"""
|
||||
Validation of the entries is not yet implemented.
|
||||
"""
|
||||
# This implementation is very basic currently, just to verify the behaviour
|
||||
# In the future we could create a batch of events, grouped by source, and send them all at once
|
||||
for entry in entries:
|
||||
source = entry["Source"]
|
||||
for account_id in self.partner_event_sources[source].accounts:
|
||||
events_backends[account_id][self.region_name].put_events([entry])
|
||||
|
||||
|
||||
events_backends = BackendDict(EventsBackend, "events")
|
||||
|
@ -512,3 +512,33 @@ class EventsHandler(BaseResponse):
|
||||
name = self._get_param("Name")
|
||||
self.events_backend.delete_api_destination(name)
|
||||
return self._create_response({})
|
||||
|
||||
def create_partner_event_source(self) -> str:
|
||||
name = self._get_param("Name")
|
||||
account_id = self._get_param("Account")
|
||||
self.events_backend.create_partner_event_source(
|
||||
name=name,
|
||||
account_id=account_id,
|
||||
)
|
||||
return "{}"
|
||||
|
||||
def describe_event_source(self) -> str:
|
||||
name = self._get_param("Name")
|
||||
event_source = self.events_backend.describe_event_source(name)
|
||||
return json.dumps(event_source.to_dict())
|
||||
|
||||
def describe_partner_event_source(self) -> str:
|
||||
name = self._get_param("Name")
|
||||
event_source = self.events_backend.describe_partner_event_source(name)
|
||||
return json.dumps({"Arn": event_source.arn, "Name": event_source.name})
|
||||
|
||||
def delete_partner_event_source(self) -> str:
|
||||
name = self._get_param("Name")
|
||||
account_id = self._get_param("Account")
|
||||
self.events_backend.delete_partner_event_source(name, account_id)
|
||||
return "{}"
|
||||
|
||||
def put_partner_events(self) -> str:
|
||||
entries = self._get_param("Entries")
|
||||
self.events_backend.put_partner_events(entries)
|
||||
return json.dumps({"Entries": [], "FailedEntryCount": 0})
|
||||
|
105
tests/test_events/test_events_partners_integration.py
Normal file
105
tests/test_events/test_events_partners_integration.py
Normal file
@ -0,0 +1,105 @@
|
||||
import boto3
|
||||
import json
|
||||
import os
|
||||
|
||||
from datetime import datetime
|
||||
from moto import mock_events, mock_logs, settings
|
||||
from unittest import mock, SkipTest
|
||||
|
||||
|
||||
@mock_events
|
||||
def test_create_partner_event_bus():
|
||||
client_account = "111122223333"
|
||||
client = boto3.client("events", "us-east-1")
|
||||
client.create_partner_event_source(
|
||||
Name="mypartner/actions/action1", Account=client_account
|
||||
)
|
||||
resp = client.describe_partner_event_source(Name="mypartner/actions/action1")
|
||||
assert (
|
||||
resp["Arn"]
|
||||
== "arn:aws:events:us-east-1::event-source/aws.partner/mypartner/actions/action1"
|
||||
)
|
||||
assert resp["Name"] == "mypartner/actions/action1"
|
||||
|
||||
|
||||
@mock_events
|
||||
def test_describe_partner_event_busses():
|
||||
if settings.TEST_SERVER_MODE:
|
||||
raise SkipTest("Can't change accounts easily in ServerMode")
|
||||
# Having a separate partner account isn't 'strictly necessary - we could do that from the main account
|
||||
# But it makes it more obvious for the reader that we're accessing different accounts IMO
|
||||
partner_account = "111122223333"
|
||||
client_account = "444455556666"
|
||||
client = boto3.client("events", "us-east-1")
|
||||
name = "mypartner/actions/action1"
|
||||
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": partner_account}):
|
||||
client.create_partner_event_source(Name=name, Account=client_account)
|
||||
|
||||
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": client_account}):
|
||||
resp = client.describe_event_source(Name=name)
|
||||
assert resp["Name"] == name
|
||||
assert resp["CreatedBy"] == "mypartner"
|
||||
assert resp["State"] == "ACTIVE"
|
||||
|
||||
client.create_event_bus(Name=name, EventSourceName=name)
|
||||
resp = client.describe_event_bus(Name=name)
|
||||
assert resp["Name"] == name
|
||||
|
||||
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": partner_account}):
|
||||
client.delete_partner_event_source(Name=name, Account=client_account)
|
||||
|
||||
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": client_account}):
|
||||
resp = client.describe_event_source(Name=name)
|
||||
assert resp["State"] == "DELETED"
|
||||
|
||||
|
||||
@mock_events
|
||||
@mock_logs
|
||||
def test_put_partner_events():
|
||||
if settings.TEST_SERVER_MODE:
|
||||
raise SkipTest("Can't change accounts easily in ServerMode")
|
||||
|
||||
partner_account = "111122223333"
|
||||
client_account = "444455556666"
|
||||
events = boto3.client("events", "us-east-1")
|
||||
logs = boto3.client("logs", region_name="us-east-1")
|
||||
name = "mypartner/actions/action1"
|
||||
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": partner_account}):
|
||||
events.create_partner_event_source(Name=name, Account=client_account)
|
||||
|
||||
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": client_account}):
|
||||
events.create_event_bus(Name=name, EventSourceName=name)
|
||||
|
||||
log_group_name = "/test-group"
|
||||
rule_name = "test-rule"
|
||||
logs.create_log_group(logGroupName=log_group_name)
|
||||
log_group_arn = (
|
||||
f"arn:aws:logs:us-east-1:{client_account}:log-group:{log_group_name}"
|
||||
)
|
||||
events.put_rule(
|
||||
Name=rule_name,
|
||||
EventPattern=json.dumps({"account": [client_account]}),
|
||||
State="ENABLED",
|
||||
)
|
||||
events.put_targets(
|
||||
Rule=rule_name,
|
||||
Targets=[{"Id": "logs", "Arn": log_group_arn}],
|
||||
)
|
||||
|
||||
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": partner_account}):
|
||||
resp = events.put_partner_events(
|
||||
Entries=[
|
||||
{
|
||||
"Time": datetime.now(),
|
||||
"Source": name,
|
||||
"DetailType": "test-detail-type",
|
||||
"Detail": json.dumps({"foo": "123", "bar": "123"}),
|
||||
}
|
||||
]
|
||||
)
|
||||
assert resp["FailedEntryCount"] == 0
|
||||
|
||||
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": client_account}):
|
||||
log_events = logs.filter_log_events(logGroupName=log_group_name)["events"]
|
||||
assert len(log_events) == 1
|
||||
assert "test-detail-type" in log_events[0]["message"]
|
Loading…
x
Reference in New Issue
Block a user