Add error handling for sqs.send_message_batch
This commit is contained in:
parent
36a8a4ad00
commit
65c5502a62
@ -242,25 +242,59 @@ class SQSResponse(BaseResponse):
|
||||
|
||||
queue_name = self._get_queue_name()
|
||||
|
||||
messages = []
|
||||
for index in range(1, 11):
|
||||
# Loop through looking for messages
|
||||
message_key = 'SendMessageBatchRequestEntry.{0}.MessageBody'.format(
|
||||
index)
|
||||
message_body = self.querystring.get(message_key)
|
||||
if not message_body:
|
||||
# Found all messages
|
||||
break
|
||||
try:
|
||||
self.sqs_backend.get_queue(queue_name)
|
||||
except QueueDoesNotExist as e:
|
||||
return self._error('AWS.SimpleQueueService.NonExistentQueue',
|
||||
e.description)
|
||||
|
||||
message_user_id_key = 'SendMessageBatchRequestEntry.{0}.Id'.format(
|
||||
index)
|
||||
message_user_id = self.querystring.get(message_user_id_key)[0]
|
||||
if self.querystring.get('Entries'):
|
||||
return self._error('AWS.SimpleQueueService.EmptyBatchRequest',
|
||||
'There should be at least one SendMessageBatchRequestEntry in the request.')
|
||||
|
||||
entries = {}
|
||||
for key, value in self.querystring.items():
|
||||
match = re.match(r'^SendMessageBatchRequestEntry\.(\d+)\.Id', key)
|
||||
if match:
|
||||
entries[match.group(1)] = {
|
||||
'Id': value[0],
|
||||
'MessageBody': self.querystring.get(
|
||||
'SendMessageBatchRequestEntry.{}.MessageBody'.format(match.group(1)))[0]
|
||||
}
|
||||
|
||||
if any(not re.match(r'^[\w-]{1,80}$', entry['Id']) for entry in entries.values()):
|
||||
return self._error('AWS.SimpleQueueService.InvalidBatchEntryId',
|
||||
'A batch entry id can only contain alphanumeric characters, '
|
||||
'hyphens and underscores. It can be at most 80 letters long.')
|
||||
|
||||
body_length = next(
|
||||
(len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > 262144),
|
||||
False
|
||||
)
|
||||
if body_length:
|
||||
return self._error('AWS.SimpleQueueService.BatchRequestTooLong',
|
||||
'Batch requests cannot be longer than 262144 bytes. '
|
||||
'You have sent {} bytes.'.format(body_length))
|
||||
|
||||
duplicate_id = self._get_first_duplicate_id([entry['Id'] for entry in entries.values()])
|
||||
if duplicate_id:
|
||||
return self._error('AWS.SimpleQueueService.BatchEntryIdsNotDistinct',
|
||||
'Id {} repeated.'.format(duplicate_id))
|
||||
|
||||
if len(entries) > 10:
|
||||
return self._error('AWS.SimpleQueueService.TooManyEntriesInBatchRequest',
|
||||
'Maximum number of entries per request are 10. '
|
||||
'You have sent 11.')
|
||||
|
||||
messages = []
|
||||
for index, entry in entries.items():
|
||||
# Loop through looking for messages
|
||||
delay_key = 'SendMessageBatchRequestEntry.{0}.DelaySeconds'.format(
|
||||
index)
|
||||
delay_seconds = self.querystring.get(delay_key, [None])[0]
|
||||
message = self.sqs_backend.send_message(
|
||||
queue_name, message_body[0], delay_seconds=delay_seconds)
|
||||
message.user_id = message_user_id
|
||||
queue_name, entry['MessageBody'], delay_seconds=delay_seconds)
|
||||
message.user_id = entry['Id']
|
||||
|
||||
message_attributes = parse_message_attributes(
|
||||
self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index))
|
||||
@ -273,6 +307,14 @@ class SQSResponse(BaseResponse):
|
||||
template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE)
|
||||
return template.render(messages=messages)
|
||||
|
||||
def _get_first_duplicate_id(self, ids):
|
||||
unique_ids = set()
|
||||
for id in ids:
|
||||
if id in unique_ids:
|
||||
return id
|
||||
unique_ids.add(id)
|
||||
return None
|
||||
|
||||
def delete_message(self):
|
||||
queue_name = self._get_queue_name()
|
||||
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
||||
|
@ -882,6 +882,127 @@ def test_delete_message_after_visibility_timeout():
|
||||
assert new_queue.count() == 0
|
||||
|
||||
|
||||
@mock_sqs
|
||||
def test_send_message_batch_errors():
|
||||
client = boto3.client('sqs', region_name = 'us-east-1')
|
||||
|
||||
response = client.create_queue(QueueName='test-queue-with-tags')
|
||||
queue_url = response['QueueUrl']
|
||||
|
||||
client.send_message_batch.when.called_with(
|
||||
QueueUrl=queue_url + '-not-existing',
|
||||
Entries=[
|
||||
{
|
||||
'Id': 'id_1',
|
||||
'MessageBody': 'body_1'
|
||||
}
|
||||
]
|
||||
).should.throw(
|
||||
ClientError,
|
||||
'The specified queue does not exist for this wsdl version.'
|
||||
)
|
||||
|
||||
client.send_message_batch.when.called_with(
|
||||
QueueUrl=queue_url,
|
||||
Entries=[]
|
||||
).should.throw(
|
||||
ClientError,
|
||||
'There should be at least one SendMessageBatchRequestEntry in the request.'
|
||||
)
|
||||
|
||||
client.send_message_batch.when.called_with(
|
||||
QueueUrl=queue_url,
|
||||
Entries=[
|
||||
{
|
||||
'Id': '',
|
||||
'MessageBody': 'body_1'
|
||||
}
|
||||
]
|
||||
).should.throw(
|
||||
ClientError,
|
||||
'A batch entry id can only contain alphanumeric characters, '
|
||||
'hyphens and underscores. It can be at most 80 letters long.'
|
||||
)
|
||||
|
||||
client.send_message_batch.when.called_with(
|
||||
QueueUrl=queue_url,
|
||||
Entries=[
|
||||
{
|
||||
'Id': '.!@#$%^&*()+=',
|
||||
'MessageBody': 'body_1'
|
||||
}
|
||||
]
|
||||
).should.throw(
|
||||
ClientError,
|
||||
'A batch entry id can only contain alphanumeric characters, '
|
||||
'hyphens and underscores. It can be at most 80 letters long.'
|
||||
)
|
||||
|
||||
client.send_message_batch.when.called_with(
|
||||
QueueUrl=queue_url,
|
||||
Entries=[
|
||||
{
|
||||
'Id': 'i' * 81,
|
||||
'MessageBody': 'body_1'
|
||||
}
|
||||
]
|
||||
).should.throw(
|
||||
ClientError,
|
||||
'A batch entry id can only contain alphanumeric characters, '
|
||||
'hyphens and underscores. It can be at most 80 letters long.'
|
||||
)
|
||||
|
||||
client.send_message_batch.when.called_with(
|
||||
QueueUrl=queue_url,
|
||||
Entries=[
|
||||
{
|
||||
'Id': 'id_1',
|
||||
'MessageBody': 'b' * 262145
|
||||
}
|
||||
]
|
||||
).should.throw(
|
||||
ClientError,
|
||||
'Batch requests cannot be longer than 262144 bytes. '
|
||||
'You have sent 262145 bytes.'
|
||||
)
|
||||
|
||||
# only the first duplicated Id is reported
|
||||
client.send_message_batch.when.called_with(
|
||||
QueueUrl=queue_url,
|
||||
Entries=[
|
||||
{
|
||||
'Id': 'id_1',
|
||||
'MessageBody': 'body_1'
|
||||
},
|
||||
{
|
||||
'Id': 'id_2',
|
||||
'MessageBody': 'body_2'
|
||||
},
|
||||
{
|
||||
'Id': 'id_2',
|
||||
'MessageBody': 'body_2'
|
||||
},
|
||||
{
|
||||
'Id': 'id_1',
|
||||
'MessageBody': 'body_1'
|
||||
}
|
||||
]
|
||||
).should.throw(
|
||||
ClientError,
|
||||
'Id id_2 repeated.'
|
||||
)
|
||||
|
||||
entries = [{'Id': 'id_{}'.format(i), 'MessageBody': 'body_{}'.format(i)} for i in range(11)]
|
||||
client.send_message_batch.when.called_with(
|
||||
QueueUrl=queue_url,
|
||||
Entries=entries
|
||||
).should.throw(
|
||||
ClientError,
|
||||
'Maximum number of entries per request are 10. '
|
||||
'You have sent 11.'
|
||||
)
|
||||
|
||||
|
||||
@mock_sqs
|
||||
def test_batch_change_message_visibility():
|
||||
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
|
||||
|
Loading…
Reference in New Issue
Block a user