from __future__ import unicode_literals import re from moto.core.responses import BaseResponse from moto.core.utils import amz_crc32, amzn_request_id from six.moves.urllib.parse import urlparse from .exceptions import ( EmptyBatchRequest, InvalidAttributeName, MessageAttributesInvalid, MessageNotInflight, ReceiptHandleIsInvalid, ) from .models import sqs_backends from .utils import parse_message_attributes MAXIMUM_VISIBILTY_TIMEOUT = 43200 MAXIMUM_MESSAGE_LENGTH = 262144 # 256 KiB DEFAULT_RECEIVED_MESSAGES = 1 class SQSResponse(BaseResponse): region_regex = re.compile(r"://(.+?)\.queue\.amazonaws\.com") @property def sqs_backend(self): return sqs_backends[self.region] @property def attribute(self): if not hasattr(self, "_attribute"): self._attribute = self._get_map_prefix( "Attribute", key_end=".Name", value_end=".Value" ) return self._attribute @property def tags(self): if not hasattr(self, "_tags"): self._tags = self._get_map_prefix("Tag", key_end=".Key", value_end=".Value") return self._tags def _get_queue_name(self): try: queue_name = self.querystring.get("QueueUrl")[0].split("/")[-1] except TypeError: # Fallback to reading from the URL queue_name = self.path.split("/")[-1] return queue_name def _get_validated_visibility_timeout(self, timeout=None): """ :raises ValueError: If specified visibility timeout exceeds MAXIMUM_VISIBILTY_TIMEOUT :raises TypeError: If visibility timeout was not specified """ if timeout is not None: visibility_timeout = int(timeout) else: visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0]) if visibility_timeout > MAXIMUM_VISIBILTY_TIMEOUT: raise ValueError return visibility_timeout @amz_crc32 # crc last as request_id can edit XML @amzn_request_id def call_action(self): status_code, headers, body = super(SQSResponse, self).call_action() if status_code == 404: return 404, headers, ERROR_INEXISTENT_QUEUE return status_code, headers, body def _error(self, code, message, status=400): template = self.response_template(ERROR_TEMPLATE) return template.render(code=code, message=message), dict(status=status) def create_queue(self): request_url = urlparse(self.uri) queue_name = self._get_param("QueueName") try: queue = self.sqs_backend.create_queue( queue_name, self.tags, **self.attribute ) except MessageAttributesInvalid as e: return self._error("InvalidParameterValue", e.description) template = self.response_template(CREATE_QUEUE_RESPONSE) return template.render(queue_url=queue.url(request_url)) def get_queue_url(self): request_url = urlparse(self.uri) queue_name = self._get_param("QueueName") queue = self.sqs_backend.get_queue_url(queue_name) template = self.response_template(GET_QUEUE_URL_RESPONSE) return template.render(queue_url=queue.url(request_url)) def list_queues(self): request_url = urlparse(self.uri) queue_name_prefix = self._get_param("QueueNamePrefix") queues = self.sqs_backend.list_queues(queue_name_prefix) template = self.response_template(LIST_QUEUES_RESPONSE) return template.render(queues=queues, request_url=request_url) def change_message_visibility(self): queue_name = self._get_queue_name() receipt_handle = self._get_param("ReceiptHandle") try: visibility_timeout = self._get_validated_visibility_timeout() except ValueError: return ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE, dict(status=400) try: self.sqs_backend.change_message_visibility( queue_name=queue_name, receipt_handle=receipt_handle, visibility_timeout=visibility_timeout, ) except MessageNotInflight as e: return ( "Invalid request: {0}".format(e.description), dict(status=e.status_code), ) template = self.response_template(CHANGE_MESSAGE_VISIBILITY_RESPONSE) return template.render() def change_message_visibility_batch(self): queue_name = self._get_queue_name() entries = self._get_list_prefix("ChangeMessageVisibilityBatchRequestEntry") success = [] error = [] for entry in entries: try: visibility_timeout = self._get_validated_visibility_timeout( entry["visibility_timeout"] ) except ValueError: error.append( { "Id": entry["id"], "SenderFault": "true", "Code": "InvalidParameterValue", "Message": "Visibility timeout invalid", } ) continue try: self.sqs_backend.change_message_visibility( queue_name=queue_name, receipt_handle=entry["receipt_handle"], visibility_timeout=visibility_timeout, ) success.append(entry["id"]) except ReceiptHandleIsInvalid as e: error.append( { "Id": entry["id"], "SenderFault": "true", "Code": "ReceiptHandleIsInvalid", "Message": e.description, } ) except MessageNotInflight as e: error.append( { "Id": entry["id"], "SenderFault": "false", "Code": "AWS.SimpleQueueService.MessageNotInflight", "Message": e.description, } ) template = self.response_template(CHANGE_MESSAGE_VISIBILITY_BATCH_RESPONSE) return template.render(success=success, errors=error) def get_queue_attributes(self): queue_name = self._get_queue_name() if self.querystring.get("AttributeNames"): raise InvalidAttributeName("") attribute_names = self._get_multi_param("AttributeName") attributes = self.sqs_backend.get_queue_attributes(queue_name, attribute_names) template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE) return template.render(attributes=attributes) def set_queue_attributes(self): # TODO validate self.get_param('QueueUrl') queue_name = self._get_queue_name() self.sqs_backend.set_queue_attributes(queue_name, self.attribute) return SET_QUEUE_ATTRIBUTE_RESPONSE def delete_queue(self): # TODO validate self.get_param('QueueUrl') queue_name = self._get_queue_name() queue = self.sqs_backend.delete_queue(queue_name) if not queue: return ( "A queue with name {0} does not exist".format(queue_name), dict(status=404), ) template = self.response_template(DELETE_QUEUE_RESPONSE) return template.render(queue=queue) def send_message(self): message = self._get_param("MessageBody") delay_seconds = int(self._get_param("DelaySeconds", 0)) message_group_id = self._get_param("MessageGroupId") message_dedupe_id = self._get_param("MessageDeduplicationId") if len(message) > MAXIMUM_MESSAGE_LENGTH: return ERROR_TOO_LONG_RESPONSE, dict(status=400) try: message_attributes = parse_message_attributes(self.querystring) except MessageAttributesInvalid as e: return e.description, dict(status=e.status_code) queue_name = self._get_queue_name() message = self.sqs_backend.send_message( queue_name, message, message_attributes=message_attributes, delay_seconds=delay_seconds, deduplication_id=message_dedupe_id, group_id=message_group_id, ) template = self.response_template(SEND_MESSAGE_RESPONSE) return template.render(message=message, message_attributes=message_attributes) def send_message_batch(self): """ The querystring comes like this 'SendMessageBatchRequestEntry.1.DelaySeconds': ['0'], 'SendMessageBatchRequestEntry.1.MessageBody': ['test message 1'], 'SendMessageBatchRequestEntry.1.Id': ['6d0f122d-4b13-da2c-378f-e74244d8ad11'] 'SendMessageBatchRequestEntry.2.Id': ['ff8cbf59-70a2-c1cb-44c7-b7469f1ba390'], 'SendMessageBatchRequestEntry.2.MessageBody': ['test message 2'], 'SendMessageBatchRequestEntry.2.DelaySeconds': ['0'], """ queue_name = self._get_queue_name() self.sqs_backend.get_queue(queue_name) if self.querystring.get("Entries"): raise EmptyBatchRequest() entries = {} for key, value in self.querystring.items(): match = re.match(r"^SendMessageBatchRequestEntry\.(\d+)\.Id", key) if match: index = match.group(1) message_attributes = parse_message_attributes( self.querystring, base="SendMessageBatchRequestEntry.{}.".format(index), ) entries[index] = { "Id": value[0], "MessageBody": self.querystring.get( "SendMessageBatchRequestEntry.{}.MessageBody".format(index) )[0], "DelaySeconds": self.querystring.get( "SendMessageBatchRequestEntry.{}.DelaySeconds".format(index), [None], )[0], "MessageAttributes": message_attributes, } messages = self.sqs_backend.send_message_batch(queue_name, entries) template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE) return template.render(messages=messages) def delete_message(self): queue_name = self._get_queue_name() receipt_handle = self.querystring.get("ReceiptHandle")[0] self.sqs_backend.delete_message(queue_name, receipt_handle) template = self.response_template(DELETE_MESSAGE_RESPONSE) return template.render() def delete_message_batch(self): """ The querystring comes like this 'DeleteMessageBatchRequestEntry.1.Id': ['message_1'], 'DeleteMessageBatchRequestEntry.1.ReceiptHandle': ['asdfsfs...'], 'DeleteMessageBatchRequestEntry.2.Id': ['message_2'], 'DeleteMessageBatchRequestEntry.2.ReceiptHandle': ['zxcvfda...'], ... """ queue_name = self._get_queue_name() message_ids = [] for index in range(1, 11): # Loop through looking for messages receipt_key = "DeleteMessageBatchRequestEntry.{0}.ReceiptHandle".format( index ) receipt_handle = self.querystring.get(receipt_key) if not receipt_handle: # Found all messages break self.sqs_backend.delete_message(queue_name, receipt_handle[0]) message_user_id_key = "DeleteMessageBatchRequestEntry.{0}.Id".format(index) message_user_id = self.querystring.get(message_user_id_key)[0] message_ids.append(message_user_id) template = self.response_template(DELETE_MESSAGE_BATCH_RESPONSE) return template.render(message_ids=message_ids) def purge_queue(self): queue_name = self._get_queue_name() self.sqs_backend.purge_queue(queue_name) template = self.response_template(PURGE_QUEUE_RESPONSE) return template.render() def receive_message(self): queue_name = self._get_queue_name() queue = self.sqs_backend.get_queue(queue_name) try: message_count = int(self.querystring.get("MaxNumberOfMessages")[0]) except TypeError: message_count = DEFAULT_RECEIVED_MESSAGES if message_count < 1 or message_count > 10: return self._error( "InvalidParameterValue", "An error occurred (InvalidParameterValue) when calling " "the ReceiveMessage operation: Value %s for parameter " "MaxNumberOfMessages is invalid. Reason: must be between " "1 and 10, if provided." % message_count, ) try: wait_time = int(self.querystring.get("WaitTimeSeconds")[0]) except TypeError: wait_time = int(queue.receive_message_wait_time_seconds) if wait_time < 0 or wait_time > 20: return self._error( "InvalidParameterValue", "An error occurred (InvalidParameterValue) when calling " "the ReceiveMessage operation: Value %s for parameter " "WaitTimeSeconds is invalid. Reason: must be <= 0 and " ">= 20 if provided." % wait_time, ) try: visibility_timeout = self._get_validated_visibility_timeout() except TypeError: visibility_timeout = queue.visibility_timeout except ValueError: return ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE, dict(status=400) messages = self.sqs_backend.receive_messages( queue_name, message_count, wait_time, visibility_timeout ) template = self.response_template(RECEIVE_MESSAGE_RESPONSE) return template.render(messages=messages) def list_dead_letter_source_queues(self): request_url = urlparse(self.uri) queue_name = self._get_queue_name() source_queue_urls = self.sqs_backend.list_dead_letter_source_queues(queue_name) template = self.response_template(LIST_DEAD_LETTER_SOURCE_QUEUES_RESPONSE) return template.render(queues=source_queue_urls, request_url=request_url) def add_permission(self): queue_name = self._get_queue_name() actions = self._get_multi_param("ActionName") account_ids = self._get_multi_param("AWSAccountId") label = self._get_param("Label") self.sqs_backend.add_permission(queue_name, actions, account_ids, label) template = self.response_template(ADD_PERMISSION_RESPONSE) return template.render() def remove_permission(self): queue_name = self._get_queue_name() label = self._get_param("Label") self.sqs_backend.remove_permission(queue_name, label) template = self.response_template(REMOVE_PERMISSION_RESPONSE) return template.render() def tag_queue(self): queue_name = self._get_queue_name() tags = self._get_map_prefix("Tag", key_end=".Key", value_end=".Value") self.sqs_backend.tag_queue(queue_name, tags) template = self.response_template(TAG_QUEUE_RESPONSE) return template.render() def untag_queue(self): queue_name = self._get_queue_name() tag_keys = self._get_multi_param("TagKey") self.sqs_backend.untag_queue(queue_name, tag_keys) template = self.response_template(UNTAG_QUEUE_RESPONSE) return template.render() def list_queue_tags(self): queue_name = self._get_queue_name() queue = self.sqs_backend.list_queue_tags(queue_name) template = self.response_template(LIST_QUEUE_TAGS_RESPONSE) return template.render(tags=queue.tags) CREATE_QUEUE_RESPONSE = """ {{ queue_url }} """ GET_QUEUE_URL_RESPONSE = """ {{ queue_url }} """ LIST_QUEUES_RESPONSE = """ {% for queue in queues %} {{ queue.url(request_url) }} {% endfor %} """ DELETE_QUEUE_RESPONSE = """ """ GET_QUEUE_ATTRIBUTES_RESPONSE = """ {% for key, value in attributes.items() %} {{ key }} {{ value }} {% endfor %} """ SET_QUEUE_ATTRIBUTE_RESPONSE = """ """ SEND_MESSAGE_RESPONSE = """ {{- message.body_md5 -}} {% if message.message_attributes.items()|count > 0 %} {{- message.attribute_md5 -}} {% endif %} {{- message.id -}} """ RECEIVE_MESSAGE_RESPONSE = """ {% for message in messages %} {{ message.id }} {{ message.receipt_handle }} {{ message.body_md5 }} {{ message.body }} SenderId {{ message.sender_id }} SentTimestamp {{ message.sent_timestamp }} ApproximateReceiveCount {{ message.approximate_receive_count }} ApproximateFirstReceiveTimestamp {{ message.approximate_first_receive_timestamp }} {% if message.deduplication_id is not none %} MessageDeduplicationId {{ message.deduplication_id }} {% endif %} {% if message.group_id is not none %} MessageGroupId {{ message.group_id }} {% endif %} {% if message.message_attributes.items()|count > 0 %} {{- message.attribute_md5 -}} {% endif %} {% for name, value in message.message_attributes.items() %} {{ name }} {{ value.data_type }} {% if 'Binary' in value.data_type %} {{ value.binary_value }} {% else %} {{ value.string_value }} {% endif %} {% endfor %} {% endfor %} """ SEND_MESSAGE_BATCH_RESPONSE = """ {% for message in messages %} {{ message.user_id }} {{ message.id }} {{ message.body_md5 }} {% if message.message_attributes.items()|count > 0 %} {{- message.attribute_md5 -}} {% endif %} {% endfor %} """ DELETE_MESSAGE_RESPONSE = """ """ DELETE_MESSAGE_BATCH_RESPONSE = """ {% for message_id in message_ids %} {{ message_id }} {% endfor %} """ CHANGE_MESSAGE_VISIBILITY_RESPONSE = """ """ CHANGE_MESSAGE_VISIBILITY_BATCH_RESPONSE = """ {% for success_id in success %} {{ success_id }} {% endfor %} {% for error_dict in errors %} {{ error_dict['Id'] }} {{ error_dict['Code'] }} {{ error_dict['Message'] }} {{ error_dict['SenderFault'] }} {% endfor %} {{ request_id }} """ PURGE_QUEUE_RESPONSE = """ """ LIST_DEAD_LETTER_SOURCE_QUEUES_RESPONSE = """ {% for queue in queues %} {{ queue.url(request_url) }} {% endfor %} 8ffb921f-b85e-53d9-abcf-d8d0057f38fc """ ADD_PERMISSION_RESPONSE = """ {{ request_id }} """ REMOVE_PERMISSION_RESPONSE = """ {{ request_id }} """ TAG_QUEUE_RESPONSE = """ {{ request_id }} """ UNTAG_QUEUE_RESPONSE = """ {{ request_id }} """ LIST_QUEUE_TAGS_RESPONSE = """ {% for key, value in tags.items() %} {{ key }} {{ value }} {% endfor %} {{ request_id }} """ ERROR_TOO_LONG_RESPONSE = """ Sender InvalidParameterValue One or more parameters are invalid. Reason: Message must be shorter than 262144 bytes. 6fde8d1e-52cd-4581-8cd9-c512f4c64223 """ ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE = "Invalid request, maximum visibility timeout is {0}".format( MAXIMUM_VISIBILTY_TIMEOUT ) ERROR_INEXISTENT_QUEUE = """ Sender AWS.SimpleQueueService.NonExistentQueue The specified queue does not exist for this wsdl version. b8bc806b-fa6b-53b5-8be8-cfa2f9836bc3 """ ERROR_TEMPLATE = """ Sender {{ code }} {{ message }} 6fde8d1e-52cd-4581-8cd9-c512f4c64223 """