Add MessageGroupId support to SQS queues (fixes #1655)

This commit is contained in:
Daniel Birnstiel 2018-05-30 09:22:46 +02:00
parent cb364eedc6
commit 9b8e62e1f1
2 changed files with 153 additions and 9 deletions

View File

@ -180,6 +180,7 @@ class Queue(BaseModel):
self.permissions = {}
self._messages = []
self._pending_messages = set()
now = unix_time()
self.created_timestamp = now
@ -209,6 +210,16 @@ class Queue(BaseModel):
if self.fifo_queue and not self.name.endswith('.fifo'):
raise MessageAttributesInvalid('Queue name must end in .fifo for FIFO queues')
@property
def pending_messages(self):
return self._pending_messages
@property
def pending_message_groups(self):
return set(message.group_id
for message in self._pending_messages
if message.group_id is not None)
def _set_attributes(self, attributes, now=None):
if not now:
now = unix_time()
@ -448,6 +459,7 @@ class SQSBackend(BaseBackend):
"""
queue = self.get_queue(queue_name)
result = []
previous_result_count = len(result)
polling_end = unix_time() + wait_seconds_timeout
@ -457,19 +469,25 @@ class SQSBackend(BaseBackend):
if result or (wait_seconds_timeout and unix_time() > polling_end):
break
if len(queue.messages) == 0:
# we want to break here, otherwise it will be an infinite loop
if wait_seconds_timeout == 0:
break
import time
time.sleep(0.001)
continue
messages_to_dlq = []
for message in queue.messages:
if not message.visible:
continue
if message in queue.pending_messages:
# The message is pending but is visible again, so the
# consumer must have timed out.
queue.pending_messages.remove(message)
if message.group_id and queue.fifo_queue:
if message.group_id in queue.pending_message_groups:
# There is already one active message with the same
# group, so we cannot deliver this one.
continue
queue.pending_messages.add(message)
if queue.dead_letter_queue is not None and message.approximate_receive_count >= queue.redrive_policy['maxReceiveCount']:
messages_to_dlq.append(message)
continue
@ -485,6 +503,18 @@ class SQSBackend(BaseBackend):
queue._messages.remove(message)
queue.dead_letter_queue.add_message(message)
if previous_result_count == len(result):
if wait_seconds_timeout == 0:
# There is timeout and we have added no additional results,
# so break to avoid an infinite loop.
break
import time
time.sleep(0.001)
continue
previous_result_count = len(result)
return result
def delete_message(self, queue_name, receipt_handle):
@ -494,6 +524,7 @@ class SQSBackend(BaseBackend):
# Only delete message if it is not visible and the reciept_handle
# matches.
if message.receipt_handle == receipt_handle:
queue.pending_messages.remove(message)
continue
new_messages.append(message)
queue._messages = new_messages
@ -505,6 +536,10 @@ class SQSBackend(BaseBackend):
if message.visible:
raise MessageNotInflight
message.change_visibility(visibility_timeout)
if message.visible:
# If the message is visible again, remove it from pending
# messages.
queue.pending_messages.remove(message)
return
raise ReceiptHandleIsInvalid

View File

@ -1063,3 +1063,112 @@ def test_redrive_policy_set_attributes():
assert 'RedrivePolicy' in copy.attributes
copy_policy = json.loads(copy.attributes['RedrivePolicy'])
assert copy_policy == redrive_policy
@mock_sqs
def test_receive_messages_with_message_group_id():
sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.create_queue(QueueName="test-queue.fifo",
Attributes={
'FifoQueue': 'true',
})
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
queue.send_message(
MessageBody="message-1",
MessageGroupId="group"
)
queue.send_message(
MessageBody="message-2",
MessageGroupId="group"
)
messages = queue.receive_messages()
messages.should.have.length_of(1)
message = messages[0]
# received message is not deleted!
messages = queue.receive_messages(WaitTimeSeconds=0)
messages.should.have.length_of(0)
# message is now processed, next one should be available
message.delete()
messages = queue.receive_messages()
messages.should.have.length_of(1)
@mock_sqs
def test_receive_messages_with_message_group_id_on_requeue():
sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.create_queue(QueueName="test-queue.fifo",
Attributes={
'FifoQueue': 'true',
})
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
queue.send_message(
MessageBody="message-1",
MessageGroupId="group"
)
queue.send_message(
MessageBody="message-2",
MessageGroupId="group"
)
messages = queue.receive_messages()
messages.should.have.length_of(1)
message = messages[0]
# received message is not deleted!
messages = queue.receive_messages(WaitTimeSeconds=0)
messages.should.have.length_of(0)
# message is now available again, next one should be available
message.change_visibility(VisibilityTimeout=0)
messages = queue.receive_messages()
messages.should.have.length_of(1)
messages[0].message_id.should.equal(message.message_id)
@mock_sqs
def test_receive_messages_with_message_group_id_on_visibility_timeout():
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
raise SkipTest('Cant manipulate time in server mode')
with freeze_time("2015-01-01 12:00:00"):
sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.create_queue(QueueName="test-queue.fifo",
Attributes={
'FifoQueue': 'true',
})
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
queue.send_message(
MessageBody="message-1",
MessageGroupId="group"
)
queue.send_message(
MessageBody="message-2",
MessageGroupId="group"
)
messages = queue.receive_messages()
messages.should.have.length_of(1)
message = messages[0]
# received message is not deleted!
messages = queue.receive_messages(WaitTimeSeconds=0)
messages.should.have.length_of(0)
message.change_visibility(VisibilityTimeout=10)
with freeze_time("2015-01-01 12:00:05"):
# no timeout yet
messages = queue.receive_messages(WaitTimeSeconds=0)
messages.should.have.length_of(0)
with freeze_time("2015-01-01 12:00:15"):
# message is now available again, next one should be available
messages = queue.receive_messages()
messages.should.have.length_of(1)
messages[0].message_id.should.equal(message.message_id)