Fix:SQS-message retention period consideration (#3642)
* Fix:SQS-message retention period consideration * Fix:SQS-message retention period consideration * Fix:SQS-message retention period consideration * Added comments &linting * Fixed tests
This commit is contained in:
parent
4d0ee82f98
commit
d8097b24dc
@ -827,6 +827,8 @@ class SQSBackend(BaseBackend):
|
|||||||
queue.pending_messages.add(message)
|
queue.pending_messages.add(message)
|
||||||
message.mark_received(visibility_timeout=visibility_timeout)
|
message.mark_received(visibility_timeout=visibility_timeout)
|
||||||
_filter_message_attributes(message, message_attribute_names)
|
_filter_message_attributes(message, message_attribute_names)
|
||||||
|
if not self.is_message_valid_based_on_retention_period(queue_name):
|
||||||
|
break
|
||||||
result.append(message)
|
result.append(message)
|
||||||
if len(result) >= count:
|
if len(result) >= count:
|
||||||
break
|
break
|
||||||
@ -1004,6 +1006,15 @@ class SQSBackend(BaseBackend):
|
|||||||
def list_queue_tags(self, queue_name):
|
def list_queue_tags(self, queue_name):
|
||||||
return self.get_queue(queue_name)
|
return self.get_queue(queue_name)
|
||||||
|
|
||||||
|
def is_message_valid_based_on_retention_period(self, queue_name):
|
||||||
|
message_attributes = self.get_queue_attributes(queue_name, [])
|
||||||
|
retain_until = message_attributes.get(
|
||||||
|
"MessageRetentionPeriod"
|
||||||
|
) + message_attributes.get("CreatedTimestamp")
|
||||||
|
if retain_until <= unix_time():
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
sqs_backends = {}
|
sqs_backends = {}
|
||||||
for region in Session().get_available_regions("sqs"):
|
for region in Session().get_available_regions("sqs"):
|
||||||
|
@ -279,6 +279,40 @@ def test_message_send_with_attributes():
|
|||||||
messages.should.have.length_of(1)
|
messages.should.have.length_of(1)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_message_retention_period():
|
||||||
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
queue = sqs.create_queue(
|
||||||
|
QueueName="blah", Attributes={"MessageRetentionPeriod": "3"}
|
||||||
|
)
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="derp",
|
||||||
|
MessageAttributes={
|
||||||
|
"SOME_Valid.attribute-Name": {
|
||||||
|
"StringValue": "1493147359900",
|
||||||
|
"DataType": "Number",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
assert len(messages) == 1
|
||||||
|
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="derp",
|
||||||
|
MessageAttributes={
|
||||||
|
"SOME_Valid.attribute-Name": {
|
||||||
|
"StringValue": "1493147359900",
|
||||||
|
"DataType": "Number",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
assert len(messages) == 0
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_message_with_invalid_attributes():
|
def test_message_with_invalid_attributes():
|
||||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
@ -1935,29 +1969,30 @@ def test_queue_with_dlq():
|
|||||||
resp = sqs.receive_message(
|
resp = sqs.receive_message(
|
||||||
QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0
|
QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0
|
||||||
)
|
)
|
||||||
resp["Messages"][0]["Body"].should.equal("msg1")
|
assert resp["Messages"][0]["Body"] == "msg1"
|
||||||
|
|
||||||
with freeze_time("2015-01-01 13:01:00"):
|
with freeze_time("2015-01-01 13:01:00"):
|
||||||
resp = sqs.receive_message(
|
resp = sqs.receive_message(
|
||||||
QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0
|
QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0
|
||||||
)
|
)
|
||||||
resp["Messages"][0]["Body"].should.equal("msg1")
|
assert resp["Messages"][0]["Body"] == "msg1"
|
||||||
|
|
||||||
with freeze_time("2015-01-01 13:02:00"):
|
with freeze_time("2015-01-01 13:02:00"):
|
||||||
resp = sqs.receive_message(
|
resp = sqs.receive_message(
|
||||||
QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0
|
QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0
|
||||||
)
|
)
|
||||||
len(resp["Messages"]).should.equal(1)
|
assert len(resp["Messages"]) == 1
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 13:02:00"):
|
||||||
resp = sqs.receive_message(
|
resp = sqs.receive_message(
|
||||||
QueueUrl=queue_url1, VisibilityTimeout=30, WaitTimeSeconds=0
|
QueueUrl=queue_url1, VisibilityTimeout=30, WaitTimeSeconds=0
|
||||||
)
|
)
|
||||||
resp["Messages"][0]["Body"].should.equal("msg1")
|
assert resp["Messages"][0]["Body"] == "msg1"
|
||||||
|
|
||||||
# Might as well test list source queues
|
# Might as well test list source queues
|
||||||
|
|
||||||
resp = sqs.list_dead_letter_source_queues(QueueUrl=queue_url1)
|
resp = sqs.list_dead_letter_source_queues(QueueUrl=queue_url1)
|
||||||
resp["queueUrls"][0].should.equal(queue_url2)
|
assert resp["queueUrls"][0] == queue_url2
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
|
Loading…
Reference in New Issue
Block a user