Merge pull request #1656 from Birne94/sqs-message-group-id
Add MessageGroupId support to SQS queues (fixes #1655)
This commit is contained in:
commit
919c0c9b04
@ -180,6 +180,7 @@ class Queue(BaseModel):
|
|||||||
self.permissions = {}
|
self.permissions = {}
|
||||||
|
|
||||||
self._messages = []
|
self._messages = []
|
||||||
|
self._pending_messages = set()
|
||||||
|
|
||||||
now = unix_time()
|
now = unix_time()
|
||||||
self.created_timestamp = now
|
self.created_timestamp = now
|
||||||
@ -209,6 +210,16 @@ class Queue(BaseModel):
|
|||||||
if self.fifo_queue and not self.name.endswith('.fifo'):
|
if self.fifo_queue and not self.name.endswith('.fifo'):
|
||||||
raise MessageAttributesInvalid('Queue name must end in .fifo for FIFO queues')
|
raise MessageAttributesInvalid('Queue name must end in .fifo for FIFO queues')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pending_messages(self):
|
||||||
|
return self._pending_messages
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pending_message_groups(self):
|
||||||
|
return set(message.group_id
|
||||||
|
for message in self._pending_messages
|
||||||
|
if message.group_id is not None)
|
||||||
|
|
||||||
def _set_attributes(self, attributes, now=None):
|
def _set_attributes(self, attributes, now=None):
|
||||||
if not now:
|
if not now:
|
||||||
now = unix_time()
|
now = unix_time()
|
||||||
@ -454,6 +465,7 @@ class SQSBackend(BaseBackend):
|
|||||||
"""
|
"""
|
||||||
queue = self.get_queue(queue_name)
|
queue = self.get_queue(queue_name)
|
||||||
result = []
|
result = []
|
||||||
|
previous_result_count = len(result)
|
||||||
|
|
||||||
polling_end = unix_time() + wait_seconds_timeout
|
polling_end = unix_time() + wait_seconds_timeout
|
||||||
|
|
||||||
@ -463,19 +475,25 @@ class SQSBackend(BaseBackend):
|
|||||||
if result or (wait_seconds_timeout and unix_time() > polling_end):
|
if result or (wait_seconds_timeout and unix_time() > polling_end):
|
||||||
break
|
break
|
||||||
|
|
||||||
if len(queue.messages) == 0:
|
|
||||||
# we want to break here, otherwise it will be an infinite loop
|
|
||||||
if wait_seconds_timeout == 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
import time
|
|
||||||
time.sleep(0.001)
|
|
||||||
continue
|
|
||||||
|
|
||||||
messages_to_dlq = []
|
messages_to_dlq = []
|
||||||
|
|
||||||
for message in queue.messages:
|
for message in queue.messages:
|
||||||
if not message.visible:
|
if not message.visible:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if message in queue.pending_messages:
|
||||||
|
# The message is pending but is visible again, so the
|
||||||
|
# consumer must have timed out.
|
||||||
|
queue.pending_messages.remove(message)
|
||||||
|
|
||||||
|
if message.group_id and queue.fifo_queue:
|
||||||
|
if message.group_id in queue.pending_message_groups:
|
||||||
|
# There is already one active message with the same
|
||||||
|
# group, so we cannot deliver this one.
|
||||||
|
continue
|
||||||
|
|
||||||
|
queue.pending_messages.add(message)
|
||||||
|
|
||||||
if queue.dead_letter_queue is not None and message.approximate_receive_count >= queue.redrive_policy['maxReceiveCount']:
|
if queue.dead_letter_queue is not None and message.approximate_receive_count >= queue.redrive_policy['maxReceiveCount']:
|
||||||
messages_to_dlq.append(message)
|
messages_to_dlq.append(message)
|
||||||
continue
|
continue
|
||||||
@ -491,6 +509,18 @@ class SQSBackend(BaseBackend):
|
|||||||
queue._messages.remove(message)
|
queue._messages.remove(message)
|
||||||
queue.dead_letter_queue.add_message(message)
|
queue.dead_letter_queue.add_message(message)
|
||||||
|
|
||||||
|
if previous_result_count == len(result):
|
||||||
|
if wait_seconds_timeout == 0:
|
||||||
|
# There is timeout and we have added no additional results,
|
||||||
|
# so break to avoid an infinite loop.
|
||||||
|
break
|
||||||
|
|
||||||
|
import time
|
||||||
|
time.sleep(0.001)
|
||||||
|
continue
|
||||||
|
|
||||||
|
previous_result_count = len(result)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def delete_message(self, queue_name, receipt_handle):
|
def delete_message(self, queue_name, receipt_handle):
|
||||||
@ -500,6 +530,7 @@ class SQSBackend(BaseBackend):
|
|||||||
# Only delete message if it is not visible and the reciept_handle
|
# Only delete message if it is not visible and the reciept_handle
|
||||||
# matches.
|
# matches.
|
||||||
if message.receipt_handle == receipt_handle:
|
if message.receipt_handle == receipt_handle:
|
||||||
|
queue.pending_messages.remove(message)
|
||||||
continue
|
continue
|
||||||
new_messages.append(message)
|
new_messages.append(message)
|
||||||
queue._messages = new_messages
|
queue._messages = new_messages
|
||||||
@ -511,6 +542,10 @@ class SQSBackend(BaseBackend):
|
|||||||
if message.visible:
|
if message.visible:
|
||||||
raise MessageNotInflight
|
raise MessageNotInflight
|
||||||
message.change_visibility(visibility_timeout)
|
message.change_visibility(visibility_timeout)
|
||||||
|
if message.visible:
|
||||||
|
# If the message is visible again, remove it from pending
|
||||||
|
# messages.
|
||||||
|
queue.pending_messages.remove(message)
|
||||||
return
|
return
|
||||||
raise ReceiptHandleIsInvalid
|
raise ReceiptHandleIsInvalid
|
||||||
|
|
||||||
|
@ -1063,3 +1063,112 @@ def test_redrive_policy_set_attributes():
|
|||||||
assert 'RedrivePolicy' in copy.attributes
|
assert 'RedrivePolicy' in copy.attributes
|
||||||
copy_policy = json.loads(copy.attributes['RedrivePolicy'])
|
copy_policy = json.loads(copy.attributes['RedrivePolicy'])
|
||||||
assert copy_policy == redrive_policy
|
assert copy_policy == redrive_policy
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_receive_messages_with_message_group_id():
|
||||||
|
sqs = boto3.resource('sqs', region_name='us-east-1')
|
||||||
|
queue = sqs.create_queue(QueueName="test-queue.fifo",
|
||||||
|
Attributes={
|
||||||
|
'FifoQueue': 'true',
|
||||||
|
})
|
||||||
|
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="message-1",
|
||||||
|
MessageGroupId="group"
|
||||||
|
)
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="message-2",
|
||||||
|
MessageGroupId="group"
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
message = messages[0]
|
||||||
|
|
||||||
|
# received message is not deleted!
|
||||||
|
|
||||||
|
messages = queue.receive_messages(WaitTimeSeconds=0)
|
||||||
|
messages.should.have.length_of(0)
|
||||||
|
|
||||||
|
# message is now processed, next one should be available
|
||||||
|
message.delete()
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_receive_messages_with_message_group_id_on_requeue():
|
||||||
|
sqs = boto3.resource('sqs', region_name='us-east-1')
|
||||||
|
queue = sqs.create_queue(QueueName="test-queue.fifo",
|
||||||
|
Attributes={
|
||||||
|
'FifoQueue': 'true',
|
||||||
|
})
|
||||||
|
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="message-1",
|
||||||
|
MessageGroupId="group"
|
||||||
|
)
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="message-2",
|
||||||
|
MessageGroupId="group"
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
message = messages[0]
|
||||||
|
|
||||||
|
# received message is not deleted!
|
||||||
|
|
||||||
|
messages = queue.receive_messages(WaitTimeSeconds=0)
|
||||||
|
messages.should.have.length_of(0)
|
||||||
|
|
||||||
|
# message is now available again, next one should be available
|
||||||
|
message.change_visibility(VisibilityTimeout=0)
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
messages[0].message_id.should.equal(message.message_id)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_receive_messages_with_message_group_id_on_visibility_timeout():
|
||||||
|
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
|
||||||
|
raise SkipTest('Cant manipulate time in server mode')
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 12:00:00"):
|
||||||
|
sqs = boto3.resource('sqs', region_name='us-east-1')
|
||||||
|
queue = sqs.create_queue(QueueName="test-queue.fifo",
|
||||||
|
Attributes={
|
||||||
|
'FifoQueue': 'true',
|
||||||
|
})
|
||||||
|
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="message-1",
|
||||||
|
MessageGroupId="group"
|
||||||
|
)
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="message-2",
|
||||||
|
MessageGroupId="group"
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
message = messages[0]
|
||||||
|
|
||||||
|
# received message is not deleted!
|
||||||
|
|
||||||
|
messages = queue.receive_messages(WaitTimeSeconds=0)
|
||||||
|
messages.should.have.length_of(0)
|
||||||
|
|
||||||
|
message.change_visibility(VisibilityTimeout=10)
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 12:00:05"):
|
||||||
|
# no timeout yet
|
||||||
|
messages = queue.receive_messages(WaitTimeSeconds=0)
|
||||||
|
messages.should.have.length_of(0)
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 12:00:15"):
|
||||||
|
# message is now available again, next one should be available
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
messages[0].message_id.should.equal(message.message_id)
|
||||||
|
Loading…
Reference in New Issue
Block a user