SQS: Improve deduplication logic (#6184)

This commit is contained in:
rafcio19 2023-04-09 11:13:28 +01:00 committed by GitHub
parent 0e6d27bac0
commit 201e57585b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 18 deletions

View File

@ -525,17 +525,22 @@ class Queue(CloudFormationModel):
]
def add_message(self, message):
if (
self.fifo_queue
and self.attributes.get("ContentBasedDeduplication") == "true"
):
for m in self._messages:
if m.deduplication_id == message.deduplication_id:
diff = message.sent_timestamp - m.sent_timestamp
# if a duplicate message is received within the deduplication time then it should
# not be added to the queue
if diff / 1000 < DEDUPLICATION_TIME_IN_SECONDS:
return
if self.fifo_queue:
# the cases in which we dedupe fifo messages
# from https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
# https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
if (
self.attributes.get("ContentBasedDeduplication") == "true"
or message.deduplication_id
):
for m in self._messages:
if m.deduplication_id == message.deduplication_id:
diff = message.sent_timestamp - m.sent_timestamp
# if a duplicate message is received within the deduplication time then it should
# not be added to the queue
if diff / 1000 < DEDUPLICATION_TIME_IN_SECONDS:
return
with self._messages_lock:
self._messages.append(message)
@ -758,6 +763,25 @@ class SQSBackend(BaseBackend):
queue = self.get_queue(queue_name)
if queue.fifo_queue:
if (
queue.attributes.get("ContentBasedDeduplication") == "false"
and not group_id
):
msg = "MessageGroupId"
raise MissingParameter(msg)
if (
queue.attributes.get("ContentBasedDeduplication") == "false"
and group_id
and not deduplication_id
):
msg = (
"The queue should either have ContentBasedDeduplication enabled or "
"MessageDeduplicationId provided explicitly"
)
raise InvalidParameterValue(msg)
if len(message_body) > queue.maximum_message_size:
msg = f"One or more parameters are invalid. Reason: Message must be shorter than {queue.maximum_message_size} bytes."
raise InvalidParameterValue(msg)

View File

@ -2090,7 +2090,8 @@ def test_batch_change_message_visibility():
with freeze_time("2015-01-01 12:00:00"):
sqs = boto3.client("sqs", region_name="us-east-1")
resp = sqs.create_queue(
QueueName="test-dlr-queue.fifo", Attributes={"FifoQueue": "true"}
QueueName="test-dlr-queue.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
queue_url = resp["QueueUrl"]
@ -2524,7 +2525,8 @@ def test_queue_with_dlq():
with freeze_time("2015-01-01 12:00:00"):
resp = sqs.create_queue(
QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={"FifoQueue": "true"}
QueueName=f"{str(uuid4())[0:6]}.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
queue_url1 = resp["QueueUrl"]
queue_arn1 = sqs.get_queue_attributes(
@ -2535,6 +2537,7 @@ def test_queue_with_dlq():
QueueName=f"{str(uuid4())[0:6]}.fifo",
Attributes={
"FifoQueue": "true",
"ContentBasedDeduplication": "true",
"RedrivePolicy": json.dumps(
{"deadLetterTargetArn": queue_arn1, "maxReceiveCount": 2}
),
@ -2681,7 +2684,8 @@ def test_redrive_policy_set_attributes_with_string_value():
def test_receive_messages_with_message_group_id():
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(
QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={"FifoQueue": "true"}
QueueName=f"{str(uuid4())[0:6]}.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
queue.send_message(MessageBody="message-1", MessageGroupId="group")
@ -2712,7 +2716,8 @@ def test_receive_messages_with_message_group_id():
def test_receive_messages_with_message_group_id_on_requeue():
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(
QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={"FifoQueue": "true"}
QueueName=f"{str(uuid4())[0:6]}.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
queue.send_message(MessageBody="message-1", MessageGroupId="group")
@ -2742,7 +2747,8 @@ def test_receive_messages_with_message_group_id_on_visibility_timeout():
with freeze_time("2015-01-01 12:00:00"):
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(
QueueName="test-queue.fifo", Attributes={"FifoQueue": "true"}
QueueName="test-queue.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
queue.send_message(MessageBody="message-1", MessageGroupId="group")
@ -2989,7 +2995,10 @@ def test_fifo_send_message_when_same_group_id_is_in_dlq():
sqs = boto3.resource("sqs", region_name="us-east-1")
q_name = f"{str(uuid4())[0:6]}-dlq.fifo"
dlq = sqs.create_queue(QueueName=q_name, Attributes={"FifoQueue": "true"})
dlq = sqs.create_queue(
QueueName=q_name,
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
queue = sqs.get_queue_by_name(QueueName=q_name)
dead_letter_queue_arn = queue.attributes.get("QueueArn")
@ -2998,6 +3007,7 @@ def test_fifo_send_message_when_same_group_id_is_in_dlq():
QueueName=f"{str(uuid4())[0:6]}.fifo",
Attributes={
"FifoQueue": "true",
"ContentBasedDeduplication": "true",
"RedrivePolicy": json.dumps(
{"deadLetterTargetArn": dead_letter_queue_arn, "maxReceiveCount": 1}
),
@ -3197,3 +3207,60 @@ def test_receive_message_that_becomes_visible_while_long_polling():
assert len(messages) == 1
assert messages[0].body == msg_body
assert end - begin < time_to_wait
@mock_sqs
def test_dedupe_fifo():
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(
QueueName="my-queue.fifo",
Attributes={
"FifoQueue": "true",
},
)
for _ in range(5):
queue.send_message(
MessageBody="test",
MessageDeduplicationId="1",
MessageGroupId="2",
)
assert int(queue.attributes["ApproximateNumberOfMessages"]) == 1
@mock_sqs
def test_fifo_dedupe_error_no_message_group_id():
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(
QueueName="my-queue.fifo",
Attributes={"FifoQueue": "true"},
)
with pytest.raises(ClientError) as exc:
queue.send_message(
MessageBody="test",
MessageDeduplicationId="1",
)
exc.value.response["Error"]["Code"].should.equal("MissingParameter")
exc.value.response["Error"]["Message"].should.equal(
"The request must contain the parameter MessageGroupId."
)
@mock_sqs
def test_fifo_dedupe_error_no_message_dedupe_id():
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(
QueueName="my-queue.fifo",
Attributes={"FifoQueue": "true"},
)
with pytest.raises(ClientError) as exc:
queue.send_message(
MessageBody="test",
MessageGroupId="1",
)
exc.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
exc.value.response["Error"]["Message"].should.equal(
"The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly"
)

View File

@ -85,7 +85,10 @@ def test_invoke_function_from_sqs_fifo_queue():
logs_conn = boto3.client("logs", region_name="us-east-1")
sqs = boto3.resource("sqs", region_name="us-east-1")
queue_name = str(uuid.uuid4())[0:6] + ".fifo"
queue = sqs.create_queue(QueueName=queue_name, Attributes={"FifoQueue": "true"})
queue = sqs.create_queue(
QueueName=queue_name,
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
fn_name = str(uuid.uuid4())[0:6]
conn = boto3.client("lambda", region_name="us-east-1")