Merge pull request #2488 from edekadigital/add-sqs-error-handling
Add sqs error handling
This commit is contained in:
commit
e469a64ed5
@ -5902,7 +5902,7 @@
|
|||||||
- [x] untag_resource
|
- [x] untag_resource
|
||||||
|
|
||||||
## sqs
|
## sqs
|
||||||
65% implemented
|
75% implemented
|
||||||
- [X] add_permission
|
- [X] add_permission
|
||||||
- [X] change_message_visibility
|
- [X] change_message_visibility
|
||||||
- [ ] change_message_visibility_batch
|
- [ ] change_message_visibility_batch
|
||||||
@ -5913,13 +5913,13 @@
|
|||||||
- [ ] get_queue_attributes
|
- [ ] get_queue_attributes
|
||||||
- [ ] get_queue_url
|
- [ ] get_queue_url
|
||||||
- [X] list_dead_letter_source_queues
|
- [X] list_dead_letter_source_queues
|
||||||
- [ ] list_queue_tags
|
- [x] list_queue_tags
|
||||||
- [X] list_queues
|
- [X] list_queues
|
||||||
- [X] purge_queue
|
- [X] purge_queue
|
||||||
- [ ] receive_message
|
- [ ] receive_message
|
||||||
- [X] remove_permission
|
- [X] remove_permission
|
||||||
- [X] send_message
|
- [X] send_message
|
||||||
- [ ] send_message_batch
|
- [x] send_message_batch
|
||||||
- [X] set_queue_attributes
|
- [X] set_queue_attributes
|
||||||
- [X] tag_queue
|
- [X] tag_queue
|
||||||
- [X] untag_queue
|
- [X] untag_queue
|
||||||
|
@ -19,9 +19,12 @@ class MessageAttributesInvalid(Exception):
|
|||||||
self.description = description
|
self.description = description
|
||||||
|
|
||||||
|
|
||||||
class QueueDoesNotExist(Exception):
|
class QueueDoesNotExist(RESTError):
|
||||||
status_code = 404
|
code = 404
|
||||||
description = "The specified queue does not exist for this wsdl version."
|
|
||||||
|
def __init__(self):
|
||||||
|
super(QueueDoesNotExist, self).__init__(
|
||||||
|
"QueueDoesNotExist", "The specified queue does not exist for this wsdl version.")
|
||||||
|
|
||||||
|
|
||||||
class QueueAlreadyExists(RESTError):
|
class QueueAlreadyExists(RESTError):
|
||||||
|
@ -654,12 +654,21 @@ class SQSBackend(BaseBackend):
|
|||||||
|
|
||||||
def tag_queue(self, queue_name, tags):
|
def tag_queue(self, queue_name, tags):
|
||||||
queue = self.get_queue(queue_name)
|
queue = self.get_queue(queue_name)
|
||||||
|
|
||||||
|
if not len(tags):
|
||||||
|
raise RESTError('MissingParameter',
|
||||||
|
'The request must contain the parameter Tags.')
|
||||||
|
|
||||||
|
if len(tags) > 50:
|
||||||
|
raise RESTError('InvalidParameterValue',
|
||||||
|
'Too many tags added for queue {}.'.format(queue_name))
|
||||||
|
|
||||||
queue.tags.update(tags)
|
queue.tags.update(tags)
|
||||||
|
|
||||||
def untag_queue(self, queue_name, tag_keys):
|
def untag_queue(self, queue_name, tag_keys):
|
||||||
queue = self.get_queue(queue_name)
|
queue = self.get_queue(queue_name)
|
||||||
|
|
||||||
if len(tag_keys) == 0:
|
if not len(tag_keys):
|
||||||
raise RESTError('InvalidParameterValue', 'Tag keys must be between 1 and 128 characters in length.')
|
raise RESTError('InvalidParameterValue', 'Tag keys must be between 1 and 128 characters in length.')
|
||||||
|
|
||||||
for key in tag_keys:
|
for key in tag_keys:
|
||||||
|
@ -10,7 +10,6 @@ from .models import sqs_backends
|
|||||||
from .exceptions import (
|
from .exceptions import (
|
||||||
MessageAttributesInvalid,
|
MessageAttributesInvalid,
|
||||||
MessageNotInflight,
|
MessageNotInflight,
|
||||||
QueueDoesNotExist,
|
|
||||||
ReceiptHandleIsInvalid,
|
ReceiptHandleIsInvalid,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -90,11 +89,7 @@ class SQSResponse(BaseResponse):
|
|||||||
request_url = urlparse(self.uri)
|
request_url = urlparse(self.uri)
|
||||||
queue_name = self._get_param("QueueName")
|
queue_name = self._get_param("QueueName")
|
||||||
|
|
||||||
try:
|
queue = self.sqs_backend.get_queue(queue_name)
|
||||||
queue = self.sqs_backend.get_queue(queue_name)
|
|
||||||
except QueueDoesNotExist as e:
|
|
||||||
return self._error('AWS.SimpleQueueService.NonExistentQueue',
|
|
||||||
e.description)
|
|
||||||
|
|
||||||
if queue:
|
if queue:
|
||||||
template = self.response_template(GET_QUEUE_URL_RESPONSE)
|
template = self.response_template(GET_QUEUE_URL_RESPONSE)
|
||||||
@ -175,11 +170,8 @@ class SQSResponse(BaseResponse):
|
|||||||
|
|
||||||
def get_queue_attributes(self):
|
def get_queue_attributes(self):
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
try:
|
|
||||||
queue = self.sqs_backend.get_queue(queue_name)
|
queue = self.sqs_backend.get_queue(queue_name)
|
||||||
except QueueDoesNotExist as e:
|
|
||||||
return self._error('AWS.SimpleQueueService.NonExistentQueue',
|
|
||||||
e.description)
|
|
||||||
|
|
||||||
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
|
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
|
||||||
return template.render(queue=queue)
|
return template.render(queue=queue)
|
||||||
@ -242,25 +234,55 @@ class SQSResponse(BaseResponse):
|
|||||||
|
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
|
|
||||||
messages = []
|
self.sqs_backend.get_queue(queue_name)
|
||||||
for index in range(1, 11):
|
|
||||||
# Loop through looking for messages
|
|
||||||
message_key = 'SendMessageBatchRequestEntry.{0}.MessageBody'.format(
|
|
||||||
index)
|
|
||||||
message_body = self.querystring.get(message_key)
|
|
||||||
if not message_body:
|
|
||||||
# Found all messages
|
|
||||||
break
|
|
||||||
|
|
||||||
message_user_id_key = 'SendMessageBatchRequestEntry.{0}.Id'.format(
|
if self.querystring.get('Entries'):
|
||||||
index)
|
return self._error('AWS.SimpleQueueService.EmptyBatchRequest',
|
||||||
message_user_id = self.querystring.get(message_user_id_key)[0]
|
'There should be at least one SendMessageBatchRequestEntry in the request.')
|
||||||
|
|
||||||
|
entries = {}
|
||||||
|
for key, value in self.querystring.items():
|
||||||
|
match = re.match(r'^SendMessageBatchRequestEntry\.(\d+)\.Id', key)
|
||||||
|
if match:
|
||||||
|
entries[match.group(1)] = {
|
||||||
|
'Id': value[0],
|
||||||
|
'MessageBody': self.querystring.get(
|
||||||
|
'SendMessageBatchRequestEntry.{}.MessageBody'.format(match.group(1)))[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
if any(not re.match(r'^[\w-]{1,80}$', entry['Id']) for entry in entries.values()):
|
||||||
|
return self._error('AWS.SimpleQueueService.InvalidBatchEntryId',
|
||||||
|
'A batch entry id can only contain alphanumeric characters, '
|
||||||
|
'hyphens and underscores. It can be at most 80 letters long.')
|
||||||
|
|
||||||
|
body_length = next(
|
||||||
|
(len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > MAXIMUM_MESSAGE_LENGTH),
|
||||||
|
False
|
||||||
|
)
|
||||||
|
if body_length:
|
||||||
|
return self._error('AWS.SimpleQueueService.BatchRequestTooLong',
|
||||||
|
'Batch requests cannot be longer than 262144 bytes. '
|
||||||
|
'You have sent {} bytes.'.format(body_length))
|
||||||
|
|
||||||
|
duplicate_id = self._get_first_duplicate_id([entry['Id'] for entry in entries.values()])
|
||||||
|
if duplicate_id:
|
||||||
|
return self._error('AWS.SimpleQueueService.BatchEntryIdsNotDistinct',
|
||||||
|
'Id {} repeated.'.format(duplicate_id))
|
||||||
|
|
||||||
|
if len(entries) > 10:
|
||||||
|
return self._error('AWS.SimpleQueueService.TooManyEntriesInBatchRequest',
|
||||||
|
'Maximum number of entries per request are 10. '
|
||||||
|
'You have sent 11.')
|
||||||
|
|
||||||
|
messages = []
|
||||||
|
for index, entry in entries.items():
|
||||||
|
# Loop through looking for messages
|
||||||
delay_key = 'SendMessageBatchRequestEntry.{0}.DelaySeconds'.format(
|
delay_key = 'SendMessageBatchRequestEntry.{0}.DelaySeconds'.format(
|
||||||
index)
|
index)
|
||||||
delay_seconds = self.querystring.get(delay_key, [None])[0]
|
delay_seconds = self.querystring.get(delay_key, [None])[0]
|
||||||
message = self.sqs_backend.send_message(
|
message = self.sqs_backend.send_message(
|
||||||
queue_name, message_body[0], delay_seconds=delay_seconds)
|
queue_name, entry['MessageBody'], delay_seconds=delay_seconds)
|
||||||
message.user_id = message_user_id
|
message.user_id = entry['Id']
|
||||||
|
|
||||||
message_attributes = parse_message_attributes(
|
message_attributes = parse_message_attributes(
|
||||||
self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index))
|
self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index))
|
||||||
@ -273,6 +295,14 @@ class SQSResponse(BaseResponse):
|
|||||||
template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE)
|
template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE)
|
||||||
return template.render(messages=messages)
|
return template.render(messages=messages)
|
||||||
|
|
||||||
|
def _get_first_duplicate_id(self, ids):
|
||||||
|
unique_ids = set()
|
||||||
|
for id in ids:
|
||||||
|
if id in unique_ids:
|
||||||
|
return id
|
||||||
|
unique_ids.add(id)
|
||||||
|
return None
|
||||||
|
|
||||||
def delete_message(self):
|
def delete_message(self):
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
||||||
@ -321,10 +351,7 @@ class SQSResponse(BaseResponse):
|
|||||||
def receive_message(self):
|
def receive_message(self):
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
|
|
||||||
try:
|
queue = self.sqs_backend.get_queue(queue_name)
|
||||||
queue = self.sqs_backend.get_queue(queue_name)
|
|
||||||
except QueueDoesNotExist as e:
|
|
||||||
return self._error('QueueDoesNotExist', e.description)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
|
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
|
||||||
@ -406,11 +433,7 @@ class SQSResponse(BaseResponse):
|
|||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
tag_keys = self._get_multi_param('TagKey')
|
tag_keys = self._get_multi_param('TagKey')
|
||||||
|
|
||||||
try:
|
self.sqs_backend.untag_queue(queue_name, tag_keys)
|
||||||
self.sqs_backend.untag_queue(queue_name, tag_keys)
|
|
||||||
except QueueDoesNotExist as e:
|
|
||||||
return self._error('AWS.SimpleQueueService.NonExistentQueue',
|
|
||||||
e.description)
|
|
||||||
|
|
||||||
template = self.response_template(UNTAG_QUEUE_RESPONSE)
|
template = self.response_template(UNTAG_QUEUE_RESPONSE)
|
||||||
return template.render()
|
return template.render()
|
||||||
|
@ -882,6 +882,127 @@ def test_delete_message_after_visibility_timeout():
|
|||||||
assert new_queue.count() == 0
|
assert new_queue.count() == 0
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_send_message_batch_errors():
|
||||||
|
client = boto3.client('sqs', region_name = 'us-east-1')
|
||||||
|
|
||||||
|
response = client.create_queue(QueueName='test-queue-with-tags')
|
||||||
|
queue_url = response['QueueUrl']
|
||||||
|
|
||||||
|
client.send_message_batch.when.called_with(
|
||||||
|
QueueUrl=queue_url + '-not-existing',
|
||||||
|
Entries=[
|
||||||
|
{
|
||||||
|
'Id': 'id_1',
|
||||||
|
'MessageBody': 'body_1'
|
||||||
|
}
|
||||||
|
]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'The specified queue does not exist for this wsdl version.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.send_message_batch.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=[]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'There should be at least one SendMessageBatchRequestEntry in the request.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.send_message_batch.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=[
|
||||||
|
{
|
||||||
|
'Id': '',
|
||||||
|
'MessageBody': 'body_1'
|
||||||
|
}
|
||||||
|
]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'A batch entry id can only contain alphanumeric characters, '
|
||||||
|
'hyphens and underscores. It can be at most 80 letters long.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.send_message_batch.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=[
|
||||||
|
{
|
||||||
|
'Id': '.!@#$%^&*()+=',
|
||||||
|
'MessageBody': 'body_1'
|
||||||
|
}
|
||||||
|
]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'A batch entry id can only contain alphanumeric characters, '
|
||||||
|
'hyphens and underscores. It can be at most 80 letters long.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.send_message_batch.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=[
|
||||||
|
{
|
||||||
|
'Id': 'i' * 81,
|
||||||
|
'MessageBody': 'body_1'
|
||||||
|
}
|
||||||
|
]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'A batch entry id can only contain alphanumeric characters, '
|
||||||
|
'hyphens and underscores. It can be at most 80 letters long.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.send_message_batch.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=[
|
||||||
|
{
|
||||||
|
'Id': 'id_1',
|
||||||
|
'MessageBody': 'b' * 262145
|
||||||
|
}
|
||||||
|
]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'Batch requests cannot be longer than 262144 bytes. '
|
||||||
|
'You have sent 262145 bytes.'
|
||||||
|
)
|
||||||
|
|
||||||
|
# only the first duplicated Id is reported
|
||||||
|
client.send_message_batch.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=[
|
||||||
|
{
|
||||||
|
'Id': 'id_1',
|
||||||
|
'MessageBody': 'body_1'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'Id': 'id_2',
|
||||||
|
'MessageBody': 'body_2'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'Id': 'id_2',
|
||||||
|
'MessageBody': 'body_2'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'Id': 'id_1',
|
||||||
|
'MessageBody': 'body_1'
|
||||||
|
}
|
||||||
|
]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'Id id_2 repeated.'
|
||||||
|
)
|
||||||
|
|
||||||
|
entries = [{'Id': 'id_{}'.format(i), 'MessageBody': 'body_{}'.format(i)} for i in range(11)]
|
||||||
|
client.send_message_batch.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=entries
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'Maximum number of entries per request are 10. '
|
||||||
|
'You have sent 11.'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_batch_change_message_visibility():
|
def test_batch_change_message_visibility():
|
||||||
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
|
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
|
||||||
@ -987,6 +1108,73 @@ def test_tags():
|
|||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_list_queue_tags_errors():
|
||||||
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
|
||||||
|
response = client.create_queue(
|
||||||
|
QueueName='test-queue-with-tags',
|
||||||
|
tags={
|
||||||
|
'tag_key_1': 'tag_value_X'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
queue_url = response['QueueUrl']
|
||||||
|
|
||||||
|
client.list_queue_tags.when.called_with(
|
||||||
|
QueueUrl=queue_url + '-not-existing',
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'The specified queue does not exist for this wsdl version.'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_tag_queue_errors():
|
||||||
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
|
||||||
|
response = client.create_queue(
|
||||||
|
QueueName='test-queue-with-tags',
|
||||||
|
tags={
|
||||||
|
'tag_key_1': 'tag_value_X'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
queue_url = response['QueueUrl']
|
||||||
|
|
||||||
|
client.tag_queue.when.called_with(
|
||||||
|
QueueUrl=queue_url + '-not-existing',
|
||||||
|
Tags={
|
||||||
|
'tag_key_1': 'tag_value_1'
|
||||||
|
}
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'The specified queue does not exist for this wsdl version.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.tag_queue.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Tags={}
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'The request must contain the parameter Tags.'
|
||||||
|
)
|
||||||
|
|
||||||
|
too_many_tags = {'tag_key_{}'.format(i): 'tag_value_{}'.format(i) for i in range(51)}
|
||||||
|
client.tag_queue.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Tags=too_many_tags
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'Too many tags added for queue test-queue-with-tags.'
|
||||||
|
)
|
||||||
|
|
||||||
|
# when the request fails, the tags should not be updated
|
||||||
|
client.list_queue_tags(QueueUrl=queue_url)['Tags'].should.equal(
|
||||||
|
{
|
||||||
|
'tag_key_1': 'tag_value_X'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_untag_queue_errors():
|
def test_untag_queue_errors():
|
||||||
client = boto3.client('sqs', region_name='us-east-1')
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
@ -1006,7 +1194,7 @@ def test_untag_queue_errors():
|
|||||||
]
|
]
|
||||||
).should.throw(
|
).should.throw(
|
||||||
ClientError,
|
ClientError,
|
||||||
"The specified queue does not exist for this wsdl version."
|
'The specified queue does not exist for this wsdl version.'
|
||||||
)
|
)
|
||||||
|
|
||||||
client.untag_queue.when.called_with(
|
client.untag_queue.when.called_with(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user