From 65c5502a62e81308d3907002f6f6b827a8f1c342 Mon Sep 17 00:00:00 2001 From: gruebel Date: Thu, 17 Oct 2019 22:09:14 +0200 Subject: [PATCH 1/5] Add error handling for sqs.send_message_batch --- moto/sqs/responses.py | 70 ++++++++++++++++----- tests/test_sqs/test_sqs.py | 121 +++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 14 deletions(-) diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index 747fa2363..0ddf5a314 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -242,25 +242,59 @@ class SQSResponse(BaseResponse): queue_name = self._get_queue_name() - messages = [] - for index in range(1, 11): - # Loop through looking for messages - message_key = 'SendMessageBatchRequestEntry.{0}.MessageBody'.format( - index) - message_body = self.querystring.get(message_key) - if not message_body: - # Found all messages - break + try: + self.sqs_backend.get_queue(queue_name) + except QueueDoesNotExist as e: + return self._error('AWS.SimpleQueueService.NonExistentQueue', + e.description) - message_user_id_key = 'SendMessageBatchRequestEntry.{0}.Id'.format( - index) - message_user_id = self.querystring.get(message_user_id_key)[0] + if self.querystring.get('Entries'): + return self._error('AWS.SimpleQueueService.EmptyBatchRequest', + 'There should be at least one SendMessageBatchRequestEntry in the request.') + + entries = {} + for key, value in self.querystring.items(): + match = re.match(r'^SendMessageBatchRequestEntry\.(\d+)\.Id', key) + if match: + entries[match.group(1)] = { + 'Id': value[0], + 'MessageBody': self.querystring.get( + 'SendMessageBatchRequestEntry.{}.MessageBody'.format(match.group(1)))[0] + } + + if any(not re.match(r'^[\w-]{1,80}$', entry['Id']) for entry in entries.values()): + return self._error('AWS.SimpleQueueService.InvalidBatchEntryId', + 'A batch entry id can only contain alphanumeric characters, ' + 'hyphens and underscores. It can be at most 80 letters long.') + + body_length = next( + (len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > 262144), + False + ) + if body_length: + return self._error('AWS.SimpleQueueService.BatchRequestTooLong', + 'Batch requests cannot be longer than 262144 bytes. ' + 'You have sent {} bytes.'.format(body_length)) + + duplicate_id = self._get_first_duplicate_id([entry['Id'] for entry in entries.values()]) + if duplicate_id: + return self._error('AWS.SimpleQueueService.BatchEntryIdsNotDistinct', + 'Id {} repeated.'.format(duplicate_id)) + + if len(entries) > 10: + return self._error('AWS.SimpleQueueService.TooManyEntriesInBatchRequest', + 'Maximum number of entries per request are 10. ' + 'You have sent 11.') + + messages = [] + for index, entry in entries.items(): + # Loop through looking for messages delay_key = 'SendMessageBatchRequestEntry.{0}.DelaySeconds'.format( index) delay_seconds = self.querystring.get(delay_key, [None])[0] message = self.sqs_backend.send_message( - queue_name, message_body[0], delay_seconds=delay_seconds) - message.user_id = message_user_id + queue_name, entry['MessageBody'], delay_seconds=delay_seconds) + message.user_id = entry['Id'] message_attributes = parse_message_attributes( self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index)) @@ -273,6 +307,14 @@ class SQSResponse(BaseResponse): template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE) return template.render(messages=messages) + def _get_first_duplicate_id(self, ids): + unique_ids = set() + for id in ids: + if id in unique_ids: + return id + unique_ids.add(id) + return None + def delete_message(self): queue_name = self._get_queue_name() receipt_handle = self.querystring.get("ReceiptHandle")[0] diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index bc9fa8e4d..4d06411e6 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -882,6 +882,127 @@ def test_delete_message_after_visibility_timeout(): assert new_queue.count() == 0 +@mock_sqs +def test_send_message_batch_errors(): + client = boto3.client('sqs', region_name = 'us-east-1') + + response = client.create_queue(QueueName='test-queue-with-tags') + queue_url = response['QueueUrl'] + + client.send_message_batch.when.called_with( + QueueUrl=queue_url + '-not-existing', + Entries=[ + { + 'Id': 'id_1', + 'MessageBody': 'body_1' + } + ] + ).should.throw( + ClientError, + 'The specified queue does not exist for this wsdl version.' + ) + + client.send_message_batch.when.called_with( + QueueUrl=queue_url, + Entries=[] + ).should.throw( + ClientError, + 'There should be at least one SendMessageBatchRequestEntry in the request.' + ) + + client.send_message_batch.when.called_with( + QueueUrl=queue_url, + Entries=[ + { + 'Id': '', + 'MessageBody': 'body_1' + } + ] + ).should.throw( + ClientError, + 'A batch entry id can only contain alphanumeric characters, ' + 'hyphens and underscores. It can be at most 80 letters long.' + ) + + client.send_message_batch.when.called_with( + QueueUrl=queue_url, + Entries=[ + { + 'Id': '.!@#$%^&*()+=', + 'MessageBody': 'body_1' + } + ] + ).should.throw( + ClientError, + 'A batch entry id can only contain alphanumeric characters, ' + 'hyphens and underscores. It can be at most 80 letters long.' + ) + + client.send_message_batch.when.called_with( + QueueUrl=queue_url, + Entries=[ + { + 'Id': 'i' * 81, + 'MessageBody': 'body_1' + } + ] + ).should.throw( + ClientError, + 'A batch entry id can only contain alphanumeric characters, ' + 'hyphens and underscores. It can be at most 80 letters long.' + ) + + client.send_message_batch.when.called_with( + QueueUrl=queue_url, + Entries=[ + { + 'Id': 'id_1', + 'MessageBody': 'b' * 262145 + } + ] + ).should.throw( + ClientError, + 'Batch requests cannot be longer than 262144 bytes. ' + 'You have sent 262145 bytes.' + ) + + # only the first duplicated Id is reported + client.send_message_batch.when.called_with( + QueueUrl=queue_url, + Entries=[ + { + 'Id': 'id_1', + 'MessageBody': 'body_1' + }, + { + 'Id': 'id_2', + 'MessageBody': 'body_2' + }, + { + 'Id': 'id_2', + 'MessageBody': 'body_2' + }, + { + 'Id': 'id_1', + 'MessageBody': 'body_1' + } + ] + ).should.throw( + ClientError, + 'Id id_2 repeated.' + ) + + entries = [{'Id': 'id_{}'.format(i), 'MessageBody': 'body_{}'.format(i)} for i in range(11)] + client.send_message_batch.when.called_with( + QueueUrl=queue_url, + Entries=entries + ).should.throw( + ClientError, + 'Maximum number of entries per request are 10. ' + 'You have sent 11.' + ) + + @mock_sqs def test_batch_change_message_visibility(): if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true': From dbfb319defaba8de98ae0d305900fee2a14c3e84 Mon Sep 17 00:00:00 2001 From: gruebel Date: Thu, 17 Oct 2019 22:36:37 +0200 Subject: [PATCH 2/5] Add error handling for sqs.tag_queue --- moto/sqs/responses.py | 14 +++++++++++ tests/test_sqs/test_sqs.py | 49 +++++++++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index 0ddf5a314..f84d293d3 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -439,6 +439,20 @@ class SQSResponse(BaseResponse): queue_name = self._get_queue_name() tags = self._get_map_prefix('Tag', key_end='.Key', value_end='.Value') + try: + self.sqs_backend.get_queue(queue_name) + except QueueDoesNotExist as e: + return self._error('AWS.SimpleQueueService.NonExistentQueue', + e.description) + + if len(tags) == 0: + return self._error('MissingParameter', + 'The request must contain the parameter Tags.') + + if len(tags) > 50: + return self._error('InvalidParameterValue', + 'Too many tags added for queue {}.'.format(queue_name)) + self.sqs_backend.tag_queue(queue_name, tags) template = self.response_template(TAG_QUEUE_RESPONSE) diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 4d06411e6..f5d09ba4b 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -1108,6 +1108,53 @@ def test_tags(): }) +@mock_sqs +def test_tag_queue_errors(): + client = boto3.client('sqs', region_name='us-east-1') + + response = client.create_queue( + QueueName='test-queue-with-tags', + tags={ + 'tag_key_1': 'tag_value_X' + } + ) + queue_url = response['QueueUrl'] + + client.tag_queue.when.called_with( + QueueUrl=queue_url + '-not-existing', + Tags={ + 'tag_key_1': 'tag_value_1' + } + ).should.throw( + ClientError, + 'The specified queue does not exist for this wsdl version.' + ) + + client.tag_queue.when.called_with( + QueueUrl=queue_url, + Tags={} + ).should.throw( + ClientError, + 'The request must contain the parameter Tags.' + ) + + too_many_tags = {'tag_key_{}'.format(i): 'tag_value_{}'.format(i) for i in range(51)} + client.tag_queue.when.called_with( + QueueUrl=queue_url, + Tags=too_many_tags + ).should.throw( + ClientError, + 'Too many tags added for queue test-queue-with-tags.' + ) + + # when the request fails, the tags should not be updated + client.list_queue_tags(QueueUrl=queue_url)['Tags'].should.equal( + { + 'tag_key_1': 'tag_value_X' + } + ) + + @mock_sqs def test_untag_queue_errors(): client = boto3.client('sqs', region_name='us-east-1') @@ -1127,7 +1174,7 @@ def test_untag_queue_errors(): ] ).should.throw( ClientError, - "The specified queue does not exist for this wsdl version." + 'The specified queue does not exist for this wsdl version.' ) client.untag_queue.when.called_with( From 19a34ea57ab4455d1c37982e48018b16f1ceab9f Mon Sep 17 00:00:00 2001 From: gruebel Date: Thu, 17 Oct 2019 22:38:16 +0200 Subject: [PATCH 3/5] Add error handling for sqs.list_queue_tags --- moto/sqs/responses.py | 6 +++++- tests/test_sqs/test_sqs.py | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index f84d293d3..75f121b76 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -474,7 +474,11 @@ class SQSResponse(BaseResponse): def list_queue_tags(self): queue_name = self._get_queue_name() - queue = self.sqs_backend.get_queue(queue_name) + try: + queue = self.sqs_backend.get_queue(queue_name) + except QueueDoesNotExist as e: + return self._error('AWS.SimpleQueueService.NonExistentQueue', + e.description) template = self.response_template(LIST_QUEUE_TAGS_RESPONSE) return template.render(tags=queue.tags) diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index f5d09ba4b..1ad2e1a80 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -1108,6 +1108,26 @@ def test_tags(): }) +@mock_sqs +def test_list_queue_tags_errors(): + client = boto3.client('sqs', region_name='us-east-1') + + response = client.create_queue( + QueueName='test-queue-with-tags', + tags={ + 'tag_key_1': 'tag_value_X' + } + ) + queue_url = response['QueueUrl'] + + client.list_queue_tags.when.called_with( + QueueUrl=queue_url + '-not-existing', + ).should.throw( + ClientError, + 'The specified queue does not exist for this wsdl version.' + ) + + @mock_sqs def test_tag_queue_errors(): client = boto3.client('sqs', region_name='us-east-1') From 05dc97b468cdfbb05686966444e2ed9c4dff4e3d Mon Sep 17 00:00:00 2001 From: gruebel Date: Thu, 17 Oct 2019 22:41:46 +0200 Subject: [PATCH 4/5] Update implementation coverage --- IMPLEMENTATION_COVERAGE.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 57f169b8a..060f9f7a7 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -5902,7 +5902,7 @@ - [x] untag_resource ## sqs -65% implemented +75% implemented - [X] add_permission - [X] change_message_visibility - [ ] change_message_visibility_batch @@ -5913,13 +5913,13 @@ - [ ] get_queue_attributes - [ ] get_queue_url - [X] list_dead_letter_source_queues -- [ ] list_queue_tags +- [x] list_queue_tags - [X] list_queues - [X] purge_queue - [ ] receive_message - [X] remove_permission - [X] send_message -- [ ] send_message_batch +- [x] send_message_batch - [X] set_queue_attributes - [X] tag_queue - [X] untag_queue From ed1c799bdcd2c6bd11d8a679014cbd21d0d36a12 Mon Sep 17 00:00:00 2001 From: gruebel Date: Fri, 18 Oct 2019 09:04:29 +0200 Subject: [PATCH 5/5] CR fix --- moto/sqs/exceptions.py | 9 ++++--- moto/sqs/models.py | 11 ++++++++- moto/sqs/responses.py | 53 +++++++----------------------------------- 3 files changed, 24 insertions(+), 49 deletions(-) diff --git a/moto/sqs/exceptions.py b/moto/sqs/exceptions.py index 5f1cc46b2..02f28b2d2 100644 --- a/moto/sqs/exceptions.py +++ b/moto/sqs/exceptions.py @@ -19,9 +19,12 @@ class MessageAttributesInvalid(Exception): self.description = description -class QueueDoesNotExist(Exception): - status_code = 404 - description = "The specified queue does not exist for this wsdl version." +class QueueDoesNotExist(RESTError): + code = 404 + + def __init__(self): + super(QueueDoesNotExist, self).__init__( + "QueueDoesNotExist", "The specified queue does not exist for this wsdl version.") class QueueAlreadyExists(RESTError): diff --git a/moto/sqs/models.py b/moto/sqs/models.py index 922da704f..eb237e437 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -654,12 +654,21 @@ class SQSBackend(BaseBackend): def tag_queue(self, queue_name, tags): queue = self.get_queue(queue_name) + + if not len(tags): + raise RESTError('MissingParameter', + 'The request must contain the parameter Tags.') + + if len(tags) > 50: + raise RESTError('InvalidParameterValue', + 'Too many tags added for queue {}.'.format(queue_name)) + queue.tags.update(tags) def untag_queue(self, queue_name, tag_keys): queue = self.get_queue(queue_name) - if len(tag_keys) == 0: + if not len(tag_keys): raise RESTError('InvalidParameterValue', 'Tag keys must be between 1 and 128 characters in length.') for key in tag_keys: diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index 75f121b76..b6f717f3b 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -10,7 +10,6 @@ from .models import sqs_backends from .exceptions import ( MessageAttributesInvalid, MessageNotInflight, - QueueDoesNotExist, ReceiptHandleIsInvalid, ) @@ -90,11 +89,7 @@ class SQSResponse(BaseResponse): request_url = urlparse(self.uri) queue_name = self._get_param("QueueName") - try: - queue = self.sqs_backend.get_queue(queue_name) - except QueueDoesNotExist as e: - return self._error('AWS.SimpleQueueService.NonExistentQueue', - e.description) + queue = self.sqs_backend.get_queue(queue_name) if queue: template = self.response_template(GET_QUEUE_URL_RESPONSE) @@ -175,11 +170,8 @@ class SQSResponse(BaseResponse): def get_queue_attributes(self): queue_name = self._get_queue_name() - try: - queue = self.sqs_backend.get_queue(queue_name) - except QueueDoesNotExist as e: - return self._error('AWS.SimpleQueueService.NonExistentQueue', - e.description) + + queue = self.sqs_backend.get_queue(queue_name) template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE) return template.render(queue=queue) @@ -242,11 +234,7 @@ class SQSResponse(BaseResponse): queue_name = self._get_queue_name() - try: - self.sqs_backend.get_queue(queue_name) - except QueueDoesNotExist as e: - return self._error('AWS.SimpleQueueService.NonExistentQueue', - e.description) + self.sqs_backend.get_queue(queue_name) if self.querystring.get('Entries'): return self._error('AWS.SimpleQueueService.EmptyBatchRequest', @@ -268,7 +256,7 @@ class SQSResponse(BaseResponse): 'hyphens and underscores. It can be at most 80 letters long.') body_length = next( - (len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > 262144), + (len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > MAXIMUM_MESSAGE_LENGTH), False ) if body_length: @@ -363,10 +351,7 @@ class SQSResponse(BaseResponse): def receive_message(self): queue_name = self._get_queue_name() - try: - queue = self.sqs_backend.get_queue(queue_name) - except QueueDoesNotExist as e: - return self._error('QueueDoesNotExist', e.description) + queue = self.sqs_backend.get_queue(queue_name) try: message_count = int(self.querystring.get("MaxNumberOfMessages")[0]) @@ -439,20 +424,6 @@ class SQSResponse(BaseResponse): queue_name = self._get_queue_name() tags = self._get_map_prefix('Tag', key_end='.Key', value_end='.Value') - try: - self.sqs_backend.get_queue(queue_name) - except QueueDoesNotExist as e: - return self._error('AWS.SimpleQueueService.NonExistentQueue', - e.description) - - if len(tags) == 0: - return self._error('MissingParameter', - 'The request must contain the parameter Tags.') - - if len(tags) > 50: - return self._error('InvalidParameterValue', - 'Too many tags added for queue {}.'.format(queue_name)) - self.sqs_backend.tag_queue(queue_name, tags) template = self.response_template(TAG_QUEUE_RESPONSE) @@ -462,11 +433,7 @@ class SQSResponse(BaseResponse): queue_name = self._get_queue_name() tag_keys = self._get_multi_param('TagKey') - try: - self.sqs_backend.untag_queue(queue_name, tag_keys) - except QueueDoesNotExist as e: - return self._error('AWS.SimpleQueueService.NonExistentQueue', - e.description) + self.sqs_backend.untag_queue(queue_name, tag_keys) template = self.response_template(UNTAG_QUEUE_RESPONSE) return template.render() @@ -474,11 +441,7 @@ class SQSResponse(BaseResponse): def list_queue_tags(self): queue_name = self._get_queue_name() - try: - queue = self.sqs_backend.get_queue(queue_name) - except QueueDoesNotExist as e: - return self._error('AWS.SimpleQueueService.NonExistentQueue', - e.description) + queue = self.sqs_backend.get_queue(queue_name) template = self.response_template(LIST_QUEUE_TAGS_RESPONSE) return template.render(tags=queue.tags)