diff --git a/moto/sqs/models.py b/moto/sqs/models.py index 1ab98e94c..5b0f5f430 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -63,6 +63,8 @@ BINARY_LIST_TYPE_FIELD_INDEX = 4 # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html ATTRIBUTE_NAME_PATTERN = re.compile("^([a-z]|[A-Z]|[0-9]|[_.\\-])+$") +DEDUPLICATION_TIME_IN_SECONDS = 300 + class Message(BaseModel): def __init__(self, message_id, body): @@ -478,6 +480,18 @@ class Queue(CloudFormationModel): ] def add_message(self, message): + if ( + self.fifo_queue + and self.attributes.get("ContentBasedDeduplication") == "true" + ): + for m in self._messages: + if m.deduplication_id == message.deduplication_id: + diff = message.sent_timestamp - m.sent_timestamp + # if a duplicate message is received within the deduplication time then it should + # not be added to the queue + if diff / 1000 < DEDUPLICATION_TIME_IN_SECONDS: + return + self._messages.append(message) from moto.awslambda import lambda_backends @@ -675,6 +689,13 @@ class SQSBackend(BaseBackend): message_id = get_random_message_id() message = Message(message_id, message_body) + # if content based deduplication is set then set sha256 hash of the message + # as the deduplication_id + if queue.attributes.get("ContentBasedDeduplication") == "true": + sha256 = hashlib.sha256() + sha256.update(message_body.encode("utf-8")) + message.deduplication_id = sha256.hexdigest() + # Attributes, but not *message* attributes if deduplication_id is not None: message.deduplication_id = deduplication_id diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index c234f5cdc..df7b8dec8 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -5,11 +5,13 @@ import base64 import json import time import uuid +import hashlib import boto import boto3 import botocore.exceptions import six +import sys import sure # noqa from boto.exception import SQSError from boto.sqs.message import Message, RawMessage @@ -17,6 +19,12 @@ from botocore.exceptions import ClientError from freezegun import freeze_time from moto import mock_sqs, mock_sqs_deprecated, mock_lambda, mock_logs, settings from unittest import SkipTest + +if sys.version_info[0] < 3: + import mock + from unittest import SkipTest +else: + from unittest import SkipTest, mock import pytest from tests.helpers import requires_boto_gte from tests.test_awslambda.test_lambda import get_test_zip_file1, get_role_name @@ -46,6 +54,8 @@ TEST_POLICY = """ } """ +MOCK_DEDUPLICATION_TIME_IN_SECONDS = 5 + @mock_sqs def test_create_fifo_queue_fail(): @@ -2283,3 +2293,94 @@ def test_send_message_fails_when_message_size_greater_than_max_message_size(): ex.response["Error"]["Message"].should.contain( "{} bytes".format(message_size_limit) ) + + +@mock_sqs +@pytest.mark.parametrize( + "msg_1, msg_2, dedupid_1, dedupid_2, expected_count", + [ + ("msg1", "msg1", "1", "1", 1), + ("msg1", "msg1", "1", "2", 2), + ("msg1", "msg2", "1", "1", 1), + ("msg1", "msg2", "1", "2", 2), + ], +) +def test_fifo_queue_deduplication_with_id( + msg_1, msg_2, dedupid_1, dedupid_2, expected_count +): + + sqs = boto3.resource("sqs", region_name="us-east-1") + msg_queue = sqs.create_queue( + QueueName="test-queue-dlq.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, + ) + + msg_queue.send_message( + MessageBody=msg_1, MessageDeduplicationId=dedupid_1, MessageGroupId="1" + ) + msg_queue.send_message( + MessageBody=msg_2, MessageDeduplicationId=dedupid_2, MessageGroupId="2" + ) + messages = msg_queue.receive_messages(MaxNumberOfMessages=2) + messages.should.have.length_of(expected_count) + + +@mock_sqs +@pytest.mark.parametrize( + "msg_1, msg_2, expected_count", [("msg1", "msg1", 1), ("msg1", "msg2", 2),], +) +def test_fifo_queue_deduplication_withoutid(msg_1, msg_2, expected_count): + + sqs = boto3.resource("sqs", region_name="us-east-1") + msg_queue = sqs.create_queue( + QueueName="test-queue-dlq.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, + ) + + msg_queue.send_message(MessageBody=msg_1, MessageGroupId="1") + msg_queue.send_message(MessageBody=msg_2, MessageGroupId="2") + messages = msg_queue.receive_messages(MaxNumberOfMessages=2) + messages.should.have.length_of(expected_count) + + +@mock.patch( + "moto.sqs.models.DEDUPLICATION_TIME_IN_SECONDS", MOCK_DEDUPLICATION_TIME_IN_SECONDS +) +@mock_sqs +def test_fifo_queue_send_duplicate_messages_after_deduplication_time_limit(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Cant manipulate time in server mode") + + sqs = boto3.resource("sqs", region_name="us-east-1") + msg_queue = sqs.create_queue( + QueueName="test-queue-dlq.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, + ) + + msg_queue.send_message(MessageBody="first", MessageGroupId="1") + time.sleep(MOCK_DEDUPLICATION_TIME_IN_SECONDS + 5) + msg_queue.send_message(MessageBody="first", MessageGroupId="2") + messages = msg_queue.receive_messages(MaxNumberOfMessages=2) + messages.should.have.length_of(2) + + +@mock_sqs +def test_fifo_queue_send_deduplicationid_same_as_sha256_of_old_message(): + + sqs = boto3.resource("sqs", region_name="us-east-1") + msg_queue = sqs.create_queue( + QueueName="test-queue-dlq.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, + ) + + msg_queue.send_message(MessageBody="first", MessageGroupId="1") + + sha256 = hashlib.sha256() + sha256.update("first".encode("utf-8")) + deduplicationid = sha256.hexdigest() + + msg_queue.send_message( + MessageBody="second", MessageGroupId="2", MessageDeduplicationId=deduplicationid + ) + messages = msg_queue.receive_messages(MaxNumberOfMessages=2) + messages.should.have.length_of(1)