Fix sqs message retention logic (#3924)
* Fix sqs message retention logic * Apply lint to moto/sqs/models.py * Fix failed tests because of freezing time * Fix freezing time in test_publish_to_sqs_in_different_region
This commit is contained in:
parent
9e3faf7784
commit
f9e0595e12
@ -836,7 +836,9 @@ class SQSBackend(BaseBackend):
|
|||||||
queue.pending_messages.add(message)
|
queue.pending_messages.add(message)
|
||||||
message.mark_received(visibility_timeout=visibility_timeout)
|
message.mark_received(visibility_timeout=visibility_timeout)
|
||||||
_filter_message_attributes(message, message_attribute_names)
|
_filter_message_attributes(message, message_attribute_names)
|
||||||
if not self.is_message_valid_based_on_retention_period(queue_name):
|
if not self.is_message_valid_based_on_retention_period(
|
||||||
|
queue_name, message
|
||||||
|
):
|
||||||
break
|
break
|
||||||
result.append(message)
|
result.append(message)
|
||||||
if len(result) >= count:
|
if len(result) >= count:
|
||||||
@ -1015,11 +1017,12 @@ class SQSBackend(BaseBackend):
|
|||||||
def list_queue_tags(self, queue_name):
|
def list_queue_tags(self, queue_name):
|
||||||
return self.get_queue(queue_name)
|
return self.get_queue(queue_name)
|
||||||
|
|
||||||
def is_message_valid_based_on_retention_period(self, queue_name):
|
def is_message_valid_based_on_retention_period(self, queue_name, message):
|
||||||
message_attributes = self.get_queue_attributes(queue_name, [])
|
message_attributes = self.get_queue_attributes(queue_name, [])
|
||||||
retain_until = message_attributes.get(
|
retain_until = (
|
||||||
"MessageRetentionPeriod"
|
message_attributes.get("MessageRetentionPeriod")
|
||||||
) + message_attributes.get("CreatedTimestamp")
|
+ message.sent_timestamp / 1000
|
||||||
|
)
|
||||||
if retain_until <= unix_time():
|
if retain_until <= unix_time():
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
@ -46,6 +46,7 @@ def test_publish_to_sqs():
|
|||||||
]
|
]
|
||||||
|
|
||||||
queue = sqs_conn.get_queue("test-queue")
|
queue = sqs_conn.get_queue("test-queue")
|
||||||
|
with freeze_time("2015-01-01 12:00:01"):
|
||||||
message = queue.read(1)
|
message = queue.read(1)
|
||||||
expected = MESSAGE_FROM_SQS_TEMPLATE % (
|
expected = MESSAGE_FROM_SQS_TEMPLATE % (
|
||||||
message_to_publish,
|
message_to_publish,
|
||||||
@ -89,6 +90,7 @@ def test_publish_to_sqs_in_different_region():
|
|||||||
]
|
]
|
||||||
|
|
||||||
queue = sqs_conn.get_queue("test-queue")
|
queue = sqs_conn.get_queue("test-queue")
|
||||||
|
with freeze_time("2015-01-01 12:00:01"):
|
||||||
message = queue.read(1)
|
message = queue.read(1)
|
||||||
expected = MESSAGE_FROM_SQS_TEMPLATE % (
|
expected = MESSAGE_FROM_SQS_TEMPLATE % (
|
||||||
message_to_publish,
|
message_to_publish,
|
||||||
|
@ -46,6 +46,7 @@ def test_publish_to_sqs():
|
|||||||
published_message_id = published_message["MessageId"]
|
published_message_id = published_message["MessageId"]
|
||||||
|
|
||||||
queue = sqs_conn.get_queue_by_name(QueueName="test-queue")
|
queue = sqs_conn.get_queue_by_name(QueueName="test-queue")
|
||||||
|
with freeze_time("2015-01-01 12:00:01"):
|
||||||
messages = queue.receive_messages(MaxNumberOfMessages=1)
|
messages = queue.receive_messages(MaxNumberOfMessages=1)
|
||||||
expected = MESSAGE_FROM_SQS_TEMPLATE % (message, published_message_id, "us-east-1")
|
expected = MESSAGE_FROM_SQS_TEMPLATE % (message, published_message_id, "us-east-1")
|
||||||
acquired_message = re.sub(
|
acquired_message = re.sub(
|
||||||
@ -77,6 +78,7 @@ def test_publish_to_sqs_raw():
|
|||||||
with freeze_time("2015-01-01 12:00:00"):
|
with freeze_time("2015-01-01 12:00:00"):
|
||||||
topic.publish(Message=message)
|
topic.publish(Message=message)
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 12:00:01"):
|
||||||
messages = queue.receive_messages(MaxNumberOfMessages=1)
|
messages = queue.receive_messages(MaxNumberOfMessages=1)
|
||||||
messages[0].body.should.equal(message)
|
messages[0].body.should.equal(message)
|
||||||
|
|
||||||
@ -279,6 +281,7 @@ def test_publish_to_sqs_dump_json():
|
|||||||
published_message_id = published_message["MessageId"]
|
published_message_id = published_message["MessageId"]
|
||||||
|
|
||||||
queue = sqs_conn.get_queue_by_name(QueueName="test-queue")
|
queue = sqs_conn.get_queue_by_name(QueueName="test-queue")
|
||||||
|
with freeze_time("2015-01-01 12:00:01"):
|
||||||
messages = queue.receive_messages(MaxNumberOfMessages=1)
|
messages = queue.receive_messages(MaxNumberOfMessages=1)
|
||||||
|
|
||||||
escaped = message.replace('"', '\\"')
|
escaped = message.replace('"', '\\"')
|
||||||
@ -314,6 +317,7 @@ def test_publish_to_sqs_in_different_region():
|
|||||||
published_message_id = published_message["MessageId"]
|
published_message_id = published_message["MessageId"]
|
||||||
|
|
||||||
queue = sqs_conn.get_queue_by_name(QueueName="test-queue")
|
queue = sqs_conn.get_queue_by_name(QueueName="test-queue")
|
||||||
|
with freeze_time("2015-01-01 12:00:01"):
|
||||||
messages = queue.receive_messages(MaxNumberOfMessages=1)
|
messages = queue.receive_messages(MaxNumberOfMessages=1)
|
||||||
expected = MESSAGE_FROM_SQS_TEMPLATE % (message, published_message_id, "us-west-1")
|
expected = MESSAGE_FROM_SQS_TEMPLATE % (message, published_message_id, "us-west-1")
|
||||||
acquired_message = re.sub(
|
acquired_message = re.sub(
|
||||||
|
@ -313,6 +313,29 @@ def test_message_retention_period():
|
|||||||
assert len(messages) == 0
|
assert len(messages) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_queue_retention_period():
|
||||||
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
queue = sqs.create_queue(
|
||||||
|
QueueName="blah", Attributes={"MessageRetentionPeriod": "3"}
|
||||||
|
)
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="derp",
|
||||||
|
MessageAttributes={
|
||||||
|
"SOME_Valid.attribute-Name": {
|
||||||
|
"StringValue": "1493147359900",
|
||||||
|
"DataType": "Number",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
assert len(messages) == 1
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_message_with_invalid_attributes():
|
def test_message_with_invalid_attributes():
|
||||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user