diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 1e7c84424..cf9f40f80 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -6083,7 +6083,7 @@ - [X] untag_resource ## sqs -65% implemented +85% implemented - [X] add_permission - [X] change_message_visibility - [ ] change_message_visibility_batch @@ -6091,16 +6091,16 @@ - [X] delete_message - [ ] delete_message_batch - [X] delete_queue -- [ ] get_queue_attributes -- [ ] get_queue_url +- [X] get_queue_attributes +- [X] get_queue_url - [X] list_dead_letter_source_queues -- [ ] list_queue_tags +- [X] list_queue_tags - [X] list_queues - [X] purge_queue - [ ] receive_message - [X] remove_permission - [X] send_message -- [ ] send_message_batch +- [X] send_message_batch - [X] set_queue_attributes - [X] tag_queue - [X] untag_queue diff --git a/moto/core/responses.py b/moto/core/responses.py index b60f10a20..213fa278c 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -454,7 +454,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin): index = 1 while True: value_dict = self._get_multi_param_helper(prefix + str(index)) - if not value_dict: + if not value_dict and value_dict != '': break values.append(value_dict) diff --git a/moto/sqs/exceptions.py b/moto/sqs/exceptions.py index 02f28b2d2..68c4abaae 100644 --- a/moto/sqs/exceptions.py +++ b/moto/sqs/exceptions.py @@ -7,9 +7,14 @@ class MessageNotInflight(Exception): status_code = 400 -class ReceiptHandleIsInvalid(Exception): - description = "The receipt handle provided is not valid." - status_code = 400 +class ReceiptHandleIsInvalid(RESTError): + code = 400 + + def __init__(self): + super(ReceiptHandleIsInvalid, self).__init__( + 'ReceiptHandleIsInvalid', + 'The input receipt handle is invalid.' + ) class MessageAttributesInvalid(Exception): @@ -33,3 +38,66 @@ class QueueAlreadyExists(RESTError): def __init__(self, message): super(QueueAlreadyExists, self).__init__( "QueueAlreadyExists", message) + + +class EmptyBatchRequest(RESTError): + code = 400 + + def __init__(self): + super(EmptyBatchRequest, self).__init__( + 'EmptyBatchRequest', + 'There should be at least one SendMessageBatchRequestEntry in the request.' + ) + + +class InvalidBatchEntryId(RESTError): + code = 400 + + def __init__(self): + super(InvalidBatchEntryId, self).__init__( + 'InvalidBatchEntryId', + 'A batch entry id can only contain alphanumeric characters, ' + 'hyphens and underscores. It can be at most 80 letters long.' + ) + + +class BatchRequestTooLong(RESTError): + code = 400 + + def __init__(self, length): + super(BatchRequestTooLong, self).__init__( + 'BatchRequestTooLong', + 'Batch requests cannot be longer than 262144 bytes. ' + 'You have sent {} bytes.'.format(length) + ) + + +class BatchEntryIdsNotDistinct(RESTError): + code = 400 + + def __init__(self, entry_id): + super(BatchEntryIdsNotDistinct, self).__init__( + 'BatchEntryIdsNotDistinct', + 'Id {} repeated.'.format(entry_id) + ) + + +class TooManyEntriesInBatchRequest(RESTError): + code = 400 + + def __init__(self, number): + super(TooManyEntriesInBatchRequest, self).__init__( + 'TooManyEntriesInBatchRequest', + 'Maximum number of entries per request are 10. ' + 'You have sent {}.'.format(number) + ) + + +class InvalidAttributeName(RESTError): + code = 400 + + def __init__(self, attribute_name): + super(InvalidAttributeName, self).__init__( + 'InvalidAttributeName', + 'Unknown Attribute {}.'.format(attribute_name) + ) diff --git a/moto/sqs/models.py b/moto/sqs/models.py index eb237e437..8d02fe529 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -20,11 +20,18 @@ from .exceptions import ( QueueDoesNotExist, QueueAlreadyExists, ReceiptHandleIsInvalid, + InvalidBatchEntryId, + BatchRequestTooLong, + BatchEntryIdsNotDistinct, + TooManyEntriesInBatchRequest, + InvalidAttributeName ) DEFAULT_ACCOUNT_ID = 123456789012 DEFAULT_SENDER_ID = "AIDAIT2UOQQY3AUEKVGXU" +MAXIMUM_MESSAGE_LENGTH = 262144 # 256 KiB + TRANSPORT_TYPE_ENCODINGS = {'String': b'\x01', 'Binary': b'\x02', 'Number': b'\x01'} @@ -155,7 +162,7 @@ class Message(BaseModel): class Queue(BaseModel): - base_attributes = ['ApproximateNumberOfMessages', + BASE_ATTRIBUTES = ['ApproximateNumberOfMessages', 'ApproximateNumberOfMessagesDelayed', 'ApproximateNumberOfMessagesNotVisible', 'CreatedTimestamp', @@ -166,9 +173,9 @@ class Queue(BaseModel): 'QueueArn', 'ReceiveMessageWaitTimeSeconds', 'VisibilityTimeout'] - fifo_attributes = ['FifoQueue', + FIFO_ATTRIBUTES = ['FifoQueue', 'ContentBasedDeduplication'] - kms_attributes = ['KmsDataKeyReusePeriodSeconds', + KMS_ATTRIBUTES = ['KmsDataKeyReusePeriodSeconds', 'KmsMasterKeyId'] ALLOWED_PERMISSIONS = ('*', 'ChangeMessageVisibility', 'DeleteMessage', 'GetQueueAttributes', 'GetQueueUrl', @@ -185,8 +192,9 @@ class Queue(BaseModel): now = unix_time() self.created_timestamp = now - self.queue_arn = 'arn:aws:sqs:{0}:123456789012:{1}'.format(self.region, - self.name) + self.queue_arn = 'arn:aws:sqs:{0}:{1}:{2}'.format(self.region, + DEFAULT_ACCOUNT_ID, + self.name) self.dead_letter_queue = None self.lambda_event_source_mappings = {} @@ -330,17 +338,17 @@ class Queue(BaseModel): def attributes(self): result = {} - for attribute in self.base_attributes: + for attribute in self.BASE_ATTRIBUTES: attr = getattr(self, camelcase_to_underscores(attribute)) result[attribute] = attr if self.fifo_queue: - for attribute in self.fifo_attributes: + for attribute in self.FIFO_ATTRIBUTES: attr = getattr(self, camelcase_to_underscores(attribute)) result[attribute] = attr if self.kms_master_key_id: - for attribute in self.kms_attributes: + for attribute in self.KMS_ATTRIBUTES: attr = getattr(self, camelcase_to_underscores(attribute)) result[attribute] = attr @@ -460,6 +468,9 @@ class SQSBackend(BaseBackend): return queue + def get_queue_url(self, queue_name): + return self.get_queue(queue_name) + def list_queues(self, queue_name_prefix): re_str = '.*' if queue_name_prefix: @@ -482,6 +493,28 @@ class SQSBackend(BaseBackend): return self.queues.pop(queue_name) return False + def get_queue_attributes(self, queue_name, attribute_names): + queue = self.get_queue(queue_name) + + if not len(attribute_names): + attribute_names.append('All') + + valid_names = ['All'] + queue.BASE_ATTRIBUTES + queue.FIFO_ATTRIBUTES + queue.KMS_ATTRIBUTES + invalid_name = next((name for name in attribute_names if name not in valid_names), None) + + if invalid_name or invalid_name == '': + raise InvalidAttributeName(invalid_name) + + attributes = {} + + if 'All' in attribute_names: + attributes = queue.attributes + else: + for name in (name for name in attribute_names if name in queue.attributes): + attributes[name] = queue.attributes.get(name) + + return attributes + def set_queue_attributes(self, queue_name, attributes): queue = self.get_queue(queue_name) queue._set_attributes(attributes) @@ -516,6 +549,49 @@ class SQSBackend(BaseBackend): return message + def send_message_batch(self, queue_name, entries): + self.get_queue(queue_name) + + if any(not re.match(r'^[\w-]{1,80}$', entry['Id']) for entry in entries.values()): + raise InvalidBatchEntryId() + + body_length = next( + (len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > MAXIMUM_MESSAGE_LENGTH), + False + ) + if body_length: + raise BatchRequestTooLong(body_length) + + duplicate_id = self._get_first_duplicate_id([entry['Id'] for entry in entries.values()]) + if duplicate_id: + raise BatchEntryIdsNotDistinct(duplicate_id) + + if len(entries) > 10: + raise TooManyEntriesInBatchRequest(len(entries)) + + messages = [] + for index, entry in entries.items(): + # Loop through looking for messages + message = self.send_message( + queue_name, + entry['MessageBody'], + message_attributes=entry['MessageAttributes'], + delay_seconds=entry['DelaySeconds'] + ) + message.user_id = entry['Id'] + + messages.append(message) + + return messages + + def _get_first_duplicate_id(self, ids): + unique_ids = set() + for id in ids: + if id in unique_ids: + return id + unique_ids.add(id) + return None + def receive_messages(self, queue_name, count, wait_seconds_timeout, visibility_timeout): """ Attempt to retrieve visible messages from a queue. @@ -593,6 +669,10 @@ class SQSBackend(BaseBackend): def delete_message(self, queue_name, receipt_handle): queue = self.get_queue(queue_name) + + if not any(message.receipt_handle == receipt_handle for message in queue._messages): + raise ReceiptHandleIsInvalid() + new_messages = [] for message in queue._messages: # Only delete message if it is not visible and the reciept_handle @@ -677,6 +757,9 @@ class SQSBackend(BaseBackend): except KeyError: pass + def list_queue_tags(self, queue_name): + return self.get_queue(queue_name) + sqs_backends = {} for region in boto.sqs.regions(): diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index b6f717f3b..ad46df723 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -11,6 +11,8 @@ from .exceptions import ( MessageAttributesInvalid, MessageNotInflight, ReceiptHandleIsInvalid, + EmptyBatchRequest, + InvalidAttributeName ) MAXIMUM_VISIBILTY_TIMEOUT = 43200 @@ -89,13 +91,10 @@ class SQSResponse(BaseResponse): request_url = urlparse(self.uri) queue_name = self._get_param("QueueName") - queue = self.sqs_backend.get_queue(queue_name) + queue = self.sqs_backend.get_queue_url(queue_name) - if queue: - template = self.response_template(GET_QUEUE_URL_RESPONSE) - return template.render(queue=queue, request_url=request_url) - else: - return "", dict(status=404) + template = self.response_template(GET_QUEUE_URL_RESPONSE) + return template.render(queue_url=queue.url(request_url)) def list_queues(self): request_url = urlparse(self.uri) @@ -119,7 +118,7 @@ class SQSResponse(BaseResponse): receipt_handle=receipt_handle, visibility_timeout=visibility_timeout ) - except (ReceiptHandleIsInvalid, MessageNotInflight) as e: + except MessageNotInflight as e: return "Invalid request: {0}".format(e.description), dict(status=e.status_code) template = self.response_template(CHANGE_MESSAGE_VISIBILITY_RESPONSE) @@ -171,10 +170,15 @@ class SQSResponse(BaseResponse): def get_queue_attributes(self): queue_name = self._get_queue_name() - queue = self.sqs_backend.get_queue(queue_name) + if self.querystring.get('AttributeNames'): + raise InvalidAttributeName('') + + attribute_names = self._get_multi_param('AttributeName') + + attributes = self.sqs_backend.get_queue_attributes(queue_name, attribute_names) template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE) - return template.render(queue=queue) + return template.render(attributes=attributes) def set_queue_attributes(self): # TODO validate self.get_param('QueueUrl') @@ -237,72 +241,31 @@ class SQSResponse(BaseResponse): self.sqs_backend.get_queue(queue_name) if self.querystring.get('Entries'): - return self._error('AWS.SimpleQueueService.EmptyBatchRequest', - 'There should be at least one SendMessageBatchRequestEntry in the request.') + raise EmptyBatchRequest() entries = {} for key, value in self.querystring.items(): match = re.match(r'^SendMessageBatchRequestEntry\.(\d+)\.Id', key) if match: - entries[match.group(1)] = { + index = match.group(1) + + message_attributes = parse_message_attributes( + self.querystring, base='SendMessageBatchRequestEntry.{}.'.format(index)) + + entries[index] = { 'Id': value[0], 'MessageBody': self.querystring.get( - 'SendMessageBatchRequestEntry.{}.MessageBody'.format(match.group(1)))[0] + 'SendMessageBatchRequestEntry.{}.MessageBody'.format(index))[0], + 'DelaySeconds': self.querystring.get( + 'SendMessageBatchRequestEntry.{}.DelaySeconds'.format(index), [None])[0], + 'MessageAttributes': message_attributes } - if any(not re.match(r'^[\w-]{1,80}$', entry['Id']) for entry in entries.values()): - return self._error('AWS.SimpleQueueService.InvalidBatchEntryId', - 'A batch entry id can only contain alphanumeric characters, ' - 'hyphens and underscores. It can be at most 80 letters long.') - - body_length = next( - (len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > MAXIMUM_MESSAGE_LENGTH), - False - ) - if body_length: - return self._error('AWS.SimpleQueueService.BatchRequestTooLong', - 'Batch requests cannot be longer than 262144 bytes. ' - 'You have sent {} bytes.'.format(body_length)) - - duplicate_id = self._get_first_duplicate_id([entry['Id'] for entry in entries.values()]) - if duplicate_id: - return self._error('AWS.SimpleQueueService.BatchEntryIdsNotDistinct', - 'Id {} repeated.'.format(duplicate_id)) - - if len(entries) > 10: - return self._error('AWS.SimpleQueueService.TooManyEntriesInBatchRequest', - 'Maximum number of entries per request are 10. ' - 'You have sent 11.') - - messages = [] - for index, entry in entries.items(): - # Loop through looking for messages - delay_key = 'SendMessageBatchRequestEntry.{0}.DelaySeconds'.format( - index) - delay_seconds = self.querystring.get(delay_key, [None])[0] - message = self.sqs_backend.send_message( - queue_name, entry['MessageBody'], delay_seconds=delay_seconds) - message.user_id = entry['Id'] - - message_attributes = parse_message_attributes( - self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index)) - if type(message_attributes) == tuple: - return message_attributes[0], message_attributes[1] - message.message_attributes = message_attributes - - messages.append(message) + messages = self.sqs_backend.send_message_batch(queue_name, entries) template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE) return template.render(messages=messages) - def _get_first_duplicate_id(self, ids): - unique_ids = set() - for id in ids: - if id in unique_ids: - return id - unique_ids.add(id) - return None - def delete_message(self): queue_name = self._get_queue_name() receipt_handle = self.querystring.get("ReceiptHandle")[0] @@ -441,7 +404,7 @@ class SQSResponse(BaseResponse): def list_queue_tags(self): queue_name = self._get_queue_name() - queue = self.sqs_backend.get_queue(queue_name) + queue = self.sqs_backend.list_queue_tags(queue_name) template = self.response_template(LIST_QUEUE_TAGS_RESPONSE) return template.render(tags=queue.tags) @@ -458,7 +421,7 @@ CREATE_QUEUE_RESPONSE = """ GET_QUEUE_URL_RESPONSE = """ - {{ queue.url(request_url) }} + {{ queue_url }} @@ -484,7 +447,7 @@ DELETE_QUEUE_RESPONSE = """ GET_QUEUE_ATTRIBUTES_RESPONSE = """ - {% for key, value in queue.attributes.items() %} + {% for key, value in attributes.items() %} {{ key }} {{ value }} diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 1ad2e1a80..a2111c9d7 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -5,6 +5,7 @@ import os import boto import boto3 import botocore.exceptions +import six from botocore.exceptions import ClientError from boto.exception import SQSError from boto.sqs.message import RawMessage, Message @@ -144,18 +145,40 @@ def test_create_queue_kms(): def test_create_queue_with_tags(): client = boto3.client('sqs', region_name='us-east-1') response = client.create_queue( - QueueName = 'test-queue-with-tags', - tags = { + QueueName='test-queue-with-tags', + tags={ 'tag_key_1': 'tag_value_1' } ) queue_url = response['QueueUrl'] - client.list_queue_tags(QueueUrl = queue_url)['Tags'].should.equal({ + client.list_queue_tags(QueueUrl=queue_url)['Tags'].should.equal({ 'tag_key_1': 'tag_value_1' }) +@mock_sqs +def test_get_queue_url(): + client = boto3.client('sqs', region_name='us-east-1') + client.create_queue(QueueName='test-queue') + + response = client.get_queue_url(QueueName='test-queue') + + response.should.have.key('QueueUrl').which.should.contain('test-queue') + + +@mock_sqs +def test_get_queue_url_errors(): + client = boto3.client('sqs', region_name='us-east-1') + + client.get_queue_url.when.called_with( + QueueName='non-existing-queue' + ).should.throw( + ClientError, + 'The specified queue does not exist for this wsdl version.' + ) + + @mock_sqs def test_get_nonexistent_queue(): sqs = boto3.resource('sqs', region_name='us-east-1') @@ -341,6 +364,98 @@ def test_delete_queue(): queue.delete() +@mock_sqs +def test_get_queue_attributes(): + client = boto3.client('sqs', region_name='us-east-1') + response = client.create_queue(QueueName='test-queue') + queue_url = response['QueueUrl'] + + response = client.get_queue_attributes(QueueUrl=queue_url) + + response['Attributes']['ApproximateNumberOfMessages'].should.equal('0') + response['Attributes']['ApproximateNumberOfMessagesDelayed'].should.equal('0') + response['Attributes']['ApproximateNumberOfMessagesNotVisible'].should.equal('0') + response['Attributes']['CreatedTimestamp'].should.be.a(six.string_types) + response['Attributes']['DelaySeconds'].should.equal('0') + response['Attributes']['LastModifiedTimestamp'].should.be.a(six.string_types) + response['Attributes']['MaximumMessageSize'].should.equal('65536') + response['Attributes']['MessageRetentionPeriod'].should.equal('345600') + response['Attributes']['QueueArn'].should.equal('arn:aws:sqs:us-east-1:123456789012:test-queue') + response['Attributes']['ReceiveMessageWaitTimeSeconds'].should.equal('0') + response['Attributes']['VisibilityTimeout'].should.equal('30') + + response = client.get_queue_attributes( + QueueUrl=queue_url, + AttributeNames=[ + 'ApproximateNumberOfMessages', + 'MaximumMessageSize', + 'QueueArn', + 'VisibilityTimeout' + ] + ) + + response['Attributes'].should.equal({ + 'ApproximateNumberOfMessages': '0', + 'MaximumMessageSize': '65536', + 'QueueArn': 'arn:aws:sqs:us-east-1:123456789012:test-queue', + 'VisibilityTimeout': '30' + }) + + # should not return any attributes, if it was not set before + response = client.get_queue_attributes( + QueueUrl=queue_url, + AttributeNames=[ + 'KmsMasterKeyId' + ] + ) + + response.should_not.have.key('Attributes') + + +@mock_sqs +def test_get_queue_attributes_errors(): + client = boto3.client('sqs', region_name='us-east-1') + response = client.create_queue(QueueName='test-queue') + queue_url = response['QueueUrl'] + + client.get_queue_attributes.when.called_with( + QueueUrl=queue_url + '-non-existing' + ).should.throw( + ClientError, + 'The specified queue does not exist for this wsdl version.' + ) + + client.get_queue_attributes.when.called_with( + QueueUrl=queue_url, + AttributeNames=[ + 'QueueArn', + 'not-existing', + 'VisibilityTimeout' + ] + ).should.throw( + ClientError, + 'Unknown Attribute not-existing.' + ) + + client.get_queue_attributes.when.called_with( + QueueUrl=queue_url, + AttributeNames=[ + '' + ] + ).should.throw( + ClientError, + 'Unknown Attribute .' + ) + + client.get_queue_attributes.when.called_with( + QueueUrl = queue_url, + AttributeNames = [] + ).should.throw( + ClientError, + 'Unknown Attribute .' + ) + + @mock_sqs def test_set_queue_attribute(): sqs = boto3.resource('sqs', region_name='us-east-1') @@ -883,10 +998,100 @@ def test_delete_message_after_visibility_timeout(): @mock_sqs -def test_send_message_batch_errors(): - client = boto3.client('sqs', region_name = 'us-east-1') +def test_delete_message_errors(): + client = boto3.client('sqs', region_name='us-east-1') + response = client.create_queue(QueueName='test-queue') + queue_url = response['QueueUrl'] + client.send_message( + QueueUrl=queue_url, + MessageBody='body' + ) + response = client.receive_message( + QueueUrl=queue_url + ) + receipt_handle = response['Messages'][0]['ReceiptHandle'] - response = client.create_queue(QueueName='test-queue-with-tags') + client.delete_message.when.called_with( + QueueUrl=queue_url + '-not-existing', + ReceiptHandle=receipt_handle + ).should.throw( + ClientError, + 'The specified queue does not exist for this wsdl version.' + ) + + client.delete_message.when.called_with( + QueueUrl=queue_url, + ReceiptHandle='not-existing' + ).should.throw( + ClientError, + 'The input receipt handle is invalid.' + ) + +@mock_sqs +def test_send_message_batch(): + client = boto3.client('sqs', region_name='us-east-1') + response = client.create_queue(QueueName='test-queue') + queue_url = response['QueueUrl'] + + response = client.send_message_batch( + QueueUrl=queue_url, + Entries=[ + { + 'Id': 'id_1', + 'MessageBody': 'body_1', + 'DelaySeconds': 0, + 'MessageAttributes': { + 'attribute_name_1': { + 'StringValue': 'attribute_value_1', + 'DataType': 'String' + } + } + }, + { + 'Id': 'id_2', + 'MessageBody': 'body_2', + 'DelaySeconds': 0, + 'MessageAttributes': { + 'attribute_name_2': { + 'StringValue': '123', + 'DataType': 'Number' + } + } + } + ] + ) + + sorted([entry['Id'] for entry in response['Successful']]).should.equal([ + 'id_1', + 'id_2' + ]) + + response = client.receive_message( + QueueUrl=queue_url, + MaxNumberOfMessages=10 + ) + + response['Messages'][0]['Body'].should.equal('body_1') + response['Messages'][0]['MessageAttributes'].should.equal({ + 'attribute_name_1': { + 'StringValue': 'attribute_value_1', + 'DataType': 'String' + } + }) + response['Messages'][1]['Body'].should.equal('body_2') + response['Messages'][1]['MessageAttributes'].should.equal({ + 'attribute_name_2': { + 'StringValue': '123', + 'DataType': 'Number' + } + }) + + +@mock_sqs +def test_send_message_batch_errors(): + client = boto3.client('sqs', region_name='us-east-1') + + response = client.create_queue(QueueName='test-queue') queue_url = response['QueueUrl'] client.send_message_batch.when.called_with(