From 3c810ad152425a212a2eb5cbb2cc78018f87b066 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Gr=C3=BCbel?= Date: Mon, 1 Mar 2021 14:20:36 +0100 Subject: [PATCH] Add eventbridge replay (#3735) * Add events.start_replay * Add events.describe_replay * Add events.list_replays * Add events.cancel_replay * implement actual replay functionality * Fix Python 2.7 issues --- moto/events/exceptions.py | 7 + moto/events/models.py | 410 +++++++++++++++++--- moto/events/responses.py | 37 ++ tests/test_events/test_events.py | 633 ++++++++++++++++++++++++++++++- 4 files changed, 1038 insertions(+), 49 deletions(-) diff --git a/moto/events/exceptions.py b/moto/events/exceptions.py index e31bda09b..b077cc983 100644 --- a/moto/events/exceptions.py +++ b/moto/events/exceptions.py @@ -2,6 +2,13 @@ from __future__ import unicode_literals from moto.core.exceptions import JsonRESTError +class IllegalStatusException(JsonRESTError): + code = 400 + + def __init__(self, message): + super(IllegalStatusException, self).__init__("IllegalStatusException", message) + + class InvalidEventPatternException(JsonRESTError): code = 400 diff --git a/moto/events/models.py b/moto/events/models.py index 9f3cdec6a..2b7b623a1 100644 --- a/moto/events/models.py +++ b/moto/events/models.py @@ -3,18 +3,21 @@ import os import re import json import sys +from collections import namedtuple from datetime import datetime +from enum import Enum, unique from boto3 import Session from moto.core.exceptions import JsonRESTError -from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel -from moto.core.utils import unix_time +from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel, BaseModel +from moto.core.utils import unix_time, iso_8601_datetime_without_milliseconds from moto.events.exceptions import ( ValidationException, ResourceNotFoundException, ResourceAlreadyExistsException, InvalidEventPatternException, + IllegalStatusException, ) from moto.utilities.tagging_service import TaggingService @@ -22,6 +25,8 @@ from uuid import uuid4 class Rule(CloudFormationModel): + Arn = namedtuple("Arn", ["service", "resource_type", "resource_id"]) + def _generate_arn(self, name): return "arn:aws:events:{region_name}:111111111111:rule/{name}".format( region_name=self.region_name, name=name @@ -36,7 +41,7 @@ class Rule(CloudFormationModel): self.state = kwargs.get("State") or "ENABLED" self.description = kwargs.get("Description") self.role_arn = kwargs.get("RoleArn") - self.event_bus_name = kwargs.get("EventBusName", "default") + self.event_bus_name = kwargs.get("EventBusName") or "default" self.targets = [] @property @@ -76,6 +81,98 @@ class Rule(CloudFormationModel): if index is not None: self.targets.pop(index) + def send_to_targets(self, event_bus_name, event): + if event_bus_name != self.event_bus_name: + return + + if not self._validate_event(event): + return + + # for now only CW Log groups are supported + for target in self.targets: + arn = self._parse_arn(target["Arn"]) + + if arn.service == "logs" and arn.resource_type == "log-group": + self._send_to_cw_log_group(arn.resource_id, event) + elif arn.service == "events" and not arn.resource_type: + input_template = json.loads(target["InputTransformer"]["InputTemplate"]) + archive_arn = self._parse_arn(input_template["archive-arn"]) + + self._send_to_events_archive(archive_arn.resource_id, event) + else: + raise NotImplementedError("Expr not defined for {0}".format(type(self))) + + def _validate_event(self, event): + for field, pattern in json.loads(self.event_pattern).items(): + if not isinstance(pattern, list): + # to keep it simple at the beginning only pattern with 1 level of depth are validated + continue + + if isinstance(pattern[0], dict): + if "exists" in pattern[0]: + if pattern[0]["exists"] and field not in event: + return False + elif not pattern[0]["exists"] and field in event: + return False + elif event.get(field) not in pattern: + return False + + return True + + def _parse_arn(self, arn): + # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html + # this method needs probably some more fine tuning, + # when also other targets are supported + elements = arn.split(":", 5) + + service = elements[2] + resource = elements[5] + + if ":" in resource and "/" in resource: + if resource.index(":") < resource.index("/"): + resource_type, resource_id = resource.split(":", 1) + else: + resource_type, resource_id = resource.split("/", 1) + elif ":" in resource: + resource_type, resource_id = resource.split(":", 1) + elif "/" in resource: + resource_type, resource_id = resource.split("/", 1) + else: + resource_type = None + resource_id = resource + + return self.Arn( + service=service, resource_type=resource_type, resource_id=resource_id + ) + + def _send_to_cw_log_group(self, name, event): + from moto.logs import logs_backends + + event_copy = copy.deepcopy(event) + event_copy["time"] = iso_8601_datetime_without_milliseconds( + datetime.utcfromtimestamp(event_copy["time"]) + ) + + log_stream_name = str(uuid4()) + log_events = [ + { + "timestamp": unix_time(datetime.utcnow()), + "message": json.dumps(event_copy), + } + ] + + logs_backends[self.region_name].create_log_stream(name, log_stream_name) + logs_backends[self.region_name].put_log_events( + name, log_stream_name, log_events, None + ) + + def _send_to_events_archive(self, resource_id, event): + archive_name, archive_uuid = resource_id.split(":") + archive = events_backends[self.region_name].archives.get(archive_name) + + if archive.uuid == archive_uuid: + archive.events.append(event) + def get_cfn_attribute(self, attribute_name): from moto.cloudformation.exceptions import UnformattedGetAttTemplateException @@ -233,6 +330,7 @@ class Archive(CloudFormationModel): self.creation_time = unix_time(datetime.utcnow()) self.state = "ENABLED" + self.uuid = str(uuid4()) self.events = [] self.event_bus_name = source_arn.split("/")[-1] @@ -276,19 +374,6 @@ class Archive(CloudFormationModel): event_backend = events_backends[region_name] event_backend.archives.pop(self.name) - def matches_pattern(self, event): - if not self.event_pattern: - return True - - # only works on the first level of the event dict - # logic for nested dicts needs to be implemented - for pattern_key, pattern_value in json.loads(self.event_pattern).items(): - event_value = event.get(pattern_key) - if event_value not in pattern_value: - return False - - return True - def get_cfn_attribute(self, attribute_name): from moto.cloudformation.exceptions import UnformattedGetAttTemplateException @@ -352,6 +437,82 @@ class Archive(CloudFormationModel): event_backend.delete_archive(resource_name) +@unique +class ReplayState(Enum): + # https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_ListReplays.html#API_ListReplays_RequestParameters + STARTING = "STARTING" + RUNNING = "RUNNING" + CANCELLING = "CANCELLING" + COMPLETED = "COMPLETED" + CANCELLED = "CANCELLED" + FAILED = "FAILED" + + +class Replay(BaseModel): + def __init__( + self, + region_name, + name, + description, + source_arn, + start_time, + end_time, + destination, + ): + self.region = region_name + self.name = name + self.description = description + self.source_arn = source_arn + self.event_start_time = start_time + self.event_end_time = end_time + self.destination = destination + + self.state = ReplayState.STARTING + self.start_time = unix_time(datetime.utcnow()) + self.end_time = None + + @property + def arn(self): + return "arn:aws:events:{region}:{account_id}:replay/{name}".format( + region=self.region, account_id=ACCOUNT_ID, name=self.name + ) + + def describe_short(self): + return { + "ReplayName": self.name, + "EventSourceArn": self.source_arn, + "State": self.state.value, + "EventStartTime": self.event_start_time, + "EventEndTime": self.event_end_time, + "ReplayStartTime": self.start_time, + "ReplayEndTime": self.end_time, + } + + def describe(self): + result = { + "ReplayArn": self.arn, + "Description": self.description, + "Destination": self.destination, + } + + result.update(self.describe_short()) + + return result + + def replay_events(self, archive): + event_bus_name = self.destination["Arn"].split("/")[-1] + + for event in archive.events: + for rule in events_backends[self.region].rules.values(): + rule.send_to_targets( + event_bus_name, + dict(event, **{"id": str(uuid4()), "replay-name": self.name}), + ) + + self.state = ReplayState.COMPLETED + self.end_time = unix_time(datetime.utcnow()) + + class EventsBackend(BaseBackend): ACCOUNT_ID = re.compile(r"^(\d{1,12}|\*)$") STATEMENT_ID = re.compile(r"^[a-zA-Z0-9-_]{1,64}$") @@ -366,6 +527,7 @@ class EventsBackend(BaseBackend): self.event_buses = {} self.event_sources = {} self.archives = {} + self.replays = {} self.tagger = TaggingService() self._add_default_event_bus() @@ -402,6 +564,24 @@ class EventsBackend(BaseBackend): return start_index, end_index, new_next_token + def _get_event_bus(self, name): + event_bus_name = name.split("/")[-1] + + event_bus = self.event_buses.get(event_bus_name) + if not event_bus: + raise ResourceNotFoundException( + "Event bus {} does not exist.".format(event_bus_name) + ) + + return event_bus + + def _get_replay(self, name): + replay = self.replays.get(name) + if not replay: + raise ResourceNotFoundException("Replay {} does not exist.".format(name)) + + return replay + def delete_rule(self, name): self.rules_order.pop(self.rules_order.index(name)) arn = self.rules.get(name).arn @@ -510,9 +690,7 @@ class EventsBackend(BaseBackend): def put_events(self, events): num_events = len(events) - if num_events < 1: - raise JsonRESTError("ValidationError", "Need at least 1 event") - elif num_events > 10: + if num_events > 10: # the exact error text is longer, the Value list consists of all the put events raise ValidationException( "1 validation error detected: " @@ -555,25 +733,28 @@ class EventsBackend(BaseBackend): ) continue - entries.append({"EventId": str(uuid4())}) + event_id = str(uuid4()) + entries.append({"EventId": event_id}) - # add to correct archive - # if 'EventBusName' is not espically set, it will stored in the default + # if 'EventBusName' is not especially set, it will be sent to the default one event_bus_name = event.get("EventBusName", "default") - archives = [ - archive - for archive in self.archives.values() - if archive.event_bus_name == event_bus_name - ] - for archive in archives: - event_copy = copy.deepcopy(event) - event_copy.pop("EventBusName", None) + for rule in self.rules.values(): + rule.send_to_targets( + event_bus_name, + { + "version": "0", + "id": event_id, + "detail-type": event["DetailType"], + "source": event["Source"], + "account": ACCOUNT_ID, + "time": event.get("Time", unix_time(datetime.utcnow())), + "region": self.region_name, + "resources": event.get("Resources", []), + "detail": json.loads(event["Detail"]), + }, + ) - if archive.matches_pattern(event): - archive.events.append(event_copy) - - # We dont really need to store the events yet return entries def remove_targets(self, name, ids): @@ -639,12 +820,7 @@ class EventsBackend(BaseBackend): if not name: name = "default" - event_bus = self.event_buses.get(name) - - if not event_bus: - raise JsonRESTError( - "ResourceNotFoundException", "Event bus {} does not exist.".format(name) - ) + event_bus = self._get_event_bus(name) return event_bus @@ -724,11 +900,7 @@ class EventsBackend(BaseBackend): if event_pattern: self._validate_event_pattern(event_pattern) - event_bus_name = source_arn.split("/")[-1] - if event_bus_name not in self.event_buses: - raise ResourceNotFoundException( - "Event bus {} does not exist.".format(event_bus_name) - ) + event_bus = self._get_event_bus(source_arn) if name in self.archives: raise ResourceAlreadyExistsException( @@ -739,6 +911,38 @@ class EventsBackend(BaseBackend): self.region_name, name, source_arn, description, event_pattern, retention ) + rule_event_pattern = json.loads(event_pattern or "{}") + rule_event_pattern["replay-name"] = [{"exists": False}] + + rule = self.put_rule( + "Events-Archive-{}".format(name), + **{ + "EventPattern": json.dumps(rule_event_pattern), + "EventBusName": event_bus.name, + } + ) + self.put_targets( + rule.name, + [ + { + "Id": rule.name, + "Arn": "arn:aws:events:{}:::".format(self.region_name), + "InputTransformer": { + "InputPathsMap": {}, + "InputTemplate": json.dumps( + { + "archive-arn": "{0}:{1}".format( + archive.arn, archive.uuid + ), + "event": "", + "ingestion-time": "", + } + ), + }, + } + ], + ) + self.archives[name] = archive return archive @@ -780,7 +984,8 @@ class EventsBackend(BaseBackend): if state and state not in Archive.VALID_STATES: raise ValidationException( - "1 validation error detected: Value '{0}' at 'state' failed to satisfy constraint: " + "1 validation error detected: " + "Value '{0}' at 'state' failed to satisfy constraint: " "Member must satisfy enum value set: " "[{1}]".format(state, ", ".join(Archive.VALID_STATES)) ) @@ -825,6 +1030,119 @@ class EventsBackend(BaseBackend): archive.delete(self.region_name) + def start_replay( + self, name, description, source_arn, start_time, end_time, destination + ): + event_bus_arn = destination["Arn"] + event_bus_arn_pattern = r"^arn:aws:events:[a-zA-Z0-9-]+:\d{12}:event-bus/" + if not re.match(event_bus_arn_pattern, event_bus_arn): + raise ValidationException( + "Parameter Destination.Arn is not valid. " + "Reason: Must contain an event bus ARN." + ) + + self._get_event_bus(event_bus_arn) + + archive_name = source_arn.split("/")[-1] + archive = self.archives.get(archive_name) + if not archive: + raise ValidationException( + "Parameter EventSourceArn is not valid. " + "Reason: Archive {} does not exist.".format(archive_name) + ) + + if event_bus_arn != archive.source_arn: + raise ValidationException( + "Parameter Destination.Arn is not valid. " + "Reason: Cross event bus replay is not permitted." + ) + + if start_time > end_time: + raise ValidationException( + "Parameter EventEndTime is not valid. " + "Reason: EventStartTime must be before EventEndTime." + ) + + if name in self.replays: + raise ResourceAlreadyExistsException( + "Replay {} already exists.".format(name) + ) + + replay = Replay( + self.region_name, + name, + description, + source_arn, + start_time, + end_time, + destination, + ) + + self.replays[name] = replay + + replay.replay_events(archive) + + return { + "ReplayArn": replay.arn, + "ReplayStartTime": replay.start_time, + "State": ReplayState.STARTING.value, # the replay will be done before returning the response + } + + def describe_replay(self, name): + replay = self._get_replay(name) + + return replay.describe() + + def list_replays(self, name_prefix, source_arn, state): + if [name_prefix, source_arn, state].count(None) < 2: + raise ValidationException( + "At most one filter is allowed for ListReplays. " + "Use either : State, EventSourceArn, or NamePrefix." + ) + + valid_states = sorted([item.value for item in ReplayState]) + if state and state not in valid_states: + raise ValidationException( + "1 validation error detected: " + "Value '{0}' at 'state' failed to satisfy constraint: " + "Member must satisfy enum value set: " + "[{1}]".format(state, ", ".join(valid_states)) + ) + + if [name_prefix, source_arn, state].count(None) == 3: + return [replay.describe_short() for replay in self.replays.values()] + + result = [] + + for replay in self.replays.values(): + if name_prefix and replay.name.startswith(name_prefix): + result.append(replay.describe_short()) + elif source_arn and replay.source_arn == source_arn: + result.append(replay.describe_short()) + elif state and replay.state == state: + result.append(replay.describe_short()) + + return result + + def cancel_replay(self, name): + replay = self._get_replay(name) + + # replays in the state 'COMPLETED' can't be canceled, + # but the implementation is done synchronously, + # so they are done right after the start + if replay.state not in [ + ReplayState.STARTING, + ReplayState.RUNNING, + ReplayState.COMPLETED, + ]: + raise IllegalStatusException( + "Replay {} is not in a valid state for this operation.".format(name) + ) + + replay.state = ReplayState.CANCELLED + + return {"ReplayArn": replay.arn, "State": ReplayState.CANCELLING.value} + events_backends = {} for region in Session().get_available_regions("events"): diff --git a/moto/events/responses.py b/moto/events/responses.py index d1f37b47b..cdf5ca052 100644 --- a/moto/events/responses.py +++ b/moto/events/responses.py @@ -389,3 +389,40 @@ class EventsHandler(BaseResponse): self.events_backend.delete_archive(name) return "", self.response_headers + + def start_replay(self): + name = self._get_param("ReplayName") + description = self._get_param("Description") + source_arn = self._get_param("EventSourceArn") + start_time = self._get_param("EventStartTime") + end_time = self._get_param("EventEndTime") + destination = self._get_param("Destination") + + result = self.events_backend.start_replay( + name, description, source_arn, start_time, end_time, destination + ) + + return json.dumps(result), self.response_headers + + def describe_replay(self): + name = self._get_param("ReplayName") + + result = self.events_backend.describe_replay(name) + + return json.dumps(result), self.response_headers + + def list_replays(self): + name_prefix = self._get_param("NamePrefix") + source_arn = self._get_param("EventSourceArn") + state = self._get_param("State") + + result = self.events_backend.list_replays(name_prefix, source_arn, state) + + return json.dumps({"Replays": result}), self.response_headers + + def cancel_replay(self): + name = self._get_param("ReplayName") + + result = self.events_backend.cancel_replay(name) + + return json.dumps(result), self.response_headers diff --git a/tests/test_events/test_events.py b/tests/test_events/test_events.py index cffd41228..3cb69d40d 100644 --- a/tests/test_events/test_events.py +++ b/tests/test_events/test_events.py @@ -4,12 +4,15 @@ import unittest from datetime import datetime import boto3 +import pytz import sure # noqa from botocore.exceptions import ClientError import pytest +from moto import mock_logs from moto.core import ACCOUNT_ID +from moto.core.utils import iso_8601_datetime_without_milliseconds from moto.events import mock_events RULES = [ @@ -1106,7 +1109,8 @@ def test_list_archives_error_invalid_state(): ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) ex.response["Error"]["Code"].should.contain("ValidationException") ex.response["Error"]["Message"].should.equal( - "1 validation error detected: Value 'invalid' at 'state' failed to satisfy constraint: " + "1 validation error detected: " + "Value 'invalid' at 'state' failed to satisfy constraint: " "Member must satisfy enum value set: " "[ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]" ) @@ -1255,12 +1259,12 @@ def test_archive_actual_events(): client.create_archive( ArchiveName=name_2, EventSourceArn=event_bus_arn, - EventPattern=json.dumps({"DetailType": ["type"], "Source": ["test"]}), + EventPattern=json.dumps({"detail-type": ["type"], "source": ["test"]}), ) client.create_archive( ArchiveName=name_3, EventSourceArn=event_bus_arn, - EventPattern=json.dumps({"DetailType": ["type"], "Source": ["source"]}), + EventPattern=json.dumps({"detail-type": ["type"], "source": ["source"]}), ) # when @@ -1281,3 +1285,626 @@ def test_archive_actual_events(): response = client.describe_archive(ArchiveName=name_3) response["EventCount"].should.equal(1) response["SizeBytes"].should.be.greater_than(0) + + +@mock_events +def test_start_replay(): + # given + client = boto3.client("events", "eu-central-1") + name = "test-replay" + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-archive", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + + # when + response = client.start_replay( + ReplayName=name, + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1), + EventEndTime=datetime(2021, 2, 2), + Destination={"Arn": event_bus_arn}, + ) + + # then + response["ReplayArn"].should.equal( + "arn:aws:events:eu-central-1:{0}:replay/{1}".format(ACCOUNT_ID, name) + ) + response["ReplayStartTime"].should.be.a(datetime) + response["State"].should.equal("STARTING") + + +@mock_events +def test_start_replay_error_unknown_event_bus(): + # given + client = boto3.client("events", "eu-central-1") + event_bus_name = "unknown" + + # when + with pytest.raises(ClientError) as e: + client.start_replay( + ReplayName="test", + EventSourceArn="arn:aws:events:eu-central-1:{}:archive/test".format( + ACCOUNT_ID + ), + EventStartTime=datetime(2021, 2, 1), + EventEndTime=datetime(2021, 2, 2), + Destination={ + "Arn": "arn:aws:events:eu-central-1:{0}:event-bus/{1}".format( + ACCOUNT_ID, event_bus_name + ), + }, + ) + + # then + ex = e.value + ex.operation_name.should.equal("StartReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "Event bus {} does not exist.".format(event_bus_name) + ) + + +@mock_events +def test_start_replay_error_invalid_event_bus_arn(): + # given + client = boto3.client("events", "eu-central-1") + + # when + with pytest.raises(ClientError) as e: + client.start_replay( + ReplayName="test", + EventSourceArn="arn:aws:events:eu-central-1:{}:archive/test".format( + ACCOUNT_ID + ), + EventStartTime=datetime(2021, 2, 1), + EventEndTime=datetime(2021, 2, 2), + Destination={"Arn": "invalid",}, + ) + + # then + ex = e.value + ex.operation_name.should.equal("StartReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ValidationException") + ex.response["Error"]["Message"].should.equal( + "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN." + ) + + +@mock_events +def test_start_replay_error_unknown_archive(): + # given + client = boto3.client("events", "eu-central-1") + archive_name = "unknown" + + # when + with pytest.raises(ClientError) as e: + client.start_replay( + ReplayName="test", + EventSourceArn="arn:aws:events:eu-central-1:{0}:archive/{1}".format( + ACCOUNT_ID, archive_name + ), + EventStartTime=datetime(2021, 2, 1), + EventEndTime=datetime(2021, 2, 2), + Destination={ + "Arn": "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ), + }, + ) + + # then + ex = e.value + ex.operation_name.should.equal("StartReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ValidationException") + ex.response["Error"]["Message"].should.equal( + "Parameter EventSourceArn is not valid. " + "Reason: Archive {} does not exist.".format(archive_name) + ) + + +@mock_events +def test_start_replay_error_cross_event_bus(): + # given + client = boto3.client("events", "eu-central-1") + archive_arn = client.create_archive( + ArchiveName="test-archive", + EventSourceArn="arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ), + )["ArchiveArn"] + event_bus_arn = client.create_event_bus(Name="test-bus")["EventBusArn"] + + # when + with pytest.raises(ClientError) as e: + client.start_replay( + ReplayName="test", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1), + EventEndTime=datetime(2021, 2, 2), + Destination={"Arn": event_bus_arn}, + ) + + # then + ex = e.value + ex.operation_name.should.equal("StartReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ValidationException") + ex.response["Error"]["Message"].should.equal( + "Parameter Destination.Arn is not valid. " + "Reason: Cross event bus replay is not permitted." + ) + + +@mock_events +def test_start_replay_error_invalid_end_time(): + # given + client = boto3.client("events", "eu-central-1") + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-archive", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + + # when + with pytest.raises(ClientError) as e: + client.start_replay( + ReplayName="test", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 2), + EventEndTime=datetime(2021, 2, 1), + Destination={"Arn": event_bus_arn}, + ) + + # then + ex = e.value + ex.operation_name.should.equal("StartReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ValidationException") + ex.response["Error"]["Message"].should.equal( + "Parameter EventEndTime is not valid. " + "Reason: EventStartTime must be before EventEndTime." + ) + + +@mock_events +def test_start_replay_error_duplicate(): + # given + client = boto3.client("events", "eu-central-1") + name = "test-replay" + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-archive", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + client.start_replay( + ReplayName=name, + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1), + EventEndTime=datetime(2021, 2, 2), + Destination={"Arn": event_bus_arn}, + ) + + # when + with pytest.raises(ClientError) as e: + client.start_replay( + ReplayName=name, + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1), + EventEndTime=datetime(2021, 2, 2), + Destination={"Arn": event_bus_arn}, + ) + + # then + ex = e.value + ex.operation_name.should.equal("StartReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceAlreadyExistsException") + ex.response["Error"]["Message"].should.equal( + "Replay {} already exists.".format(name) + ) + + +@mock_events +def test_describe_replay(): + # given + client = boto3.client("events", "eu-central-1") + name = "test-replay" + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-archive", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + client.start_replay( + ReplayName=name, + Description="test replay", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 2, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + + # when + response = client.describe_replay(ReplayName=name) + + # then + response["Description"].should.equal("test replay") + response["Destination"].should.equal({"Arn": event_bus_arn}) + response["EventSourceArn"].should.equal(archive_arn) + response["EventStartTime"].should.equal(datetime(2021, 2, 1, tzinfo=pytz.utc)) + response["EventEndTime"].should.equal(datetime(2021, 2, 2, tzinfo=pytz.utc)) + response["ReplayArn"].should.equal( + "arn:aws:events:eu-central-1:{0}:replay/{1}".format(ACCOUNT_ID, name) + ) + response["ReplayName"].should.equal(name) + response["ReplayStartTime"].should.be.a(datetime) + response["ReplayEndTime"].should.be.a(datetime) + response["State"].should.equal("COMPLETED") + + +@mock_events +def test_describe_replay_error_unknown_replay(): + # given + client = boto3.client("events", "eu-central-1") + name = "unknown" + + # when + with pytest.raises(ClientError) as e: + client.describe_replay(ReplayName=name) + + # then + ex = e.value + ex.operation_name.should.equal("DescribeReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "Replay {} does not exist.".format(name) + ) + + +@mock_events +def test_list_replays(): + # given + client = boto3.client("events", "eu-central-1") + name = "test-replay" + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-replay", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + client.start_replay( + ReplayName=name, + Description="test replay", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 2, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + + # when + replays = client.list_replays()["Replays"] + + # then + replays.should.have.length_of(1) + replay = replays[0] + replay["EventSourceArn"].should.equal(archive_arn) + replay["EventStartTime"].should.equal(datetime(2021, 2, 1, tzinfo=pytz.utc)) + replay["EventEndTime"].should.equal(datetime(2021, 2, 2, tzinfo=pytz.utc)) + replay["ReplayName"].should.equal(name) + replay["ReplayStartTime"].should.be.a(datetime) + replay["ReplayEndTime"].should.be.a(datetime) + replay["State"].should.equal("COMPLETED") + + +@mock_events +def test_list_replays_with_name_prefix(): + # given + client = boto3.client("events", "eu-central-1") + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-replay", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + client.start_replay( + ReplayName="test", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 1, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 1, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + client.start_replay( + ReplayName="test-replay", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 2, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + + # when + replays = client.list_replays(NamePrefix="test-")["Replays"] + + # then + replays.should.have.length_of(1) + replays[0]["ReplayName"].should.equal("test-replay") + + +@mock_events +def test_list_replays_with_source_arn(): + # given + client = boto3.client("events", "eu-central-1") + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-replay", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + client.start_replay( + ReplayName="test", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 1, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 1, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + client.start_replay( + ReplayName="test-replay", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 2, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + + # when + replays = client.list_replays(EventSourceArn=archive_arn)["Replays"] + + # then + replays.should.have.length_of(2) + + +@mock_events +def test_list_replays_with_state(): + # given + client = boto3.client("events", "eu-central-1") + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-replay", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + client.start_replay( + ReplayName="test", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 1, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 1, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + client.start_replay( + ReplayName="test-replay", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 2, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + + # when + replays = client.list_replays(State="FAILED")["Replays"] + + # then + replays.should.have.length_of(0) + + +@mock_events +def test_list_replays_error_multiple_filters(): + # given + client = boto3.client("events", "eu-central-1") + + # when + with pytest.raises(ClientError) as e: + client.list_replays(NamePrefix="test", State="COMPLETED") + + # then + ex = e.value + ex.operation_name.should.equal("ListReplays") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ValidationException") + ex.response["Error"]["Message"].should.equal( + "At most one filter is allowed for ListReplays. " + "Use either : State, EventSourceArn, or NamePrefix." + ) + + +@mock_events +def test_list_replays_error_invalid_state(): + # given + client = boto3.client("events", "eu-central-1") + + # when + with pytest.raises(ClientError) as e: + client.list_replays(State="invalid") + + # then + ex = e.value + ex.operation_name.should.equal("ListReplays") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ValidationException") + ex.response["Error"]["Message"].should.equal( + "1 validation error detected: " + "Value 'invalid' at 'state' failed to satisfy constraint: " + "Member must satisfy enum value set: " + "[CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]" + ) + + +@mock_events +def test_cancel_replay(): + # given + client = boto3.client("events", "eu-central-1") + name = "test-replay" + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-archive", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + client.start_replay( + ReplayName=name, + Description="test replay", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 2, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + + # when + response = client.cancel_replay(ReplayName=name) + + # then + response["ReplayArn"].should.equal( + "arn:aws:events:eu-central-1:{0}:replay/{1}".format(ACCOUNT_ID, name) + ) + response["State"].should.equal("CANCELLING") + + response = client.describe_replay(ReplayName=name) + response["State"].should.equal("CANCELLED") + + +@mock_events +def test_cancel_replay_error_unknown_replay(): + # given + client = boto3.client("events", "eu-central-1") + name = "unknown" + + # when + with pytest.raises(ClientError) as e: + client.cancel_replay(ReplayName=name) + + # then + ex = e.value + ex.operation_name.should.equal("CancelReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "Replay {} does not exist.".format(name) + ) + + +@mock_events +def test_cancel_replay_error_illegal_state(): + # given + client = boto3.client("events", "eu-central-1") + name = "test-replay" + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + archive_arn = client.create_archive( + ArchiveName="test-archive", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + client.start_replay( + ReplayName=name, + Description="test replay", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 2, 1, tzinfo=pytz.utc), + EventEndTime=datetime(2021, 2, 2, tzinfo=pytz.utc), + Destination={"Arn": event_bus_arn}, + ) + client.cancel_replay(ReplayName=name) + + # when + with pytest.raises(ClientError) as e: + client.cancel_replay(ReplayName=name) + + # then + ex = e.value + ex.operation_name.should.equal("CancelReplay") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("IllegalStatusException") + ex.response["Error"]["Message"].should.equal( + "Replay {} is not in a valid state for this operation.".format(name) + ) + + +@mock_events +@mock_logs +def test_start_replay_send_to_log_group(): + # given + client = boto3.client("events", "eu-central-1") + logs_client = boto3.client("logs", "eu-central-1") + log_group_name = "/test-group" + rule_name = "test-rule" + logs_client.create_log_group(logGroupName=log_group_name) + event_bus_arn = "arn:aws:events:eu-central-1:{}:event-bus/default".format( + ACCOUNT_ID + ) + client.put_rule(Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})) + client.put_targets( + Rule=rule_name, + Targets=[ + { + "Id": "test", + "Arn": "arn:aws:logs:eu-central-1:{0}:log-group:{1}".format( + ACCOUNT_ID, log_group_name + ), + } + ], + ) + archive_arn = client.create_archive( + ArchiveName="test-archive", EventSourceArn=event_bus_arn, + )["ArchiveArn"] + event_time = datetime(2021, 1, 1, 12, 23, 34) + client.put_events( + Entries=[ + { + "Time": event_time, + "Source": "source", + "DetailType": "type", + "Detail": json.dumps({"key": "value"}), + } + ] + ) + + # when + client.start_replay( + ReplayName="test-replay", + EventSourceArn=archive_arn, + EventStartTime=datetime(2021, 1, 1), + EventEndTime=datetime(2021, 1, 2), + Destination={"Arn": event_bus_arn}, + ) + + # then + events = sorted( + logs_client.filter_log_events(logGroupName=log_group_name)["events"], + key=lambda item: item["eventId"], + ) + event_original = json.loads(events[0]["message"]) + event_original["version"].should.equal("0") + event_original["id"].should_not.be.empty + event_original["detail-type"].should.equal("type") + event_original["source"].should.equal("source") + event_original["time"].should.equal( + iso_8601_datetime_without_milliseconds(event_time) + ) + event_original["region"].should.equal("eu-central-1") + event_original["resources"].should.be.empty + event_original["detail"].should.equal({"key": "value"}) + event_original.should_not.have.key("replay-name") + + event_replay = json.loads(events[1]["message"]) + event_replay["version"].should.equal("0") + event_replay["id"].should_not.equal(event_original["id"]) + event_replay["detail-type"].should.equal("type") + event_replay["source"].should.equal("source") + event_replay["time"].should.equal(event_original["time"]) + event_replay["region"].should.equal("eu-central-1") + event_replay["resources"].should.be.empty + event_replay["detail"].should.equal({"key": "value"}) + event_replay["replay-name"].should.equal("test-replay")