fix Fifo queue delivers duplicate message(s) (#3538)

* fix https://github.com/localstack/localstack/issues/3339

* fixe lint issues

* Fix review comments
- move deduplication time to constants
- make tests parameterized
- update tests as per review comments

* change variable name expectedCount => expected_count

* fix tests for python 2.7
increase deduplication mock config to account for delays

* ignore time mocking test in server mode
This commit is contained in:
irahulranjan 2020-12-13 20:09:10 +05:30 committed by GitHub
parent dffc0e449c
commit 7b97141184
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 122 additions and 0 deletions

View File

@ -63,6 +63,8 @@ BINARY_LIST_TYPE_FIELD_INDEX = 4
# https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html
ATTRIBUTE_NAME_PATTERN = re.compile("^([a-z]|[A-Z]|[0-9]|[_.\\-])+$") ATTRIBUTE_NAME_PATTERN = re.compile("^([a-z]|[A-Z]|[0-9]|[_.\\-])+$")
DEDUPLICATION_TIME_IN_SECONDS = 300
class Message(BaseModel): class Message(BaseModel):
def __init__(self, message_id, body): def __init__(self, message_id, body):
@ -478,6 +480,18 @@ class Queue(CloudFormationModel):
] ]
def add_message(self, message): def add_message(self, message):
if (
self.fifo_queue
and self.attributes.get("ContentBasedDeduplication") == "true"
):
for m in self._messages:
if m.deduplication_id == message.deduplication_id:
diff = message.sent_timestamp - m.sent_timestamp
# if a duplicate message is received within the deduplication time then it should
# not be added to the queue
if diff / 1000 < DEDUPLICATION_TIME_IN_SECONDS:
return
self._messages.append(message) self._messages.append(message)
from moto.awslambda import lambda_backends from moto.awslambda import lambda_backends
@ -675,6 +689,13 @@ class SQSBackend(BaseBackend):
message_id = get_random_message_id() message_id = get_random_message_id()
message = Message(message_id, message_body) message = Message(message_id, message_body)
# if content based deduplication is set then set sha256 hash of the message
# as the deduplication_id
if queue.attributes.get("ContentBasedDeduplication") == "true":
sha256 = hashlib.sha256()
sha256.update(message_body.encode("utf-8"))
message.deduplication_id = sha256.hexdigest()
# Attributes, but not *message* attributes # Attributes, but not *message* attributes
if deduplication_id is not None: if deduplication_id is not None:
message.deduplication_id = deduplication_id message.deduplication_id = deduplication_id

View File

@ -5,11 +5,13 @@ import base64
import json import json
import time import time
import uuid import uuid
import hashlib
import boto import boto
import boto3 import boto3
import botocore.exceptions import botocore.exceptions
import six import six
import sys
import sure # noqa import sure # noqa
from boto.exception import SQSError from boto.exception import SQSError
from boto.sqs.message import Message, RawMessage from boto.sqs.message import Message, RawMessage
@ -17,6 +19,12 @@ from botocore.exceptions import ClientError
from freezegun import freeze_time from freezegun import freeze_time
from moto import mock_sqs, mock_sqs_deprecated, mock_lambda, mock_logs, settings from moto import mock_sqs, mock_sqs_deprecated, mock_lambda, mock_logs, settings
from unittest import SkipTest from unittest import SkipTest
if sys.version_info[0] < 3:
import mock
from unittest import SkipTest
else:
from unittest import SkipTest, mock
import pytest import pytest
from tests.helpers import requires_boto_gte from tests.helpers import requires_boto_gte
from tests.test_awslambda.test_lambda import get_test_zip_file1, get_role_name from tests.test_awslambda.test_lambda import get_test_zip_file1, get_role_name
@ -46,6 +54,8 @@ TEST_POLICY = """
} }
""" """
MOCK_DEDUPLICATION_TIME_IN_SECONDS = 5
@mock_sqs @mock_sqs
def test_create_fifo_queue_fail(): def test_create_fifo_queue_fail():
@ -2283,3 +2293,94 @@ def test_send_message_fails_when_message_size_greater_than_max_message_size():
ex.response["Error"]["Message"].should.contain( ex.response["Error"]["Message"].should.contain(
"{} bytes".format(message_size_limit) "{} bytes".format(message_size_limit)
) )
@mock_sqs
@pytest.mark.parametrize(
"msg_1, msg_2, dedupid_1, dedupid_2, expected_count",
[
("msg1", "msg1", "1", "1", 1),
("msg1", "msg1", "1", "2", 2),
("msg1", "msg2", "1", "1", 1),
("msg1", "msg2", "1", "2", 2),
],
)
def test_fifo_queue_deduplication_with_id(
msg_1, msg_2, dedupid_1, dedupid_2, expected_count
):
sqs = boto3.resource("sqs", region_name="us-east-1")
msg_queue = sqs.create_queue(
QueueName="test-queue-dlq.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
msg_queue.send_message(
MessageBody=msg_1, MessageDeduplicationId=dedupid_1, MessageGroupId="1"
)
msg_queue.send_message(
MessageBody=msg_2, MessageDeduplicationId=dedupid_2, MessageGroupId="2"
)
messages = msg_queue.receive_messages(MaxNumberOfMessages=2)
messages.should.have.length_of(expected_count)
@mock_sqs
@pytest.mark.parametrize(
"msg_1, msg_2, expected_count", [("msg1", "msg1", 1), ("msg1", "msg2", 2),],
)
def test_fifo_queue_deduplication_withoutid(msg_1, msg_2, expected_count):
sqs = boto3.resource("sqs", region_name="us-east-1")
msg_queue = sqs.create_queue(
QueueName="test-queue-dlq.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
msg_queue.send_message(MessageBody=msg_1, MessageGroupId="1")
msg_queue.send_message(MessageBody=msg_2, MessageGroupId="2")
messages = msg_queue.receive_messages(MaxNumberOfMessages=2)
messages.should.have.length_of(expected_count)
@mock.patch(
"moto.sqs.models.DEDUPLICATION_TIME_IN_SECONDS", MOCK_DEDUPLICATION_TIME_IN_SECONDS
)
@mock_sqs
def test_fifo_queue_send_duplicate_messages_after_deduplication_time_limit():
if settings.TEST_SERVER_MODE:
raise SkipTest("Cant manipulate time in server mode")
sqs = boto3.resource("sqs", region_name="us-east-1")
msg_queue = sqs.create_queue(
QueueName="test-queue-dlq.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
msg_queue.send_message(MessageBody="first", MessageGroupId="1")
time.sleep(MOCK_DEDUPLICATION_TIME_IN_SECONDS + 5)
msg_queue.send_message(MessageBody="first", MessageGroupId="2")
messages = msg_queue.receive_messages(MaxNumberOfMessages=2)
messages.should.have.length_of(2)
@mock_sqs
def test_fifo_queue_send_deduplicationid_same_as_sha256_of_old_message():
sqs = boto3.resource("sqs", region_name="us-east-1")
msg_queue = sqs.create_queue(
QueueName="test-queue-dlq.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
msg_queue.send_message(MessageBody="first", MessageGroupId="1")
sha256 = hashlib.sha256()
sha256.update("first".encode("utf-8"))
deduplicationid = sha256.hexdigest()
msg_queue.send_message(
MessageBody="second", MessageGroupId="2", MessageDeduplicationId=deduplicationid
)
messages = msg_queue.receive_messages(MaxNumberOfMessages=2)
messages.should.have.length_of(1)