SQS: Implement DelaySeconds validation (#5703)
This commit is contained in:
parent
ff1f420689
commit
97b5e8b3ab
@ -44,6 +44,8 @@ MAXIMUM_MESSAGE_LENGTH = 262144 # 256 KiB
|
|||||||
MAXIMUM_MESSAGE_SIZE_ATTR_LOWER_BOUND = 1024
|
MAXIMUM_MESSAGE_SIZE_ATTR_LOWER_BOUND = 1024
|
||||||
MAXIMUM_MESSAGE_SIZE_ATTR_UPPER_BOUND = MAXIMUM_MESSAGE_LENGTH
|
MAXIMUM_MESSAGE_SIZE_ATTR_UPPER_BOUND = MAXIMUM_MESSAGE_LENGTH
|
||||||
|
|
||||||
|
MAXIMUM_MESSAGE_DELAY = 900
|
||||||
|
|
||||||
TRANSPORT_TYPE_ENCODINGS = {
|
TRANSPORT_TYPE_ENCODINGS = {
|
||||||
"String": b"\x01",
|
"String": b"\x01",
|
||||||
"Binary": b"\x02",
|
"Binary": b"\x02",
|
||||||
@ -799,6 +801,13 @@ class SQSBackend(BaseBackend):
|
|||||||
if message_attributes:
|
if message_attributes:
|
||||||
message.message_attributes = message_attributes
|
message.message_attributes = message_attributes
|
||||||
|
|
||||||
|
if delay_seconds > MAXIMUM_MESSAGE_DELAY:
|
||||||
|
msg = (
|
||||||
|
f"Value {delay_seconds} for parameter DelaySeconds is invalid. "
|
||||||
|
"Reason: DelaySeconds must be >= 0 and <= 900."
|
||||||
|
)
|
||||||
|
raise InvalidParameterValue(msg)
|
||||||
|
|
||||||
message.mark_sent(delay_seconds=delay_seconds)
|
message.mark_sent(delay_seconds=delay_seconds)
|
||||||
|
|
||||||
queue.add_message(message)
|
queue.add_message(message)
|
||||||
@ -834,21 +843,24 @@ class SQSBackend(BaseBackend):
|
|||||||
raise TooManyEntriesInBatchRequest(len(entries))
|
raise TooManyEntriesInBatchRequest(len(entries))
|
||||||
|
|
||||||
messages = []
|
messages = []
|
||||||
|
failedInvalidDelay = []
|
||||||
for entry in entries.values():
|
for entry in entries.values():
|
||||||
# Loop through looking for messages
|
try:
|
||||||
message = self.send_message(
|
# Loop through looking for messages
|
||||||
queue_name,
|
message = self.send_message(
|
||||||
entry["MessageBody"],
|
queue_name,
|
||||||
message_attributes=entry["MessageAttributes"],
|
entry["MessageBody"],
|
||||||
delay_seconds=entry["DelaySeconds"],
|
message_attributes=entry["MessageAttributes"],
|
||||||
group_id=entry.get("MessageGroupId"),
|
delay_seconds=entry["DelaySeconds"],
|
||||||
deduplication_id=entry.get("MessageDeduplicationId"),
|
group_id=entry.get("MessageGroupId"),
|
||||||
)
|
deduplication_id=entry.get("MessageDeduplicationId"),
|
||||||
message.user_id = entry["Id"]
|
)
|
||||||
|
message.user_id = entry["Id"]
|
||||||
|
messages.append(message)
|
||||||
|
except InvalidParameterValue:
|
||||||
|
failedInvalidDelay.append(entry)
|
||||||
|
|
||||||
messages.append(message)
|
return messages, failedInvalidDelay
|
||||||
|
|
||||||
return messages
|
|
||||||
|
|
||||||
def _get_first_duplicate_id(self, ids):
|
def _get_first_duplicate_id(self, ids):
|
||||||
unique_ids = set()
|
unique_ids = set()
|
||||||
|
@ -299,10 +299,23 @@ class SQSResponse(BaseResponse):
|
|||||||
if entries == {}:
|
if entries == {}:
|
||||||
raise EmptyBatchRequest()
|
raise EmptyBatchRequest()
|
||||||
|
|
||||||
messages = self.sqs_backend.send_message_batch(queue_name, entries)
|
messages, failedInvalidDelay = self.sqs_backend.send_message_batch(
|
||||||
|
queue_name, entries
|
||||||
|
)
|
||||||
|
|
||||||
|
errors = []
|
||||||
|
for entry in failedInvalidDelay:
|
||||||
|
errors.append(
|
||||||
|
{
|
||||||
|
"Id": entry["Id"],
|
||||||
|
"SenderFault": "true",
|
||||||
|
"Code": "InvalidParameterValue",
|
||||||
|
"Message": "Value 1800 for parameter DelaySeconds is invalid. Reason: DelaySeconds must be >= 0 and <= 900.",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE)
|
template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE)
|
||||||
return template.render(messages=messages)
|
return template.render(messages=messages, errors=errors)
|
||||||
|
|
||||||
def delete_message(self):
|
def delete_message(self):
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
@ -650,6 +663,7 @@ RECEIVE_MESSAGE_RESPONSE = """<ReceiveMessageResponse>
|
|||||||
</ResponseMetadata>
|
</ResponseMetadata>
|
||||||
</ReceiveMessageResponse>"""
|
</ReceiveMessageResponse>"""
|
||||||
|
|
||||||
|
# UPDATED Line 681-688
|
||||||
SEND_MESSAGE_BATCH_RESPONSE = """<SendMessageBatchResponse>
|
SEND_MESSAGE_BATCH_RESPONSE = """<SendMessageBatchResponse>
|
||||||
<SendMessageBatchResult>
|
<SendMessageBatchResult>
|
||||||
{% for message in messages %}
|
{% for message in messages %}
|
||||||
@ -662,6 +676,14 @@ SEND_MESSAGE_BATCH_RESPONSE = """<SendMessageBatchResponse>
|
|||||||
{% endif %}
|
{% endif %}
|
||||||
</SendMessageBatchResultEntry>
|
</SendMessageBatchResultEntry>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
{% for error_dict in errors %}
|
||||||
|
<BatchResultErrorEntry>
|
||||||
|
<Id>{{ error_dict['Id'] }}</Id>
|
||||||
|
<Code>{{ error_dict['Code'] }}</Code>
|
||||||
|
<Message>{{ error_dict['Message'] }}</Message>
|
||||||
|
<SenderFault>{{ error_dict['SenderFault'] }}</SenderFault>
|
||||||
|
</BatchResultErrorEntry>
|
||||||
|
{% endfor %}
|
||||||
</SendMessageBatchResult>
|
</SendMessageBatchResult>
|
||||||
<ResponseMetadata>
|
<ResponseMetadata>
|
||||||
<RequestId></RequestId>
|
<RequestId></RequestId>
|
||||||
|
@ -3115,3 +3115,68 @@ def test_message_has_windows_return():
|
|||||||
messages = queue.receive_messages()
|
messages = queue.receive_messages()
|
||||||
messages.should.have.length_of(1)
|
messages.should.have.length_of(1)
|
||||||
messages[0].body.should.match(message)
|
messages[0].body.should.match(message)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_message_delay_is_more_than_15_minutes():
|
||||||
|
client = boto3.client("sqs", region_name="us-east-1")
|
||||||
|
response = client.create_queue(
|
||||||
|
QueueName=f"{str(uuid4())[0:6]}.fifo", Attributes={"FifoQueue": "true"}
|
||||||
|
)
|
||||||
|
queue_url = response["QueueUrl"]
|
||||||
|
|
||||||
|
response = client.send_message_batch(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=[
|
||||||
|
{
|
||||||
|
"Id": "id_1",
|
||||||
|
"MessageBody": "body_1",
|
||||||
|
"DelaySeconds": 3,
|
||||||
|
"MessageAttributes": {
|
||||||
|
"attribute_name_1": {
|
||||||
|
"StringValue": "attribute_value_1",
|
||||||
|
"DataType": "String",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"MessageGroupId": "message_group_id_1",
|
||||||
|
"MessageDeduplicationId": "message_deduplication_id_1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Id": "id_2",
|
||||||
|
"MessageBody": "body_2",
|
||||||
|
"DelaySeconds": 1800,
|
||||||
|
"MessageAttributes": {
|
||||||
|
"attribute_name_2": {"StringValue": "123", "DataType": "Number"}
|
||||||
|
},
|
||||||
|
"MessageGroupId": "message_group_id_2",
|
||||||
|
"MessageDeduplicationId": "message_deduplication_id_2",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
sorted([entry["Id"] for entry in response["Successful"]]).should.equal(["id_1"])
|
||||||
|
|
||||||
|
sorted([entry["Id"] for entry in response["Failed"]]).should.equal(["id_2"])
|
||||||
|
|
||||||
|
# print(response)
|
||||||
|
|
||||||
|
time.sleep(4)
|
||||||
|
|
||||||
|
response = client.receive_message(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
MaxNumberOfMessages=10,
|
||||||
|
MessageAttributeNames=["attribute_name_1", "attribute_name_2"],
|
||||||
|
AttributeNames=["MessageDeduplicationId", "MessageGroupId"],
|
||||||
|
)
|
||||||
|
|
||||||
|
response["Messages"].should.have.length_of(1)
|
||||||
|
response["Messages"][0]["Body"].should.equal("body_1")
|
||||||
|
response["Messages"][0]["MessageAttributes"].should.equal(
|
||||||
|
{"attribute_name_1": {"StringValue": "attribute_value_1", "DataType": "String"}}
|
||||||
|
)
|
||||||
|
response["Messages"][0]["Attributes"]["MessageGroupId"].should.equal(
|
||||||
|
"message_group_id_1"
|
||||||
|
)
|
||||||
|
response["Messages"][0]["Attributes"]["MessageDeduplicationId"].should.equal(
|
||||||
|
"message_deduplication_id_1"
|
||||||
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user