diff --git a/moto/sqs/models.py b/moto/sqs/models.py index f88d906b9..ea3b89f04 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -6,6 +6,7 @@ import json import re import six import struct +from copy import deepcopy from xml.sax.saxutils import escape from boto3 import Session @@ -101,7 +102,6 @@ class Message(BaseModel): if data_type == "String" or data_type == "Number": value = attr["string_value"] elif data_type == "Binary": - print(data_type, attr["binary_value"], type(attr["binary_value"])) value = base64.b64decode(attr["binary_value"]) else: print( @@ -722,6 +722,7 @@ class SQSBackend(BaseBackend): previous_result_count = len(result) polling_end = unix_time() + wait_seconds_timeout + currently_pending_groups = deepcopy(queue.pending_message_groups) # queue.messages only contains visible messages while True: @@ -739,11 +740,11 @@ class SQSBackend(BaseBackend): # The message is pending but is visible again, so the # consumer must have timed out. queue.pending_messages.remove(message) + currently_pending_groups = deepcopy(queue.pending_message_groups) if message.group_id and queue.fifo_queue: - if message.group_id in queue.pending_message_groups: - # There is already one active message with the same - # group, so we cannot deliver this one. + if message.group_id in currently_pending_groups: + # A previous call is still processing messages in this group, so we cannot deliver this one. continue queue.pending_messages.add(message) diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index f5481cc10..eed50a527 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -232,6 +232,14 @@ class SQSResponse(BaseResponse): queue_name = self._get_queue_name() + if not message_group_id: + queue = self.sqs_backend.get_queue(queue_name) + if queue.attributes.get("FifoQueue", False): + return self._error( + "MissingParameter", + "The request must contain the parameter MessageGroupId.", + ) + message = self.sqs_backend.send_message( queue_name, message, diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 01e34de0b..31bbafffb 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -1164,7 +1164,7 @@ def test_send_message_batch_with_empty_list(): @mock_sqs def test_batch_change_message_visibility(): - if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true": + if settings.TEST_SERVER_MODE: raise SkipTest("Cant manipulate time in server mode") with freeze_time("2015-01-01 12:00:00"): @@ -1174,9 +1174,15 @@ def test_batch_change_message_visibility(): ) queue_url = resp["QueueUrl"] - sqs.send_message(QueueUrl=queue_url, MessageBody="msg1") - sqs.send_message(QueueUrl=queue_url, MessageBody="msg2") - sqs.send_message(QueueUrl=queue_url, MessageBody="msg3") + sqs.send_message( + QueueUrl=queue_url, MessageBody="msg1", MessageGroupId="group1" + ) + sqs.send_message( + QueueUrl=queue_url, MessageBody="msg2", MessageGroupId="group2" + ) + sqs.send_message( + QueueUrl=queue_url, MessageBody="msg3", MessageGroupId="group3" + ) with freeze_time("2015-01-01 12:01:00"): receive_resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=2) @@ -1529,7 +1535,7 @@ def test_create_fifo_queue_with_dlq(): @mock_sqs def test_queue_with_dlq(): - if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true": + if settings.TEST_SERVER_MODE: raise SkipTest("Cant manipulate time in server mode") sqs = boto3.client("sqs", region_name="us-east-1") @@ -1554,8 +1560,12 @@ def test_queue_with_dlq(): ) queue_url2 = resp["QueueUrl"] - sqs.send_message(QueueUrl=queue_url2, MessageBody="msg1") - sqs.send_message(QueueUrl=queue_url2, MessageBody="msg2") + sqs.send_message( + QueueUrl=queue_url2, MessageBody="msg1", MessageGroupId="group" + ) + sqs.send_message( + QueueUrl=queue_url2, MessageBody="msg2", MessageGroupId="group" + ) with freeze_time("2015-01-01 13:00:00"): resp = sqs.receive_message( @@ -1686,20 +1696,24 @@ def test_receive_messages_with_message_group_id(): queue.set_attributes(Attributes={"VisibilityTimeout": "3600"}) queue.send_message(MessageBody="message-1", MessageGroupId="group") queue.send_message(MessageBody="message-2", MessageGroupId="group") + queue.send_message(MessageBody="message-3", MessageGroupId="group") + queue.send_message(MessageBody="separate-message", MessageGroupId="anothergroup") - messages = queue.receive_messages() - messages.should.have.length_of(1) - message = messages[0] + messages = queue.receive_messages(MaxNumberOfMessages=2) + messages.should.have.length_of(2) + messages[0].attributes["MessageGroupId"].should.equal("group") - # received message is not deleted! - - messages = queue.receive_messages(WaitTimeSeconds=0) - messages.should.have.length_of(0) + # Different client can not 'see' messages from the group until they are processed + messages_for_client_2 = queue.receive_messages(WaitTimeSeconds=0) + messages_for_client_2.should.have.length_of(1) + messages_for_client_2[0].body.should.equal("separate-message") # message is now processed, next one should be available - message.delete() + for message in messages: + message.delete() messages = queue.receive_messages() messages.should.have.length_of(1) + messages[0].body.should.equal("message-3") @mock_sqs @@ -1730,7 +1744,7 @@ def test_receive_messages_with_message_group_id_on_requeue(): @mock_sqs def test_receive_messages_with_message_group_id_on_visibility_timeout(): - if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true": + if settings.TEST_SERVER_MODE: raise SkipTest("Cant manipulate time in server mode") with freeze_time("2015-01-01 12:00:00"): @@ -1746,12 +1760,12 @@ def test_receive_messages_with_message_group_id_on_visibility_timeout(): messages.should.have.length_of(1) message = messages[0] - # received message is not deleted! + # received message is not processed yet + messages_for_second_client = queue.receive_messages(WaitTimeSeconds=0) + messages_for_second_client.should.have.length_of(0) - messages = queue.receive_messages(WaitTimeSeconds=0) - messages.should.have.length_of(0) - - message.change_visibility(VisibilityTimeout=10) + for message in messages: + message.change_visibility(VisibilityTimeout=10) with freeze_time("2015-01-01 12:00:05"): # no timeout yet @@ -1794,3 +1808,20 @@ def test_list_queues_limits_to_1000_queues(): list(resource.queues.filter(QueueNamePrefix="test-queue")).should.have.length_of( 1000 ) + + +@mock_sqs +def test_send_messages_to_fifo_without_message_group_id(): + sqs = boto3.resource("sqs", region_name="eu-west-3") + queue = sqs.create_queue( + QueueName="blah.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, + ) + + with assert_raises(Exception) as e: + queue.send_message(MessageBody="message-1") + ex = e.exception + ex.response["Error"]["Code"].should.equal("MissingParameter") + ex.response["Error"]["Message"].should.equal( + "The request must contain the parameter MessageGroupId." + )