commit
8f3116220c
@ -6083,7 +6083,7 @@
|
|||||||
- [X] untag_resource
|
- [X] untag_resource
|
||||||
|
|
||||||
## sqs
|
## sqs
|
||||||
65% implemented
|
85% implemented
|
||||||
- [X] add_permission
|
- [X] add_permission
|
||||||
- [X] change_message_visibility
|
- [X] change_message_visibility
|
||||||
- [ ] change_message_visibility_batch
|
- [ ] change_message_visibility_batch
|
||||||
@ -6091,16 +6091,16 @@
|
|||||||
- [X] delete_message
|
- [X] delete_message
|
||||||
- [ ] delete_message_batch
|
- [ ] delete_message_batch
|
||||||
- [X] delete_queue
|
- [X] delete_queue
|
||||||
- [ ] get_queue_attributes
|
- [X] get_queue_attributes
|
||||||
- [ ] get_queue_url
|
- [X] get_queue_url
|
||||||
- [X] list_dead_letter_source_queues
|
- [X] list_dead_letter_source_queues
|
||||||
- [ ] list_queue_tags
|
- [X] list_queue_tags
|
||||||
- [X] list_queues
|
- [X] list_queues
|
||||||
- [X] purge_queue
|
- [X] purge_queue
|
||||||
- [ ] receive_message
|
- [ ] receive_message
|
||||||
- [X] remove_permission
|
- [X] remove_permission
|
||||||
- [X] send_message
|
- [X] send_message
|
||||||
- [ ] send_message_batch
|
- [X] send_message_batch
|
||||||
- [X] set_queue_attributes
|
- [X] set_queue_attributes
|
||||||
- [X] tag_queue
|
- [X] tag_queue
|
||||||
- [X] untag_queue
|
- [X] untag_queue
|
||||||
|
@ -454,7 +454,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
|
|||||||
index = 1
|
index = 1
|
||||||
while True:
|
while True:
|
||||||
value_dict = self._get_multi_param_helper(prefix + str(index))
|
value_dict = self._get_multi_param_helper(prefix + str(index))
|
||||||
if not value_dict:
|
if not value_dict and value_dict != '':
|
||||||
break
|
break
|
||||||
|
|
||||||
values.append(value_dict)
|
values.append(value_dict)
|
||||||
|
@ -7,9 +7,14 @@ class MessageNotInflight(Exception):
|
|||||||
status_code = 400
|
status_code = 400
|
||||||
|
|
||||||
|
|
||||||
class ReceiptHandleIsInvalid(Exception):
|
class ReceiptHandleIsInvalid(RESTError):
|
||||||
description = "The receipt handle provided is not valid."
|
code = 400
|
||||||
status_code = 400
|
|
||||||
|
def __init__(self):
|
||||||
|
super(ReceiptHandleIsInvalid, self).__init__(
|
||||||
|
'ReceiptHandleIsInvalid',
|
||||||
|
'The input receipt handle is invalid.'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MessageAttributesInvalid(Exception):
|
class MessageAttributesInvalid(Exception):
|
||||||
@ -33,3 +38,66 @@ class QueueAlreadyExists(RESTError):
|
|||||||
def __init__(self, message):
|
def __init__(self, message):
|
||||||
super(QueueAlreadyExists, self).__init__(
|
super(QueueAlreadyExists, self).__init__(
|
||||||
"QueueAlreadyExists", message)
|
"QueueAlreadyExists", message)
|
||||||
|
|
||||||
|
|
||||||
|
class EmptyBatchRequest(RESTError):
|
||||||
|
code = 400
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(EmptyBatchRequest, self).__init__(
|
||||||
|
'EmptyBatchRequest',
|
||||||
|
'There should be at least one SendMessageBatchRequestEntry in the request.'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidBatchEntryId(RESTError):
|
||||||
|
code = 400
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(InvalidBatchEntryId, self).__init__(
|
||||||
|
'InvalidBatchEntryId',
|
||||||
|
'A batch entry id can only contain alphanumeric characters, '
|
||||||
|
'hyphens and underscores. It can be at most 80 letters long.'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class BatchRequestTooLong(RESTError):
|
||||||
|
code = 400
|
||||||
|
|
||||||
|
def __init__(self, length):
|
||||||
|
super(BatchRequestTooLong, self).__init__(
|
||||||
|
'BatchRequestTooLong',
|
||||||
|
'Batch requests cannot be longer than 262144 bytes. '
|
||||||
|
'You have sent {} bytes.'.format(length)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class BatchEntryIdsNotDistinct(RESTError):
|
||||||
|
code = 400
|
||||||
|
|
||||||
|
def __init__(self, entry_id):
|
||||||
|
super(BatchEntryIdsNotDistinct, self).__init__(
|
||||||
|
'BatchEntryIdsNotDistinct',
|
||||||
|
'Id {} repeated.'.format(entry_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TooManyEntriesInBatchRequest(RESTError):
|
||||||
|
code = 400
|
||||||
|
|
||||||
|
def __init__(self, number):
|
||||||
|
super(TooManyEntriesInBatchRequest, self).__init__(
|
||||||
|
'TooManyEntriesInBatchRequest',
|
||||||
|
'Maximum number of entries per request are 10. '
|
||||||
|
'You have sent {}.'.format(number)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidAttributeName(RESTError):
|
||||||
|
code = 400
|
||||||
|
|
||||||
|
def __init__(self, attribute_name):
|
||||||
|
super(InvalidAttributeName, self).__init__(
|
||||||
|
'InvalidAttributeName',
|
||||||
|
'Unknown Attribute {}.'.format(attribute_name)
|
||||||
|
)
|
||||||
|
@ -20,11 +20,18 @@ from .exceptions import (
|
|||||||
QueueDoesNotExist,
|
QueueDoesNotExist,
|
||||||
QueueAlreadyExists,
|
QueueAlreadyExists,
|
||||||
ReceiptHandleIsInvalid,
|
ReceiptHandleIsInvalid,
|
||||||
|
InvalidBatchEntryId,
|
||||||
|
BatchRequestTooLong,
|
||||||
|
BatchEntryIdsNotDistinct,
|
||||||
|
TooManyEntriesInBatchRequest,
|
||||||
|
InvalidAttributeName
|
||||||
)
|
)
|
||||||
|
|
||||||
DEFAULT_ACCOUNT_ID = 123456789012
|
DEFAULT_ACCOUNT_ID = 123456789012
|
||||||
DEFAULT_SENDER_ID = "AIDAIT2UOQQY3AUEKVGXU"
|
DEFAULT_SENDER_ID = "AIDAIT2UOQQY3AUEKVGXU"
|
||||||
|
|
||||||
|
MAXIMUM_MESSAGE_LENGTH = 262144 # 256 KiB
|
||||||
|
|
||||||
TRANSPORT_TYPE_ENCODINGS = {'String': b'\x01', 'Binary': b'\x02', 'Number': b'\x01'}
|
TRANSPORT_TYPE_ENCODINGS = {'String': b'\x01', 'Binary': b'\x02', 'Number': b'\x01'}
|
||||||
|
|
||||||
|
|
||||||
@ -155,7 +162,7 @@ class Message(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class Queue(BaseModel):
|
class Queue(BaseModel):
|
||||||
base_attributes = ['ApproximateNumberOfMessages',
|
BASE_ATTRIBUTES = ['ApproximateNumberOfMessages',
|
||||||
'ApproximateNumberOfMessagesDelayed',
|
'ApproximateNumberOfMessagesDelayed',
|
||||||
'ApproximateNumberOfMessagesNotVisible',
|
'ApproximateNumberOfMessagesNotVisible',
|
||||||
'CreatedTimestamp',
|
'CreatedTimestamp',
|
||||||
@ -166,9 +173,9 @@ class Queue(BaseModel):
|
|||||||
'QueueArn',
|
'QueueArn',
|
||||||
'ReceiveMessageWaitTimeSeconds',
|
'ReceiveMessageWaitTimeSeconds',
|
||||||
'VisibilityTimeout']
|
'VisibilityTimeout']
|
||||||
fifo_attributes = ['FifoQueue',
|
FIFO_ATTRIBUTES = ['FifoQueue',
|
||||||
'ContentBasedDeduplication']
|
'ContentBasedDeduplication']
|
||||||
kms_attributes = ['KmsDataKeyReusePeriodSeconds',
|
KMS_ATTRIBUTES = ['KmsDataKeyReusePeriodSeconds',
|
||||||
'KmsMasterKeyId']
|
'KmsMasterKeyId']
|
||||||
ALLOWED_PERMISSIONS = ('*', 'ChangeMessageVisibility', 'DeleteMessage',
|
ALLOWED_PERMISSIONS = ('*', 'ChangeMessageVisibility', 'DeleteMessage',
|
||||||
'GetQueueAttributes', 'GetQueueUrl',
|
'GetQueueAttributes', 'GetQueueUrl',
|
||||||
@ -185,8 +192,9 @@ class Queue(BaseModel):
|
|||||||
|
|
||||||
now = unix_time()
|
now = unix_time()
|
||||||
self.created_timestamp = now
|
self.created_timestamp = now
|
||||||
self.queue_arn = 'arn:aws:sqs:{0}:123456789012:{1}'.format(self.region,
|
self.queue_arn = 'arn:aws:sqs:{0}:{1}:{2}'.format(self.region,
|
||||||
self.name)
|
DEFAULT_ACCOUNT_ID,
|
||||||
|
self.name)
|
||||||
self.dead_letter_queue = None
|
self.dead_letter_queue = None
|
||||||
|
|
||||||
self.lambda_event_source_mappings = {}
|
self.lambda_event_source_mappings = {}
|
||||||
@ -330,17 +338,17 @@ class Queue(BaseModel):
|
|||||||
def attributes(self):
|
def attributes(self):
|
||||||
result = {}
|
result = {}
|
||||||
|
|
||||||
for attribute in self.base_attributes:
|
for attribute in self.BASE_ATTRIBUTES:
|
||||||
attr = getattr(self, camelcase_to_underscores(attribute))
|
attr = getattr(self, camelcase_to_underscores(attribute))
|
||||||
result[attribute] = attr
|
result[attribute] = attr
|
||||||
|
|
||||||
if self.fifo_queue:
|
if self.fifo_queue:
|
||||||
for attribute in self.fifo_attributes:
|
for attribute in self.FIFO_ATTRIBUTES:
|
||||||
attr = getattr(self, camelcase_to_underscores(attribute))
|
attr = getattr(self, camelcase_to_underscores(attribute))
|
||||||
result[attribute] = attr
|
result[attribute] = attr
|
||||||
|
|
||||||
if self.kms_master_key_id:
|
if self.kms_master_key_id:
|
||||||
for attribute in self.kms_attributes:
|
for attribute in self.KMS_ATTRIBUTES:
|
||||||
attr = getattr(self, camelcase_to_underscores(attribute))
|
attr = getattr(self, camelcase_to_underscores(attribute))
|
||||||
result[attribute] = attr
|
result[attribute] = attr
|
||||||
|
|
||||||
@ -460,6 +468,9 @@ class SQSBackend(BaseBackend):
|
|||||||
|
|
||||||
return queue
|
return queue
|
||||||
|
|
||||||
|
def get_queue_url(self, queue_name):
|
||||||
|
return self.get_queue(queue_name)
|
||||||
|
|
||||||
def list_queues(self, queue_name_prefix):
|
def list_queues(self, queue_name_prefix):
|
||||||
re_str = '.*'
|
re_str = '.*'
|
||||||
if queue_name_prefix:
|
if queue_name_prefix:
|
||||||
@ -482,6 +493,28 @@ class SQSBackend(BaseBackend):
|
|||||||
return self.queues.pop(queue_name)
|
return self.queues.pop(queue_name)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def get_queue_attributes(self, queue_name, attribute_names):
|
||||||
|
queue = self.get_queue(queue_name)
|
||||||
|
|
||||||
|
if not len(attribute_names):
|
||||||
|
attribute_names.append('All')
|
||||||
|
|
||||||
|
valid_names = ['All'] + queue.BASE_ATTRIBUTES + queue.FIFO_ATTRIBUTES + queue.KMS_ATTRIBUTES
|
||||||
|
invalid_name = next((name for name in attribute_names if name not in valid_names), None)
|
||||||
|
|
||||||
|
if invalid_name or invalid_name == '':
|
||||||
|
raise InvalidAttributeName(invalid_name)
|
||||||
|
|
||||||
|
attributes = {}
|
||||||
|
|
||||||
|
if 'All' in attribute_names:
|
||||||
|
attributes = queue.attributes
|
||||||
|
else:
|
||||||
|
for name in (name for name in attribute_names if name in queue.attributes):
|
||||||
|
attributes[name] = queue.attributes.get(name)
|
||||||
|
|
||||||
|
return attributes
|
||||||
|
|
||||||
def set_queue_attributes(self, queue_name, attributes):
|
def set_queue_attributes(self, queue_name, attributes):
|
||||||
queue = self.get_queue(queue_name)
|
queue = self.get_queue(queue_name)
|
||||||
queue._set_attributes(attributes)
|
queue._set_attributes(attributes)
|
||||||
@ -516,6 +549,49 @@ class SQSBackend(BaseBackend):
|
|||||||
|
|
||||||
return message
|
return message
|
||||||
|
|
||||||
|
def send_message_batch(self, queue_name, entries):
|
||||||
|
self.get_queue(queue_name)
|
||||||
|
|
||||||
|
if any(not re.match(r'^[\w-]{1,80}$', entry['Id']) for entry in entries.values()):
|
||||||
|
raise InvalidBatchEntryId()
|
||||||
|
|
||||||
|
body_length = next(
|
||||||
|
(len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > MAXIMUM_MESSAGE_LENGTH),
|
||||||
|
False
|
||||||
|
)
|
||||||
|
if body_length:
|
||||||
|
raise BatchRequestTooLong(body_length)
|
||||||
|
|
||||||
|
duplicate_id = self._get_first_duplicate_id([entry['Id'] for entry in entries.values()])
|
||||||
|
if duplicate_id:
|
||||||
|
raise BatchEntryIdsNotDistinct(duplicate_id)
|
||||||
|
|
||||||
|
if len(entries) > 10:
|
||||||
|
raise TooManyEntriesInBatchRequest(len(entries))
|
||||||
|
|
||||||
|
messages = []
|
||||||
|
for index, entry in entries.items():
|
||||||
|
# Loop through looking for messages
|
||||||
|
message = self.send_message(
|
||||||
|
queue_name,
|
||||||
|
entry['MessageBody'],
|
||||||
|
message_attributes=entry['MessageAttributes'],
|
||||||
|
delay_seconds=entry['DelaySeconds']
|
||||||
|
)
|
||||||
|
message.user_id = entry['Id']
|
||||||
|
|
||||||
|
messages.append(message)
|
||||||
|
|
||||||
|
return messages
|
||||||
|
|
||||||
|
def _get_first_duplicate_id(self, ids):
|
||||||
|
unique_ids = set()
|
||||||
|
for id in ids:
|
||||||
|
if id in unique_ids:
|
||||||
|
return id
|
||||||
|
unique_ids.add(id)
|
||||||
|
return None
|
||||||
|
|
||||||
def receive_messages(self, queue_name, count, wait_seconds_timeout, visibility_timeout):
|
def receive_messages(self, queue_name, count, wait_seconds_timeout, visibility_timeout):
|
||||||
"""
|
"""
|
||||||
Attempt to retrieve visible messages from a queue.
|
Attempt to retrieve visible messages from a queue.
|
||||||
@ -593,6 +669,10 @@ class SQSBackend(BaseBackend):
|
|||||||
|
|
||||||
def delete_message(self, queue_name, receipt_handle):
|
def delete_message(self, queue_name, receipt_handle):
|
||||||
queue = self.get_queue(queue_name)
|
queue = self.get_queue(queue_name)
|
||||||
|
|
||||||
|
if not any(message.receipt_handle == receipt_handle for message in queue._messages):
|
||||||
|
raise ReceiptHandleIsInvalid()
|
||||||
|
|
||||||
new_messages = []
|
new_messages = []
|
||||||
for message in queue._messages:
|
for message in queue._messages:
|
||||||
# Only delete message if it is not visible and the reciept_handle
|
# Only delete message if it is not visible and the reciept_handle
|
||||||
@ -677,6 +757,9 @@ class SQSBackend(BaseBackend):
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def list_queue_tags(self, queue_name):
|
||||||
|
return self.get_queue(queue_name)
|
||||||
|
|
||||||
|
|
||||||
sqs_backends = {}
|
sqs_backends = {}
|
||||||
for region in boto.sqs.regions():
|
for region in boto.sqs.regions():
|
||||||
|
@ -11,6 +11,8 @@ from .exceptions import (
|
|||||||
MessageAttributesInvalid,
|
MessageAttributesInvalid,
|
||||||
MessageNotInflight,
|
MessageNotInflight,
|
||||||
ReceiptHandleIsInvalid,
|
ReceiptHandleIsInvalid,
|
||||||
|
EmptyBatchRequest,
|
||||||
|
InvalidAttributeName
|
||||||
)
|
)
|
||||||
|
|
||||||
MAXIMUM_VISIBILTY_TIMEOUT = 43200
|
MAXIMUM_VISIBILTY_TIMEOUT = 43200
|
||||||
@ -89,13 +91,10 @@ class SQSResponse(BaseResponse):
|
|||||||
request_url = urlparse(self.uri)
|
request_url = urlparse(self.uri)
|
||||||
queue_name = self._get_param("QueueName")
|
queue_name = self._get_param("QueueName")
|
||||||
|
|
||||||
queue = self.sqs_backend.get_queue(queue_name)
|
queue = self.sqs_backend.get_queue_url(queue_name)
|
||||||
|
|
||||||
if queue:
|
template = self.response_template(GET_QUEUE_URL_RESPONSE)
|
||||||
template = self.response_template(GET_QUEUE_URL_RESPONSE)
|
return template.render(queue_url=queue.url(request_url))
|
||||||
return template.render(queue=queue, request_url=request_url)
|
|
||||||
else:
|
|
||||||
return "", dict(status=404)
|
|
||||||
|
|
||||||
def list_queues(self):
|
def list_queues(self):
|
||||||
request_url = urlparse(self.uri)
|
request_url = urlparse(self.uri)
|
||||||
@ -119,7 +118,7 @@ class SQSResponse(BaseResponse):
|
|||||||
receipt_handle=receipt_handle,
|
receipt_handle=receipt_handle,
|
||||||
visibility_timeout=visibility_timeout
|
visibility_timeout=visibility_timeout
|
||||||
)
|
)
|
||||||
except (ReceiptHandleIsInvalid, MessageNotInflight) as e:
|
except MessageNotInflight as e:
|
||||||
return "Invalid request: {0}".format(e.description), dict(status=e.status_code)
|
return "Invalid request: {0}".format(e.description), dict(status=e.status_code)
|
||||||
|
|
||||||
template = self.response_template(CHANGE_MESSAGE_VISIBILITY_RESPONSE)
|
template = self.response_template(CHANGE_MESSAGE_VISIBILITY_RESPONSE)
|
||||||
@ -171,10 +170,15 @@ class SQSResponse(BaseResponse):
|
|||||||
def get_queue_attributes(self):
|
def get_queue_attributes(self):
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
|
|
||||||
queue = self.sqs_backend.get_queue(queue_name)
|
if self.querystring.get('AttributeNames'):
|
||||||
|
raise InvalidAttributeName('')
|
||||||
|
|
||||||
|
attribute_names = self._get_multi_param('AttributeName')
|
||||||
|
|
||||||
|
attributes = self.sqs_backend.get_queue_attributes(queue_name, attribute_names)
|
||||||
|
|
||||||
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
|
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
|
||||||
return template.render(queue=queue)
|
return template.render(attributes=attributes)
|
||||||
|
|
||||||
def set_queue_attributes(self):
|
def set_queue_attributes(self):
|
||||||
# TODO validate self.get_param('QueueUrl')
|
# TODO validate self.get_param('QueueUrl')
|
||||||
@ -237,72 +241,31 @@ class SQSResponse(BaseResponse):
|
|||||||
self.sqs_backend.get_queue(queue_name)
|
self.sqs_backend.get_queue(queue_name)
|
||||||
|
|
||||||
if self.querystring.get('Entries'):
|
if self.querystring.get('Entries'):
|
||||||
return self._error('AWS.SimpleQueueService.EmptyBatchRequest',
|
raise EmptyBatchRequest()
|
||||||
'There should be at least one SendMessageBatchRequestEntry in the request.')
|
|
||||||
|
|
||||||
entries = {}
|
entries = {}
|
||||||
for key, value in self.querystring.items():
|
for key, value in self.querystring.items():
|
||||||
match = re.match(r'^SendMessageBatchRequestEntry\.(\d+)\.Id', key)
|
match = re.match(r'^SendMessageBatchRequestEntry\.(\d+)\.Id', key)
|
||||||
if match:
|
if match:
|
||||||
entries[match.group(1)] = {
|
index = match.group(1)
|
||||||
|
|
||||||
|
message_attributes = parse_message_attributes(
|
||||||
|
self.querystring, base='SendMessageBatchRequestEntry.{}.'.format(index))
|
||||||
|
|
||||||
|
entries[index] = {
|
||||||
'Id': value[0],
|
'Id': value[0],
|
||||||
'MessageBody': self.querystring.get(
|
'MessageBody': self.querystring.get(
|
||||||
'SendMessageBatchRequestEntry.{}.MessageBody'.format(match.group(1)))[0]
|
'SendMessageBatchRequestEntry.{}.MessageBody'.format(index))[0],
|
||||||
|
'DelaySeconds': self.querystring.get(
|
||||||
|
'SendMessageBatchRequestEntry.{}.DelaySeconds'.format(index), [None])[0],
|
||||||
|
'MessageAttributes': message_attributes
|
||||||
}
|
}
|
||||||
|
|
||||||
if any(not re.match(r'^[\w-]{1,80}$', entry['Id']) for entry in entries.values()):
|
messages = self.sqs_backend.send_message_batch(queue_name, entries)
|
||||||
return self._error('AWS.SimpleQueueService.InvalidBatchEntryId',
|
|
||||||
'A batch entry id can only contain alphanumeric characters, '
|
|
||||||
'hyphens and underscores. It can be at most 80 letters long.')
|
|
||||||
|
|
||||||
body_length = next(
|
|
||||||
(len(entry['MessageBody']) for entry in entries.values() if len(entry['MessageBody']) > MAXIMUM_MESSAGE_LENGTH),
|
|
||||||
False
|
|
||||||
)
|
|
||||||
if body_length:
|
|
||||||
return self._error('AWS.SimpleQueueService.BatchRequestTooLong',
|
|
||||||
'Batch requests cannot be longer than 262144 bytes. '
|
|
||||||
'You have sent {} bytes.'.format(body_length))
|
|
||||||
|
|
||||||
duplicate_id = self._get_first_duplicate_id([entry['Id'] for entry in entries.values()])
|
|
||||||
if duplicate_id:
|
|
||||||
return self._error('AWS.SimpleQueueService.BatchEntryIdsNotDistinct',
|
|
||||||
'Id {} repeated.'.format(duplicate_id))
|
|
||||||
|
|
||||||
if len(entries) > 10:
|
|
||||||
return self._error('AWS.SimpleQueueService.TooManyEntriesInBatchRequest',
|
|
||||||
'Maximum number of entries per request are 10. '
|
|
||||||
'You have sent 11.')
|
|
||||||
|
|
||||||
messages = []
|
|
||||||
for index, entry in entries.items():
|
|
||||||
# Loop through looking for messages
|
|
||||||
delay_key = 'SendMessageBatchRequestEntry.{0}.DelaySeconds'.format(
|
|
||||||
index)
|
|
||||||
delay_seconds = self.querystring.get(delay_key, [None])[0]
|
|
||||||
message = self.sqs_backend.send_message(
|
|
||||||
queue_name, entry['MessageBody'], delay_seconds=delay_seconds)
|
|
||||||
message.user_id = entry['Id']
|
|
||||||
|
|
||||||
message_attributes = parse_message_attributes(
|
|
||||||
self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index))
|
|
||||||
if type(message_attributes) == tuple:
|
|
||||||
return message_attributes[0], message_attributes[1]
|
|
||||||
message.message_attributes = message_attributes
|
|
||||||
|
|
||||||
messages.append(message)
|
|
||||||
|
|
||||||
template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE)
|
template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE)
|
||||||
return template.render(messages=messages)
|
return template.render(messages=messages)
|
||||||
|
|
||||||
def _get_first_duplicate_id(self, ids):
|
|
||||||
unique_ids = set()
|
|
||||||
for id in ids:
|
|
||||||
if id in unique_ids:
|
|
||||||
return id
|
|
||||||
unique_ids.add(id)
|
|
||||||
return None
|
|
||||||
|
|
||||||
def delete_message(self):
|
def delete_message(self):
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
||||||
@ -441,7 +404,7 @@ class SQSResponse(BaseResponse):
|
|||||||
def list_queue_tags(self):
|
def list_queue_tags(self):
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
|
|
||||||
queue = self.sqs_backend.get_queue(queue_name)
|
queue = self.sqs_backend.list_queue_tags(queue_name)
|
||||||
|
|
||||||
template = self.response_template(LIST_QUEUE_TAGS_RESPONSE)
|
template = self.response_template(LIST_QUEUE_TAGS_RESPONSE)
|
||||||
return template.render(tags=queue.tags)
|
return template.render(tags=queue.tags)
|
||||||
@ -458,7 +421,7 @@ CREATE_QUEUE_RESPONSE = """<CreateQueueResponse>
|
|||||||
|
|
||||||
GET_QUEUE_URL_RESPONSE = """<GetQueueUrlResponse>
|
GET_QUEUE_URL_RESPONSE = """<GetQueueUrlResponse>
|
||||||
<GetQueueUrlResult>
|
<GetQueueUrlResult>
|
||||||
<QueueUrl>{{ queue.url(request_url) }}</QueueUrl>
|
<QueueUrl>{{ queue_url }}</QueueUrl>
|
||||||
</GetQueueUrlResult>
|
</GetQueueUrlResult>
|
||||||
<ResponseMetadata>
|
<ResponseMetadata>
|
||||||
<RequestId></RequestId>
|
<RequestId></RequestId>
|
||||||
@ -484,7 +447,7 @@ DELETE_QUEUE_RESPONSE = """<DeleteQueueResponse>
|
|||||||
|
|
||||||
GET_QUEUE_ATTRIBUTES_RESPONSE = """<GetQueueAttributesResponse>
|
GET_QUEUE_ATTRIBUTES_RESPONSE = """<GetQueueAttributesResponse>
|
||||||
<GetQueueAttributesResult>
|
<GetQueueAttributesResult>
|
||||||
{% for key, value in queue.attributes.items() %}
|
{% for key, value in attributes.items() %}
|
||||||
<Attribute>
|
<Attribute>
|
||||||
<Name>{{ key }}</Name>
|
<Name>{{ key }}</Name>
|
||||||
<Value>{{ value }}</Value>
|
<Value>{{ value }}</Value>
|
||||||
|
@ -5,6 +5,7 @@ import os
|
|||||||
import boto
|
import boto
|
||||||
import boto3
|
import boto3
|
||||||
import botocore.exceptions
|
import botocore.exceptions
|
||||||
|
import six
|
||||||
from botocore.exceptions import ClientError
|
from botocore.exceptions import ClientError
|
||||||
from boto.exception import SQSError
|
from boto.exception import SQSError
|
||||||
from boto.sqs.message import RawMessage, Message
|
from boto.sqs.message import RawMessage, Message
|
||||||
@ -144,18 +145,40 @@ def test_create_queue_kms():
|
|||||||
def test_create_queue_with_tags():
|
def test_create_queue_with_tags():
|
||||||
client = boto3.client('sqs', region_name='us-east-1')
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
response = client.create_queue(
|
response = client.create_queue(
|
||||||
QueueName = 'test-queue-with-tags',
|
QueueName='test-queue-with-tags',
|
||||||
tags = {
|
tags={
|
||||||
'tag_key_1': 'tag_value_1'
|
'tag_key_1': 'tag_value_1'
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
queue_url = response['QueueUrl']
|
queue_url = response['QueueUrl']
|
||||||
|
|
||||||
client.list_queue_tags(QueueUrl = queue_url)['Tags'].should.equal({
|
client.list_queue_tags(QueueUrl=queue_url)['Tags'].should.equal({
|
||||||
'tag_key_1': 'tag_value_1'
|
'tag_key_1': 'tag_value_1'
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_get_queue_url():
|
||||||
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
client.create_queue(QueueName='test-queue')
|
||||||
|
|
||||||
|
response = client.get_queue_url(QueueName='test-queue')
|
||||||
|
|
||||||
|
response.should.have.key('QueueUrl').which.should.contain('test-queue')
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_get_queue_url_errors():
|
||||||
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
|
||||||
|
client.get_queue_url.when.called_with(
|
||||||
|
QueueName='non-existing-queue'
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'The specified queue does not exist for this wsdl version.'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_get_nonexistent_queue():
|
def test_get_nonexistent_queue():
|
||||||
sqs = boto3.resource('sqs', region_name='us-east-1')
|
sqs = boto3.resource('sqs', region_name='us-east-1')
|
||||||
@ -341,6 +364,98 @@ def test_delete_queue():
|
|||||||
queue.delete()
|
queue.delete()
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_get_queue_attributes():
|
||||||
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
response = client.create_queue(QueueName='test-queue')
|
||||||
|
queue_url = response['QueueUrl']
|
||||||
|
|
||||||
|
response = client.get_queue_attributes(QueueUrl=queue_url)
|
||||||
|
|
||||||
|
response['Attributes']['ApproximateNumberOfMessages'].should.equal('0')
|
||||||
|
response['Attributes']['ApproximateNumberOfMessagesDelayed'].should.equal('0')
|
||||||
|
response['Attributes']['ApproximateNumberOfMessagesNotVisible'].should.equal('0')
|
||||||
|
response['Attributes']['CreatedTimestamp'].should.be.a(six.string_types)
|
||||||
|
response['Attributes']['DelaySeconds'].should.equal('0')
|
||||||
|
response['Attributes']['LastModifiedTimestamp'].should.be.a(six.string_types)
|
||||||
|
response['Attributes']['MaximumMessageSize'].should.equal('65536')
|
||||||
|
response['Attributes']['MessageRetentionPeriod'].should.equal('345600')
|
||||||
|
response['Attributes']['QueueArn'].should.equal('arn:aws:sqs:us-east-1:123456789012:test-queue')
|
||||||
|
response['Attributes']['ReceiveMessageWaitTimeSeconds'].should.equal('0')
|
||||||
|
response['Attributes']['VisibilityTimeout'].should.equal('30')
|
||||||
|
|
||||||
|
response = client.get_queue_attributes(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
AttributeNames=[
|
||||||
|
'ApproximateNumberOfMessages',
|
||||||
|
'MaximumMessageSize',
|
||||||
|
'QueueArn',
|
||||||
|
'VisibilityTimeout'
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
response['Attributes'].should.equal({
|
||||||
|
'ApproximateNumberOfMessages': '0',
|
||||||
|
'MaximumMessageSize': '65536',
|
||||||
|
'QueueArn': 'arn:aws:sqs:us-east-1:123456789012:test-queue',
|
||||||
|
'VisibilityTimeout': '30'
|
||||||
|
})
|
||||||
|
|
||||||
|
# should not return any attributes, if it was not set before
|
||||||
|
response = client.get_queue_attributes(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
AttributeNames=[
|
||||||
|
'KmsMasterKeyId'
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
response.should_not.have.key('Attributes')
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_get_queue_attributes_errors():
|
||||||
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
response = client.create_queue(QueueName='test-queue')
|
||||||
|
queue_url = response['QueueUrl']
|
||||||
|
|
||||||
|
client.get_queue_attributes.when.called_with(
|
||||||
|
QueueUrl=queue_url + '-non-existing'
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'The specified queue does not exist for this wsdl version.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.get_queue_attributes.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
AttributeNames=[
|
||||||
|
'QueueArn',
|
||||||
|
'not-existing',
|
||||||
|
'VisibilityTimeout'
|
||||||
|
]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'Unknown Attribute not-existing.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.get_queue_attributes.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
AttributeNames=[
|
||||||
|
''
|
||||||
|
]
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'Unknown Attribute .'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.get_queue_attributes.when.called_with(
|
||||||
|
QueueUrl = queue_url,
|
||||||
|
AttributeNames = []
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'Unknown Attribute .'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_set_queue_attribute():
|
def test_set_queue_attribute():
|
||||||
sqs = boto3.resource('sqs', region_name='us-east-1')
|
sqs = boto3.resource('sqs', region_name='us-east-1')
|
||||||
@ -883,10 +998,100 @@ def test_delete_message_after_visibility_timeout():
|
|||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_send_message_batch_errors():
|
def test_delete_message_errors():
|
||||||
client = boto3.client('sqs', region_name = 'us-east-1')
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
response = client.create_queue(QueueName='test-queue')
|
||||||
|
queue_url = response['QueueUrl']
|
||||||
|
client.send_message(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
MessageBody='body'
|
||||||
|
)
|
||||||
|
response = client.receive_message(
|
||||||
|
QueueUrl=queue_url
|
||||||
|
)
|
||||||
|
receipt_handle = response['Messages'][0]['ReceiptHandle']
|
||||||
|
|
||||||
response = client.create_queue(QueueName='test-queue-with-tags')
|
client.delete_message.when.called_with(
|
||||||
|
QueueUrl=queue_url + '-not-existing',
|
||||||
|
ReceiptHandle=receipt_handle
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'The specified queue does not exist for this wsdl version.'
|
||||||
|
)
|
||||||
|
|
||||||
|
client.delete_message.when.called_with(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
ReceiptHandle='not-existing'
|
||||||
|
).should.throw(
|
||||||
|
ClientError,
|
||||||
|
'The input receipt handle is invalid.'
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_send_message_batch():
|
||||||
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
response = client.create_queue(QueueName='test-queue')
|
||||||
|
queue_url = response['QueueUrl']
|
||||||
|
|
||||||
|
response = client.send_message_batch(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
Entries=[
|
||||||
|
{
|
||||||
|
'Id': 'id_1',
|
||||||
|
'MessageBody': 'body_1',
|
||||||
|
'DelaySeconds': 0,
|
||||||
|
'MessageAttributes': {
|
||||||
|
'attribute_name_1': {
|
||||||
|
'StringValue': 'attribute_value_1',
|
||||||
|
'DataType': 'String'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'Id': 'id_2',
|
||||||
|
'MessageBody': 'body_2',
|
||||||
|
'DelaySeconds': 0,
|
||||||
|
'MessageAttributes': {
|
||||||
|
'attribute_name_2': {
|
||||||
|
'StringValue': '123',
|
||||||
|
'DataType': 'Number'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
sorted([entry['Id'] for entry in response['Successful']]).should.equal([
|
||||||
|
'id_1',
|
||||||
|
'id_2'
|
||||||
|
])
|
||||||
|
|
||||||
|
response = client.receive_message(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
MaxNumberOfMessages=10
|
||||||
|
)
|
||||||
|
|
||||||
|
response['Messages'][0]['Body'].should.equal('body_1')
|
||||||
|
response['Messages'][0]['MessageAttributes'].should.equal({
|
||||||
|
'attribute_name_1': {
|
||||||
|
'StringValue': 'attribute_value_1',
|
||||||
|
'DataType': 'String'
|
||||||
|
}
|
||||||
|
})
|
||||||
|
response['Messages'][1]['Body'].should.equal('body_2')
|
||||||
|
response['Messages'][1]['MessageAttributes'].should.equal({
|
||||||
|
'attribute_name_2': {
|
||||||
|
'StringValue': '123',
|
||||||
|
'DataType': 'Number'
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_send_message_batch_errors():
|
||||||
|
client = boto3.client('sqs', region_name='us-east-1')
|
||||||
|
|
||||||
|
response = client.create_queue(QueueName='test-queue')
|
||||||
queue_url = response['QueueUrl']
|
queue_url = response['QueueUrl']
|
||||||
|
|
||||||
client.send_message_batch.when.called_with(
|
client.send_message_batch.when.called_with(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user