SQS - Return multiple group-messages in the same request

This commit is contained in:
Bert Blommers 2020-05-24 12:12:35 +01:00
parent 5bd3588f74
commit 1ef3094e45
3 changed files with 65 additions and 25 deletions

View File

@ -6,6 +6,7 @@ import json
import re
import six
import struct
from copy import deepcopy
from xml.sax.saxutils import escape
from boto3 import Session
@ -101,7 +102,6 @@ class Message(BaseModel):
if data_type == "String" or data_type == "Number":
value = attr["string_value"]
elif data_type == "Binary":
print(data_type, attr["binary_value"], type(attr["binary_value"]))
value = base64.b64decode(attr["binary_value"])
else:
print(
@ -722,6 +722,7 @@ class SQSBackend(BaseBackend):
previous_result_count = len(result)
polling_end = unix_time() + wait_seconds_timeout
currently_pending_groups = deepcopy(queue.pending_message_groups)
# queue.messages only contains visible messages
while True:
@ -739,11 +740,11 @@ class SQSBackend(BaseBackend):
# The message is pending but is visible again, so the
# consumer must have timed out.
queue.pending_messages.remove(message)
currently_pending_groups = deepcopy(queue.pending_message_groups)
if message.group_id and queue.fifo_queue:
if message.group_id in queue.pending_message_groups:
# There is already one active message with the same
# group, so we cannot deliver this one.
if message.group_id in currently_pending_groups:
# A previous call is still processing messages in this group, so we cannot deliver this one.
continue
queue.pending_messages.add(message)

View File

@ -232,6 +232,14 @@ class SQSResponse(BaseResponse):
queue_name = self._get_queue_name()
if not message_group_id:
queue = self.sqs_backend.get_queue(queue_name)
if queue.attributes.get("FifoQueue", False):
return self._error(
"MissingParameter",
"The request must contain the parameter MessageGroupId.",
)
message = self.sqs_backend.send_message(
queue_name,
message,

View File

@ -1164,7 +1164,7 @@ def test_send_message_batch_with_empty_list():
@mock_sqs
def test_batch_change_message_visibility():
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true":
if settings.TEST_SERVER_MODE:
raise SkipTest("Cant manipulate time in server mode")
with freeze_time("2015-01-01 12:00:00"):
@ -1174,9 +1174,15 @@ def test_batch_change_message_visibility():
)
queue_url = resp["QueueUrl"]
sqs.send_message(QueueUrl=queue_url, MessageBody="msg1")
sqs.send_message(QueueUrl=queue_url, MessageBody="msg2")
sqs.send_message(QueueUrl=queue_url, MessageBody="msg3")
sqs.send_message(
QueueUrl=queue_url, MessageBody="msg1", MessageGroupId="group1"
)
sqs.send_message(
QueueUrl=queue_url, MessageBody="msg2", MessageGroupId="group2"
)
sqs.send_message(
QueueUrl=queue_url, MessageBody="msg3", MessageGroupId="group3"
)
with freeze_time("2015-01-01 12:01:00"):
receive_resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=2)
@ -1529,7 +1535,7 @@ def test_create_fifo_queue_with_dlq():
@mock_sqs
def test_queue_with_dlq():
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true":
if settings.TEST_SERVER_MODE:
raise SkipTest("Cant manipulate time in server mode")
sqs = boto3.client("sqs", region_name="us-east-1")
@ -1554,8 +1560,12 @@ def test_queue_with_dlq():
)
queue_url2 = resp["QueueUrl"]
sqs.send_message(QueueUrl=queue_url2, MessageBody="msg1")
sqs.send_message(QueueUrl=queue_url2, MessageBody="msg2")
sqs.send_message(
QueueUrl=queue_url2, MessageBody="msg1", MessageGroupId="group"
)
sqs.send_message(
QueueUrl=queue_url2, MessageBody="msg2", MessageGroupId="group"
)
with freeze_time("2015-01-01 13:00:00"):
resp = sqs.receive_message(
@ -1686,20 +1696,24 @@ def test_receive_messages_with_message_group_id():
queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
queue.send_message(MessageBody="message-1", MessageGroupId="group")
queue.send_message(MessageBody="message-2", MessageGroupId="group")
queue.send_message(MessageBody="message-3", MessageGroupId="group")
queue.send_message(MessageBody="separate-message", MessageGroupId="anothergroup")
messages = queue.receive_messages()
messages.should.have.length_of(1)
message = messages[0]
messages = queue.receive_messages(MaxNumberOfMessages=2)
messages.should.have.length_of(2)
messages[0].attributes["MessageGroupId"].should.equal("group")
# received message is not deleted!
messages = queue.receive_messages(WaitTimeSeconds=0)
messages.should.have.length_of(0)
# Different client can not 'see' messages from the group until they are processed
messages_for_client_2 = queue.receive_messages(WaitTimeSeconds=0)
messages_for_client_2.should.have.length_of(1)
messages_for_client_2[0].body.should.equal("separate-message")
# message is now processed, next one should be available
message.delete()
for message in messages:
message.delete()
messages = queue.receive_messages()
messages.should.have.length_of(1)
messages[0].body.should.equal("message-3")
@mock_sqs
@ -1730,7 +1744,7 @@ def test_receive_messages_with_message_group_id_on_requeue():
@mock_sqs
def test_receive_messages_with_message_group_id_on_visibility_timeout():
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true":
if settings.TEST_SERVER_MODE:
raise SkipTest("Cant manipulate time in server mode")
with freeze_time("2015-01-01 12:00:00"):
@ -1746,12 +1760,12 @@ def test_receive_messages_with_message_group_id_on_visibility_timeout():
messages.should.have.length_of(1)
message = messages[0]
# received message is not deleted!
# received message is not processed yet
messages_for_second_client = queue.receive_messages(WaitTimeSeconds=0)
messages_for_second_client.should.have.length_of(0)
messages = queue.receive_messages(WaitTimeSeconds=0)
messages.should.have.length_of(0)
message.change_visibility(VisibilityTimeout=10)
for message in messages:
message.change_visibility(VisibilityTimeout=10)
with freeze_time("2015-01-01 12:00:05"):
# no timeout yet
@ -1794,3 +1808,20 @@ def test_list_queues_limits_to_1000_queues():
list(resource.queues.filter(QueueNamePrefix="test-queue")).should.have.length_of(
1000
)
@mock_sqs
def test_send_messages_to_fifo_without_message_group_id():
sqs = boto3.resource("sqs", region_name="eu-west-3")
queue = sqs.create_queue(
QueueName="blah.fifo",
Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
with assert_raises(Exception) as e:
queue.send_message(MessageBody="message-1")
ex = e.exception
ex.response["Error"]["Code"].should.equal("MissingParameter")
ex.response["Error"]["Message"].should.equal(
"The request must contain the parameter MessageGroupId."
)