From 9c89c24cafdac58df785199dbea0e7ee64e8e25b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Gr=C3=BCbel?= Date: Tue, 30 Mar 2021 15:13:10 +0200 Subject: [PATCH] Add events target integration for sqs queue (#3815) --- moto/events/models.py | 60 +++++++++--- moto/events/responses.py | 24 +---- tests/test_events/test_events.py | 78 +++++++++++++-- tests/test_events/test_events_targets.py | 120 +++++++++++++++++++++++ 4 files changed, 239 insertions(+), 43 deletions(-) create mode 100644 tests/test_events/test_events_targets.py diff --git a/moto/events/models.py b/moto/events/models.py index da75d4483..6818aa3ec 100644 --- a/moto/events/models.py +++ b/moto/events/models.py @@ -100,7 +100,10 @@ class Rule(CloudFormationModel): if not self._validate_event(event): return - # for now only CW Log groups are supported + # supported targets + # - CloudWatch Log Group + # - EventBridge Archive + # - SQS Queue (not FIFO) for target in self.targets: arn = self._parse_arn(target["Arn"]) @@ -111,6 +114,8 @@ class Rule(CloudFormationModel): archive_arn = self._parse_arn(input_template["archive-arn"]) self._send_to_events_archive(archive_arn.resource_id, event) + elif arn.service == "sqs": + self._send_to_sqs_queue(arn.resource_id, event) else: raise NotImplementedError("Expr not defined for {0}".format(type(self))) @@ -185,6 +190,18 @@ class Rule(CloudFormationModel): if archive.uuid == archive_uuid: archive.events.append(event) + def _send_to_sqs_queue(self, resource_id, event): + from moto.sqs import sqs_backends + + event_copy = copy.deepcopy(event) + event_copy["time"] = iso_8601_datetime_without_milliseconds( + datetime.utcfromtimestamp(event_copy["time"]) + ) + + sqs_backends[self.region_name].send_message( + queue_name=resource_id, message_body=json.dumps(event_copy) + ) + def get_cfn_attribute(self, attribute_name): from moto.cloudformation.exceptions import UnformattedGetAttTemplateException @@ -709,14 +726,30 @@ class EventsBackend(BaseBackend): self.rules_order.append(new_rule.name) return new_rule - def put_targets(self, name, targets): + def put_targets(self, name, event_bus_name, targets): + # super simple ARN check + invalid_arn = next( + ( + target["Arn"] + for target in targets + if not re.match(r"arn:[\d\w:\-/]*", target["Arn"]) + ), + None, + ) + if invalid_arn: + raise ValidationException( + "Parameter {} is not valid. " + "Reason: Provided Arn is not in correct format.".format(invalid_arn) + ) + rule = self.rules.get(name) - if rule: - rule.put_targets(targets) - return True + if not rule: + raise ResourceNotFoundException( + "Rule {0} does not exist on EventBus {1}.".format(name, event_bus_name) + ) - return False + rule.put_targets(targets) def put_events(self, events): num_events = len(events) @@ -788,18 +821,16 @@ class EventsBackend(BaseBackend): return entries - def remove_targets(self, name, ids): + def remove_targets(self, name, event_bus_name, ids): rule = self.rules.get(name) - if rule: - rule.remove_targets(ids) - return {"FailedEntries": [], "FailedEntryCount": 0} - else: - raise JsonRESTError( - "ResourceNotFoundException", - "An entity that you specified does not exist", + if not rule: + raise ResourceNotFoundException( + "Rule {0} does not exist on EventBus {1}.".format(name, event_bus_name) ) + rule.remove_targets(ids) + def test_event_pattern(self): raise NotImplementedError() @@ -955,6 +986,7 @@ class EventsBackend(BaseBackend): ) self.put_targets( rule.name, + rule.event_bus_name, [ { "Id": rule.name, diff --git a/moto/events/responses.py b/moto/events/responses.py index bc016f858..78109c7f5 100644 --- a/moto/events/responses.py +++ b/moto/events/responses.py @@ -207,18 +207,10 @@ class EventsHandler(BaseResponse): def put_targets(self): rule_name = self._get_param("Rule") + event_bus_name = self._get_param("EventBusName", "default") targets = self._get_param("Targets") - if not rule_name: - return self.error("ValidationException", "Parameter Rule is required.") - - if not targets: - return self.error("ValidationException", "Parameter Targets is required.") - - if not self.events_backend.put_targets(rule_name, targets): - return self.error( - "ResourceNotFoundException", "Rule " + rule_name + " does not exist." - ) + self.events_backend.put_targets(rule_name, event_bus_name, targets) return ( json.dumps({"FailedEntryCount": 0, "FailedEntries": []}), @@ -227,18 +219,10 @@ class EventsHandler(BaseResponse): def remove_targets(self): rule_name = self._get_param("Rule") + event_bus_name = self._get_param("EventBusName", "default") ids = self._get_param("Ids") - if not rule_name: - return self.error("ValidationException", "Parameter Rule is required.") - - if not ids: - return self.error("ValidationException", "Parameter Ids is required.") - - if not self.events_backend.remove_targets(rule_name, ids): - return self.error( - "ResourceNotFoundException", "Rule " + rule_name + " does not exist." - ) + self.events_backend.remove_targets(rule_name, event_bus_name, ids) return ( json.dumps({"FailedEntryCount": 0, "FailedEntries": []}), diff --git a/tests/test_events/test_events.py b/tests/test_events/test_events.py index 3a08fd595..073ba56cd 100644 --- a/tests/test_events/test_events.py +++ b/tests/test_events/test_events.py @@ -301,14 +301,21 @@ def test_update_rule_with_targets(): @mock_events -def test_remove_targets_errors(): - client = boto3.client("events", "us-east-1") +def test_remove_targets_error_unknown_rule(): + # given + client = boto3.client("events", "eu-central-1") - client.remove_targets.when.called_with( - Rule="non-existent", Ids=["Id12345678"] - ).should.throw( - client.exceptions.ResourceNotFoundException, - "An entity that you specified does not exist", + # when + with pytest.raises(ClientError) as e: + client.remove_targets(Rule="unknown", Ids=["something"]) + + # then + ex = e.value + ex.operation_name.should.equal("RemoveTargets") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "Rule unknown does not exist on EventBus default." ) @@ -328,7 +335,7 @@ def test_put_targets(): targets_before = len(targets) assert targets_before == 0 - targets_data = [{"Arn": "test_arn", "Id": "test_id"}] + targets_data = [{"Arn": "arn:aws:s3:::test-arn", "Id": "test_id"}] resp = client.put_targets(Rule=rule_name, Targets=targets_data) assert resp["FailedEntryCount"] == 0 assert len(resp["FailedEntries"]) == 0 @@ -337,10 +344,63 @@ def test_put_targets(): targets_after = len(targets) assert targets_before + 1 == targets_after - assert targets[0]["Arn"] == "test_arn" + assert targets[0]["Arn"] == "arn:aws:s3:::test-arn" assert targets[0]["Id"] == "test_id" +@mock_events +def test_put_targets_error_invalid_arn(): + # given + client = boto3.client("events", "eu-central-1") + rule_name = "test-rule" + client.put_rule( + Name=rule_name, + EventPattern=json.dumps({"account": [ACCOUNT_ID]}), + State="ENABLED", + ) + + # when + with pytest.raises(ClientError) as e: + client.put_targets( + Rule=rule_name, + Targets=[ + {"Id": "s3", "Arn": "arn:aws:s3:::test-bucket"}, + {"Id": "s3", "Arn": "test-bucket"}, + ], + ) + + # then + ex = e.value + ex.operation_name.should.equal("PutTargets") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ValidationException") + ex.response["Error"]["Message"].should.equal( + "Parameter test-bucket is not valid. " + "Reason: Provided Arn is not in correct format." + ) + + +@mock_events +def test_put_targets_error_unknown_rule(): + # given + client = boto3.client("events", "eu-central-1") + + # when + with pytest.raises(ClientError) as e: + client.put_targets( + Rule="unknown", Targets=[{"Id": "s3", "Arn": "arn:aws:s3:::test-bucket"}] + ) + + # then + ex = e.value + ex.operation_name.should.equal("PutTargets") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") + ex.response["Error"]["Message"].should.equal( + "Rule unknown does not exist on EventBus default." + ) + + @mock_events def test_permissions(): client = boto3.client("events", "eu-central-1") diff --git a/tests/test_events/test_events_targets.py b/tests/test_events/test_events_targets.py new file mode 100644 index 000000000..08915edce --- /dev/null +++ b/tests/test_events/test_events_targets.py @@ -0,0 +1,120 @@ +import json +from datetime import datetime + +import boto3 +import sure # noqa + +from moto import mock_events, mock_sqs, mock_logs +from moto.core import ACCOUNT_ID +from moto.core.utils import iso_8601_datetime_without_milliseconds + + +@mock_events +@mock_logs +def test_send_to_cw_log_group(): + # given + client_events = boto3.client("events", "eu-central-1") + client_logs = boto3.client("logs", region_name="eu-central-1") + log_group_name = "/test-group" + rule_name = "test-rule" + client_logs.create_log_group(logGroupName=log_group_name) + client_events.put_rule( + Name=rule_name, + EventPattern=json.dumps({"account": [ACCOUNT_ID]}), + State="ENABLED", + ) + client_events.put_targets( + Rule=rule_name, + Targets=[ + { + "Id": "logs", + "Arn": "arn:aws:logs:eu-central-1:{0}:log-group:{1}".format( + ACCOUNT_ID, log_group_name + ), + } + ], + ) + + # when + event_time = datetime(2021, 1, 1, 12, 23, 34) + client_events.put_events( + Entries=[ + { + "Time": event_time, + "Source": "source", + "DetailType": "type", + "Detail": json.dumps({"key": "value"}), + } + ], + ) + + # then + response = client_logs.filter_log_events(logGroupName=log_group_name) + response["events"].should.have.length_of(1) + event = response["events"][0] + event["logStreamName"].should_not.be.empty + event["timestamp"].should.be.a(float) + event["ingestionTime"].should.be.a(int) + event["eventId"].should_not.be.empty + + message = json.loads(event["message"]) + message["version"].should.equal("0") + message["id"].should_not.be.empty + message["detail-type"].should.equal("type") + message["source"].should.equal("source") + message["time"].should.equal(iso_8601_datetime_without_milliseconds(event_time)) + message["region"].should.equal("eu-central-1") + message["resources"].should.be.empty + message["detail"].should.equal({"key": "value"}) + + +@mock_events +@mock_sqs +def test_send_to_sqs_queue(): + # given + client_events = boto3.client("events", "eu-central-1") + client_sqs = boto3.client("sqs", region_name="eu-central-1") + rule_name = "test-rule" + queue_url = client_sqs.create_queue(QueueName="test-queue")["QueueUrl"] + queue_arn = client_sqs.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=["QueueArn"] + )["Attributes"]["QueueArn"] + client_events.put_rule( + Name=rule_name, + EventPattern=json.dumps({"account": [ACCOUNT_ID]}), + State="ENABLED", + ) + client_events.put_targets( + Rule=rule_name, Targets=[{"Id": "sqs", "Arn": queue_arn}], + ) + + # when + event_time = datetime(2021, 1, 1, 12, 23, 34) + client_events.put_events( + Entries=[ + { + "Time": event_time, + "Source": "source", + "DetailType": "type", + "Detail": json.dumps({"key": "value"}), + } + ], + ) + + # then + response = client_sqs.receive_message(QueueUrl=queue_url) + response["Messages"].should.have.length_of(1) + message = response["Messages"][0] + message["MessageId"].should_not.be.empty + message["ReceiptHandle"].should_not.be.empty + message["MD5OfBody"].should_not.be.empty + + body = json.loads(message["Body"]) + body["version"].should.equal("0") + body["id"].should_not.be.empty + body["detail-type"].should.equal("type") + body["source"].should.equal("source") + body["time"].should.equal(iso_8601_datetime_without_milliseconds(event_time)) + body["region"].should.equal("eu-central-1") + body["resources"].should.be.empty + body["detail"].should.equal({"key": "value"})