From 201e57585b0c290371bae75394a52df94e8b5732 Mon Sep 17 00:00:00 2001 From: rafcio19 Date: Sun, 9 Apr 2023 11:13:28 +0100 Subject: [PATCH] SQS: Improve deduplication logic (#6184) --- moto/sqs/models.py | 46 +++++++++++---- tests/test_sqs/test_sqs.py | 79 ++++++++++++++++++++++++-- tests/test_sqs/test_sqs_integration.py | 5 +- 3 files changed, 112 insertions(+), 18 deletions(-) diff --git a/moto/sqs/models.py b/moto/sqs/models.py index ee93e1f48..b4939e9d7 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -525,17 +525,22 @@ class Queue(CloudFormationModel): ] def add_message(self, message): - if ( - self.fifo_queue - and self.attributes.get("ContentBasedDeduplication") == "true" - ): - for m in self._messages: - if m.deduplication_id == message.deduplication_id: - diff = message.sent_timestamp - m.sent_timestamp - # if a duplicate message is received within the deduplication time then it should - # not be added to the queue - if diff / 1000 < DEDUPLICATION_TIME_IN_SECONDS: - return + if self.fifo_queue: + + # the cases in which we dedupe fifo messages + # from https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html + # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html + if ( + self.attributes.get("ContentBasedDeduplication") == "true" + or message.deduplication_id + ): + for m in self._messages: + if m.deduplication_id == message.deduplication_id: + diff = message.sent_timestamp - m.sent_timestamp + # if a duplicate message is received within the deduplication time then it should + # not be added to the queue + if diff / 1000 < DEDUPLICATION_TIME_IN_SECONDS: + return with self._messages_lock: self._messages.append(message) @@ -758,6 +763,25 @@ class SQSBackend(BaseBackend): queue = self.get_queue(queue_name) + if queue.fifo_queue: + if ( + queue.attributes.get("ContentBasedDeduplication") == "false" + and not group_id + ): + msg = "MessageGroupId" + raise MissingParameter(msg) + + if ( + queue.attributes.get("ContentBasedDeduplication") == "false" + and group_id + and not deduplication_id + ): + msg = ( + "The queue should either have ContentBasedDeduplication enabled or " + "MessageDeduplicationId provided explicitly" + ) + raise InvalidParameterValue(msg) + if len(message_body) > queue.maximum_message_size: msg = f"One or more parameters are invalid. Reason: Message must be shorter than {queue.maximum_message_size} bytes." raise InvalidParameterValue(msg) diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 72f167e73..60f8fb480 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -2090,7 +2090,8 @@ def test_batch_change_message_visibility(): with freeze_time("2015-01-01 12:00:00"): sqs = boto3.client("sqs", region_name="us-east-1") resp = sqs.create_queue( - QueueName="test-dlr-queue.fifo", Attributes={"FifoQueue": "true"} + QueueName="test-dlr-queue.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, ) queue_url = resp["QueueUrl"] @@ -2524,7 +2525,8 @@ def test_queue_with_dlq(): with freeze_time("2015-01-01 12:00:00"): resp = sqs.create_queue( - QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={"FifoQueue": "true"} + QueueName=f"{str(uuid4())[0:6]}.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, ) queue_url1 = resp["QueueUrl"] queue_arn1 = sqs.get_queue_attributes( @@ -2535,6 +2537,7 @@ def test_queue_with_dlq(): QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={ "FifoQueue": "true", + "ContentBasedDeduplication": "true", "RedrivePolicy": json.dumps( {"deadLetterTargetArn": queue_arn1, "maxReceiveCount": 2} ), @@ -2681,7 +2684,8 @@ def test_redrive_policy_set_attributes_with_string_value(): def test_receive_messages_with_message_group_id(): sqs = boto3.resource("sqs", region_name="us-east-1") queue = sqs.create_queue( - QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={"FifoQueue": "true"} + QueueName=f"{str(uuid4())[0:6]}.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, ) queue.set_attributes(Attributes={"VisibilityTimeout": "3600"}) queue.send_message(MessageBody="message-1", MessageGroupId="group") @@ -2712,7 +2716,8 @@ def test_receive_messages_with_message_group_id(): def test_receive_messages_with_message_group_id_on_requeue(): sqs = boto3.resource("sqs", region_name="us-east-1") queue = sqs.create_queue( - QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={"FifoQueue": "true"} + QueueName=f"{str(uuid4())[0:6]}.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, ) queue.set_attributes(Attributes={"VisibilityTimeout": "3600"}) queue.send_message(MessageBody="message-1", MessageGroupId="group") @@ -2742,7 +2747,8 @@ def test_receive_messages_with_message_group_id_on_visibility_timeout(): with freeze_time("2015-01-01 12:00:00"): sqs = boto3.resource("sqs", region_name="us-east-1") queue = sqs.create_queue( - QueueName="test-queue.fifo", Attributes={"FifoQueue": "true"} + QueueName="test-queue.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, ) queue.set_attributes(Attributes={"VisibilityTimeout": "3600"}) queue.send_message(MessageBody="message-1", MessageGroupId="group") @@ -2989,7 +2995,10 @@ def test_fifo_send_message_when_same_group_id_is_in_dlq(): sqs = boto3.resource("sqs", region_name="us-east-1") q_name = f"{str(uuid4())[0:6]}-dlq.fifo" - dlq = sqs.create_queue(QueueName=q_name, Attributes={"FifoQueue": "true"}) + dlq = sqs.create_queue( + QueueName=q_name, + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, + ) queue = sqs.get_queue_by_name(QueueName=q_name) dead_letter_queue_arn = queue.attributes.get("QueueArn") @@ -2998,6 +3007,7 @@ def test_fifo_send_message_when_same_group_id_is_in_dlq(): QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={ "FifoQueue": "true", + "ContentBasedDeduplication": "true", "RedrivePolicy": json.dumps( {"deadLetterTargetArn": dead_letter_queue_arn, "maxReceiveCount": 1} ), @@ -3197,3 +3207,60 @@ def test_receive_message_that_becomes_visible_while_long_polling(): assert len(messages) == 1 assert messages[0].body == msg_body assert end - begin < time_to_wait + + +@mock_sqs +def test_dedupe_fifo(): + sqs = boto3.resource("sqs", region_name="us-east-1") + queue = sqs.create_queue( + QueueName="my-queue.fifo", + Attributes={ + "FifoQueue": "true", + }, + ) + + for _ in range(5): + queue.send_message( + MessageBody="test", + MessageDeduplicationId="1", + MessageGroupId="2", + ) + assert int(queue.attributes["ApproximateNumberOfMessages"]) == 1 + + +@mock_sqs +def test_fifo_dedupe_error_no_message_group_id(): + sqs = boto3.resource("sqs", region_name="us-east-1") + queue = sqs.create_queue( + QueueName="my-queue.fifo", + Attributes={"FifoQueue": "true"}, + ) + with pytest.raises(ClientError) as exc: + queue.send_message( + MessageBody="test", + MessageDeduplicationId="1", + ) + + exc.value.response["Error"]["Code"].should.equal("MissingParameter") + exc.value.response["Error"]["Message"].should.equal( + "The request must contain the parameter MessageGroupId." + ) + + +@mock_sqs +def test_fifo_dedupe_error_no_message_dedupe_id(): + sqs = boto3.resource("sqs", region_name="us-east-1") + queue = sqs.create_queue( + QueueName="my-queue.fifo", + Attributes={"FifoQueue": "true"}, + ) + with pytest.raises(ClientError) as exc: + queue.send_message( + MessageBody="test", + MessageGroupId="1", + ) + + exc.value.response["Error"]["Code"].should.equal("InvalidParameterValue") + exc.value.response["Error"]["Message"].should.equal( + "The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly" + ) diff --git a/tests/test_sqs/test_sqs_integration.py b/tests/test_sqs/test_sqs_integration.py index 1e89f7137..15e2c1698 100644 --- a/tests/test_sqs/test_sqs_integration.py +++ b/tests/test_sqs/test_sqs_integration.py @@ -85,7 +85,10 @@ def test_invoke_function_from_sqs_fifo_queue(): logs_conn = boto3.client("logs", region_name="us-east-1") sqs = boto3.resource("sqs", region_name="us-east-1") queue_name = str(uuid.uuid4())[0:6] + ".fifo" - queue = sqs.create_queue(QueueName=queue_name, Attributes={"FifoQueue": "true"}) + queue = sqs.create_queue( + QueueName=queue_name, + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, + ) fn_name = str(uuid.uuid4())[0:6] conn = boto3.client("lambda", region_name="us-east-1")