SQS - Fix message attributes disappearing on multiple receives (#4599)
This commit is contained in:
parent
be197caba6
commit
88e1aef254
@ -534,7 +534,12 @@ class Queue(CloudFormationModel):
|
|||||||
[backend.delete_message(self.name, m.receipt_handle) for m in messages]
|
[backend.delete_message(self.name, m.receipt_handle) for m in messages]
|
||||||
else:
|
else:
|
||||||
# Make messages visible again
|
# Make messages visible again
|
||||||
[m.change_visibility(visibility_timeout=0) for m in messages]
|
[
|
||||||
|
backend.change_message_visibility(
|
||||||
|
self.name, m.receipt_handle, visibility_timeout=0
|
||||||
|
)
|
||||||
|
for m in messages
|
||||||
|
]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def has_cfn_attr(cls, attribute_name):
|
def has_cfn_attr(cls, attribute_name):
|
||||||
@ -862,12 +867,14 @@ class SQSBackend(BaseBackend):
|
|||||||
|
|
||||||
queue.pending_messages.add(message)
|
queue.pending_messages.add(message)
|
||||||
message.mark_received(visibility_timeout=visibility_timeout)
|
message.mark_received(visibility_timeout=visibility_timeout)
|
||||||
_filter_message_attributes(message, message_attribute_names)
|
# Create deepcopy to not mutate the message state when filtering for attributes
|
||||||
|
message_copy = deepcopy(message)
|
||||||
|
_filter_message_attributes(message_copy, message_attribute_names)
|
||||||
if not self.is_message_valid_based_on_retention_period(
|
if not self.is_message_valid_based_on_retention_period(
|
||||||
queue_name, message
|
queue_name, message
|
||||||
):
|
):
|
||||||
break
|
break
|
||||||
result.append(message)
|
result.append(message_copy)
|
||||||
if len(result) >= count:
|
if len(result) >= count:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -3315,3 +3315,38 @@ def test_message_attributes_contains_trace_header():
|
|||||||
messages[0]["Attributes"]["AWSTraceHeader"]
|
messages[0]["Attributes"]["AWSTraceHeader"]
|
||||||
== "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"
|
== "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_receive_message_again_preserves_attributes():
|
||||||
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
conn = boto3.client("sqs", region_name="us-east-1")
|
||||||
|
q_name = str(uuid4())[0:6]
|
||||||
|
q_resp = conn.create_queue(QueueName=q_name)
|
||||||
|
queue = sqs.Queue(q_resp["QueueUrl"])
|
||||||
|
body_one = "this is a test message"
|
||||||
|
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody=body_one,
|
||||||
|
MessageAttributes={
|
||||||
|
"Custom1": {"StringValue": "Custom_Value_1", "DataType": "String"},
|
||||||
|
"Custom2": {"StringValue": "Custom_Value_2", "DataType": "String"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
first_messages = conn.receive_message(
|
||||||
|
QueueUrl=queue.url,
|
||||||
|
MaxNumberOfMessages=2,
|
||||||
|
MessageAttributeNames=["Custom1"],
|
||||||
|
VisibilityTimeout=0,
|
||||||
|
)["Messages"]
|
||||||
|
assert len(first_messages[0]["MessageAttributes"]) == 1
|
||||||
|
assert first_messages[0]["MessageAttributes"].get("Custom1") is not None
|
||||||
|
assert first_messages[0]["MessageAttributes"].get("Custom2") is None
|
||||||
|
|
||||||
|
second_messages = conn.receive_message(
|
||||||
|
QueueUrl=queue.url, MaxNumberOfMessages=2, MessageAttributeNames=["All"],
|
||||||
|
)["Messages"]
|
||||||
|
assert len(second_messages[0]["MessageAttributes"]) == 2
|
||||||
|
assert second_messages[0]["MessageAttributes"].get("Custom1") is not None
|
||||||
|
assert second_messages[0]["MessageAttributes"].get("Custom2") is not None
|
||||||
|
Loading…
Reference in New Issue
Block a user