Add events target integration for sqs fifo queue (#3830)

* Add events target integration for sqs fifo queue

* Rename test file
This commit is contained in:
Anton Grübel 2021-04-02 15:29:05 +02:00 committed by GitHub
parent b01c58785b
commit e90858b2e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 149 additions and 4 deletions

View File

@ -3,6 +3,7 @@ import os
import re import re
import json import json
import sys import sys
import warnings
from collections import namedtuple from collections import namedtuple
from datetime import datetime from datetime import datetime
from enum import Enum, unique from enum import Enum, unique
@ -121,7 +122,7 @@ class Rule(CloudFormationModel):
# supported targets # supported targets
# - CloudWatch Log Group # - CloudWatch Log Group
# - EventBridge Archive # - EventBridge Archive
# - SQS Queue (not FIFO) # - SQS Queue + FIFO Queue
for target in self.targets: for target in self.targets:
arn = self._parse_arn(target["Arn"]) arn = self._parse_arn(target["Arn"])
@ -133,7 +134,8 @@ class Rule(CloudFormationModel):
self._send_to_events_archive(archive_arn.resource_id, event) self._send_to_events_archive(archive_arn.resource_id, event)
elif arn.service == "sqs": elif arn.service == "sqs":
self._send_to_sqs_queue(arn.resource_id, event) group_id = target.get("SqsParameters", {}).get("MessageGroupId")
self._send_to_sqs_queue(arn.resource_id, event, group_id)
else: else:
raise NotImplementedError("Expr not defined for {0}".format(type(self))) raise NotImplementedError("Expr not defined for {0}".format(type(self)))
@ -211,7 +213,7 @@ class Rule(CloudFormationModel):
if self._does_event_match_pattern(event, pattern): if self._does_event_match_pattern(event, pattern):
archive.events.append(event) archive.events.append(event)
def _send_to_sqs_queue(self, resource_id, event): def _send_to_sqs_queue(self, resource_id, event, group_id=None):
from moto.sqs import sqs_backends from moto.sqs import sqs_backends
event_copy = copy.deepcopy(event) event_copy = copy.deepcopy(event)
@ -219,8 +221,20 @@ class Rule(CloudFormationModel):
datetime.utcfromtimestamp(event_copy["time"]) datetime.utcfromtimestamp(event_copy["time"])
) )
if group_id:
queue_attr = sqs_backends[self.region_name].get_queue_attributes(
queue_name=resource_id, attribute_names=["ContentBasedDeduplication"]
)
if queue_attr["ContentBasedDeduplication"] == "false":
warnings.warn(
"To let EventBridge send messages to your SQS FIFO queue, you must enable content-based deduplication."
)
return
sqs_backends[self.region_name].send_message( sqs_backends[self.region_name].send_message(
queue_name=resource_id, message_body=json.dumps(event_copy) queue_name=resource_id,
message_body=json.dumps(event_copy),
group_id=group_id,
) )
def get_cfn_attribute(self, attribute_name): def get_cfn_attribute(self, attribute_name):
@ -763,6 +777,20 @@ class EventsBackend(BaseBackend):
"Reason: Provided Arn is not in correct format.".format(invalid_arn) "Reason: Provided Arn is not in correct format.".format(invalid_arn)
) )
for target in targets:
arn = target["Arn"]
if (
":sqs:" in arn
and arn.endswith(".fifo")
and not target.get("SqsParameters")
):
raise ValidationException(
"Parameter(s) SqsParameters must be specified for target: {}.".format(
target["Id"]
)
)
rule = self.rules.get(name) rule = self.rules.get(name)
if not rule: if not rule:

View File

@ -401,6 +401,35 @@ def test_put_targets_error_unknown_rule():
) )
@mock_events
def test_put_targets_error_missing_parameter_sqs_fifo():
# given
client = boto3.client("events", "eu-central-1")
# when
with pytest.raises(ClientError) as e:
client.put_targets(
Rule="unknown",
Targets=[
{
"Id": "sqs-fifo",
"Arn": "arn:aws:sqs:eu-central-1:{}:test-queue.fifo".format(
ACCOUNT_ID
),
}
],
)
# then
ex = e.value
ex.operation_name.should.equal("PutTargets")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ValidationException")
ex.response["Error"]["Message"].should.equal(
"Parameter(s) SqsParameters must be specified for target: sqs-fifo."
)
@mock_events @mock_events
def test_permissions(): def test_permissions():
client = boto3.client("events", "eu-central-1") client = boto3.client("events", "eu-central-1")

View File

@ -68,6 +68,94 @@ def test_send_to_cw_log_group():
message["detail"].should.equal({"key": "value"}) message["detail"].should.equal({"key": "value"})
@mock_events
@mock_sqs
def test_send_to_sqs_fifo_queue():
# given
client_events = boto3.client("events", "eu-central-1")
client_sqs = boto3.client("sqs", region_name="eu-central-1")
rule_name = "test-rule"
queue_url = client_sqs.create_queue(
QueueName="test-queue.fifo", Attributes={"FifoQueue": "true"}
)["QueueUrl"]
queue_arn = client_sqs.get_queue_attributes(
QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
queue_url_dedup = client_sqs.create_queue(
QueueName="test-queue-dedup.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)["QueueUrl"]
queue_arn_dedup = client_sqs.get_queue_attributes(
QueueUrl=queue_url_dedup, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
client_events.put_rule(
Name=rule_name,
EventPattern=json.dumps({"account": [ACCOUNT_ID]}),
State="ENABLED",
)
client_events.put_targets(
Rule=rule_name,
Targets=[
{
"Id": "sqs-fifo",
"Arn": queue_arn,
"SqsParameters": {"MessageGroupId": "group-id"},
},
{
"Id": "sqs-dedup-fifo",
"Arn": queue_arn_dedup,
"SqsParameters": {"MessageGroupId": "group-id"},
},
],
)
# when
event_time = datetime(2021, 1, 1, 12, 23, 34)
client_events.put_events(
Entries=[
{
"Time": event_time,
"Source": "source",
"DetailType": "type",
"Detail": json.dumps({"key": "value"}),
}
],
)
# then
response = client_sqs.receive_message(
QueueUrl=queue_url_dedup,
AttributeNames=["MessageDeduplicationId", "MessageGroupId"],
)
response["Messages"].should.have.length_of(1)
message = response["Messages"][0]
message["MessageId"].should_not.be.empty
message["ReceiptHandle"].should_not.be.empty
message["MD5OfBody"].should_not.be.empty
message["Attributes"]["MessageDeduplicationId"].should_not.be.empty
message["Attributes"]["MessageGroupId"].should.equal("group-id")
body = json.loads(message["Body"])
body["version"].should.equal("0")
body["id"].should_not.be.empty
body["detail-type"].should.equal("type")
body["source"].should.equal("source")
body["time"].should.equal(iso_8601_datetime_without_milliseconds(event_time))
body["region"].should.equal("eu-central-1")
body["resources"].should.be.empty
body["detail"].should.equal({"key": "value"})
# A FIFO queue without content-based deduplication enabled
# does not receive any event from the Event Bus
response = client_sqs.receive_message(
QueueUrl=queue_url, AttributeNames=["MessageDeduplicationId", "MessageGroupId"]
)
response.should_not.have.key("Messages")
@mock_events @mock_events
@mock_sqs @mock_sqs
def test_send_to_sqs_queue(): def test_send_to_sqs_queue():