Fix: FIFO DLQ locks message group id (#3537)
* fix https://github.com/localstack/localstack/issues/3319 * fix review comments
This commit is contained in:
parent
7b97141184
commit
54bd336457
@ -818,8 +818,6 @@ class SQSBackend(BaseBackend):
|
|||||||
# A previous call is still processing messages in this group, so we cannot deliver this one.
|
# A previous call is still processing messages in this group, so we cannot deliver this one.
|
||||||
continue
|
continue
|
||||||
|
|
||||||
queue.pending_messages.add(message)
|
|
||||||
|
|
||||||
if (
|
if (
|
||||||
queue.dead_letter_queue is not None
|
queue.dead_letter_queue is not None
|
||||||
and message.approximate_receive_count
|
and message.approximate_receive_count
|
||||||
@ -828,6 +826,7 @@ class SQSBackend(BaseBackend):
|
|||||||
messages_to_dlq.append(message)
|
messages_to_dlq.append(message)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
queue.pending_messages.add(message)
|
||||||
message.mark_received(visibility_timeout=visibility_timeout)
|
message.mark_received(visibility_timeout=visibility_timeout)
|
||||||
_filter_message_attributes(message, message_attribute_names)
|
_filter_message_attributes(message, message_attribute_names)
|
||||||
result.append(message)
|
result.append(message)
|
||||||
|
@ -2384,3 +2384,42 @@ def test_fifo_queue_send_deduplicationid_same_as_sha256_of_old_message():
|
|||||||
)
|
)
|
||||||
messages = msg_queue.receive_messages(MaxNumberOfMessages=2)
|
messages = msg_queue.receive_messages(MaxNumberOfMessages=2)
|
||||||
messages.should.have.length_of(1)
|
messages.should.have.length_of(1)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_fifo_send_message_when_same_group_id_is_in_dlq():
|
||||||
|
|
||||||
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
dlq = sqs.create_queue(
|
||||||
|
QueueName="test-queue-dlq.fifo", Attributes={"FifoQueue": "true"}
|
||||||
|
)
|
||||||
|
|
||||||
|
queue = sqs.get_queue_by_name(QueueName="test-queue-dlq.fifo")
|
||||||
|
dead_letter_queue_arn = queue.attributes.get("QueueArn")
|
||||||
|
|
||||||
|
msg_queue = sqs.create_queue(
|
||||||
|
QueueName="test-queue.fifo",
|
||||||
|
Attributes={
|
||||||
|
"FifoQueue": "true",
|
||||||
|
"RedrivePolicy": json.dumps(
|
||||||
|
{"deadLetterTargetArn": dead_letter_queue_arn, "maxReceiveCount": 1},
|
||||||
|
),
|
||||||
|
"VisibilityTimeout": "1",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
msg_queue.send_message(MessageBody="first", MessageGroupId="1")
|
||||||
|
messages = msg_queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
|
||||||
|
time.sleep(1.1)
|
||||||
|
|
||||||
|
messages = msg_queue.receive_messages()
|
||||||
|
messages.should.have.length_of(0)
|
||||||
|
|
||||||
|
messages = dlq.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
|
||||||
|
msg_queue.send_message(MessageBody="second", MessageGroupId="1")
|
||||||
|
messages = msg_queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user