From 9b8e62e1f1f258d661a4bb4f71b8d44b9b1c17cd Mon Sep 17 00:00:00 2001 From: Daniel Birnstiel Date: Wed, 30 May 2018 09:22:46 +0200 Subject: [PATCH] Add MessageGroupId support to SQS queues (fixes #1655) --- moto/sqs/models.py | 53 +++++++++++++++--- tests/test_sqs/test_sqs.py | 109 +++++++++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+), 9 deletions(-) diff --git a/moto/sqs/models.py b/moto/sqs/models.py index 9c8858bc0..19def38c4 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -180,6 +180,7 @@ class Queue(BaseModel): self.permissions = {} self._messages = [] + self._pending_messages = set() now = unix_time() self.created_timestamp = now @@ -209,6 +210,16 @@ class Queue(BaseModel): if self.fifo_queue and not self.name.endswith('.fifo'): raise MessageAttributesInvalid('Queue name must end in .fifo for FIFO queues') + @property + def pending_messages(self): + return self._pending_messages + + @property + def pending_message_groups(self): + return set(message.group_id + for message in self._pending_messages + if message.group_id is not None) + def _set_attributes(self, attributes, now=None): if not now: now = unix_time() @@ -448,6 +459,7 @@ class SQSBackend(BaseBackend): """ queue = self.get_queue(queue_name) result = [] + previous_result_count = len(result) polling_end = unix_time() + wait_seconds_timeout @@ -457,19 +469,25 @@ class SQSBackend(BaseBackend): if result or (wait_seconds_timeout and unix_time() > polling_end): break - if len(queue.messages) == 0: - # we want to break here, otherwise it will be an infinite loop - if wait_seconds_timeout == 0: - break - - import time - time.sleep(0.001) - continue - messages_to_dlq = [] + for message in queue.messages: if not message.visible: continue + + if message in queue.pending_messages: + # The message is pending but is visible again, so the + # consumer must have timed out. + queue.pending_messages.remove(message) + + if message.group_id and queue.fifo_queue: + if message.group_id in queue.pending_message_groups: + # There is already one active message with the same + # group, so we cannot deliver this one. + continue + + queue.pending_messages.add(message) + if queue.dead_letter_queue is not None and message.approximate_receive_count >= queue.redrive_policy['maxReceiveCount']: messages_to_dlq.append(message) continue @@ -485,6 +503,18 @@ class SQSBackend(BaseBackend): queue._messages.remove(message) queue.dead_letter_queue.add_message(message) + if previous_result_count == len(result): + if wait_seconds_timeout == 0: + # There is timeout and we have added no additional results, + # so break to avoid an infinite loop. + break + + import time + time.sleep(0.001) + continue + + previous_result_count = len(result) + return result def delete_message(self, queue_name, receipt_handle): @@ -494,6 +524,7 @@ class SQSBackend(BaseBackend): # Only delete message if it is not visible and the reciept_handle # matches. if message.receipt_handle == receipt_handle: + queue.pending_messages.remove(message) continue new_messages.append(message) queue._messages = new_messages @@ -505,6 +536,10 @@ class SQSBackend(BaseBackend): if message.visible: raise MessageNotInflight message.change_visibility(visibility_timeout) + if message.visible: + # If the message is visible again, remove it from pending + # messages. + queue.pending_messages.remove(message) return raise ReceiptHandleIsInvalid diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 1280fed80..cfe481bea 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -1063,3 +1063,112 @@ def test_redrive_policy_set_attributes(): assert 'RedrivePolicy' in copy.attributes copy_policy = json.loads(copy.attributes['RedrivePolicy']) assert copy_policy == redrive_policy + + +@mock_sqs +def test_receive_messages_with_message_group_id(): + sqs = boto3.resource('sqs', region_name='us-east-1') + queue = sqs.create_queue(QueueName="test-queue.fifo", + Attributes={ + 'FifoQueue': 'true', + }) + queue.set_attributes(Attributes={"VisibilityTimeout": "3600"}) + queue.send_message( + MessageBody="message-1", + MessageGroupId="group" + ) + queue.send_message( + MessageBody="message-2", + MessageGroupId="group" + ) + + messages = queue.receive_messages() + messages.should.have.length_of(1) + message = messages[0] + + # received message is not deleted! + + messages = queue.receive_messages(WaitTimeSeconds=0) + messages.should.have.length_of(0) + + # message is now processed, next one should be available + message.delete() + messages = queue.receive_messages() + messages.should.have.length_of(1) + + +@mock_sqs +def test_receive_messages_with_message_group_id_on_requeue(): + sqs = boto3.resource('sqs', region_name='us-east-1') + queue = sqs.create_queue(QueueName="test-queue.fifo", + Attributes={ + 'FifoQueue': 'true', + }) + queue.set_attributes(Attributes={"VisibilityTimeout": "3600"}) + queue.send_message( + MessageBody="message-1", + MessageGroupId="group" + ) + queue.send_message( + MessageBody="message-2", + MessageGroupId="group" + ) + + messages = queue.receive_messages() + messages.should.have.length_of(1) + message = messages[0] + + # received message is not deleted! + + messages = queue.receive_messages(WaitTimeSeconds=0) + messages.should.have.length_of(0) + + # message is now available again, next one should be available + message.change_visibility(VisibilityTimeout=0) + messages = queue.receive_messages() + messages.should.have.length_of(1) + messages[0].message_id.should.equal(message.message_id) + + +@mock_sqs +def test_receive_messages_with_message_group_id_on_visibility_timeout(): + if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true': + raise SkipTest('Cant manipulate time in server mode') + + with freeze_time("2015-01-01 12:00:00"): + sqs = boto3.resource('sqs', region_name='us-east-1') + queue = sqs.create_queue(QueueName="test-queue.fifo", + Attributes={ + 'FifoQueue': 'true', + }) + queue.set_attributes(Attributes={"VisibilityTimeout": "3600"}) + queue.send_message( + MessageBody="message-1", + MessageGroupId="group" + ) + queue.send_message( + MessageBody="message-2", + MessageGroupId="group" + ) + + messages = queue.receive_messages() + messages.should.have.length_of(1) + message = messages[0] + + # received message is not deleted! + + messages = queue.receive_messages(WaitTimeSeconds=0) + messages.should.have.length_of(0) + + message.change_visibility(VisibilityTimeout=10) + + with freeze_time("2015-01-01 12:00:05"): + # no timeout yet + messages = queue.receive_messages(WaitTimeSeconds=0) + messages.should.have.length_of(0) + + with freeze_time("2015-01-01 12:00:15"): + # message is now available again, next one should be available + messages = queue.receive_messages() + messages.should.have.length_of(1) + messages[0].message_id.should.equal(message.message_id)