Cleaning up SQS for boto3. Closes #385.
This commit is contained in:
parent
ac4aef87a1
commit
519726a70a
@ -79,10 +79,17 @@ def create_backend_app(service):
|
|||||||
else:
|
else:
|
||||||
endpoint = None
|
endpoint = None
|
||||||
|
|
||||||
backend_app.route(
|
if endpoint in backend_app.view_functions:
|
||||||
|
# HACK: Sometimes we map the same view to multiple url_paths. Flask
|
||||||
|
# requries us to have different names.
|
||||||
|
endpoint += "2"
|
||||||
|
|
||||||
|
backend_app.add_url_rule(
|
||||||
url_path,
|
url_path,
|
||||||
endpoint=endpoint,
|
endpoint=endpoint,
|
||||||
methods=HTTP_METHODS)(convert_flask_to_httpretty_response(handler))
|
methods=HTTP_METHODS,
|
||||||
|
view_func=convert_flask_to_httpretty_response(handler),
|
||||||
|
)
|
||||||
|
|
||||||
return backend_app
|
return backend_app
|
||||||
|
|
||||||
|
@ -11,10 +11,11 @@ from .exceptions import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
MAXIMUM_VISIBILTY_TIMEOUT = 43200
|
MAXIMUM_VISIBILTY_TIMEOUT = 43200
|
||||||
|
DEFAULT_RECEIVED_MESSAGES = 1
|
||||||
SQS_REGION_REGEX = r'://(.+?)\.queue\.amazonaws\.com'
|
SQS_REGION_REGEX = r'://(.+?)\.queue\.amazonaws\.com'
|
||||||
|
|
||||||
|
|
||||||
class QueuesResponse(BaseResponse):
|
class SQSResponse(BaseResponse):
|
||||||
|
|
||||||
region_regex = SQS_REGION_REGEX
|
region_regex = SQS_REGION_REGEX
|
||||||
|
|
||||||
@ -22,6 +23,14 @@ class QueuesResponse(BaseResponse):
|
|||||||
def sqs_backend(self):
|
def sqs_backend(self):
|
||||||
return sqs_backends[self.region]
|
return sqs_backends[self.region]
|
||||||
|
|
||||||
|
def _get_queue_name(self):
|
||||||
|
try:
|
||||||
|
queue_name = self.querystring.get('QueueUrl')[0].split("/")[-1]
|
||||||
|
except TypeError:
|
||||||
|
# Fallback to reading from the URL
|
||||||
|
queue_name = self.path.split("/")[-1]
|
||||||
|
return queue_name
|
||||||
|
|
||||||
def create_queue(self):
|
def create_queue(self):
|
||||||
visibility_timeout = None
|
visibility_timeout = None
|
||||||
if 'Attribute.1.Name' in self.querystring and self.querystring.get('Attribute.1.Name')[0] == 'VisibilityTimeout':
|
if 'Attribute.1.Name' in self.querystring and self.querystring.get('Attribute.1.Name')[0] == 'VisibilityTimeout':
|
||||||
@ -47,17 +56,8 @@ class QueuesResponse(BaseResponse):
|
|||||||
template = self.response_template(LIST_QUEUES_RESPONSE)
|
template = self.response_template(LIST_QUEUES_RESPONSE)
|
||||||
return template.render(queues=queues)
|
return template.render(queues=queues)
|
||||||
|
|
||||||
|
|
||||||
class QueueResponse(BaseResponse):
|
|
||||||
|
|
||||||
region_regex = SQS_REGION_REGEX
|
|
||||||
|
|
||||||
@property
|
|
||||||
def sqs_backend(self):
|
|
||||||
return sqs_backends[self.region]
|
|
||||||
|
|
||||||
def change_message_visibility(self):
|
def change_message_visibility(self):
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
||||||
visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0])
|
visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0])
|
||||||
|
|
||||||
@ -79,20 +79,20 @@ class QueueResponse(BaseResponse):
|
|||||||
return template.render()
|
return template.render()
|
||||||
|
|
||||||
def get_queue_attributes(self):
|
def get_queue_attributes(self):
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
queue = self.sqs_backend.get_queue(queue_name)
|
queue = self.sqs_backend.get_queue(queue_name)
|
||||||
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
|
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
|
||||||
return template.render(queue=queue)
|
return template.render(queue=queue)
|
||||||
|
|
||||||
def set_queue_attributes(self):
|
def set_queue_attributes(self):
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
key = camelcase_to_underscores(self.querystring.get('Attribute.Name')[0])
|
key = camelcase_to_underscores(self.querystring.get('Attribute.Name')[0])
|
||||||
value = self.querystring.get('Attribute.Value')[0]
|
value = self.querystring.get('Attribute.Value')[0]
|
||||||
self.sqs_backend.set_queue_attribute(queue_name, key, value)
|
self.sqs_backend.set_queue_attribute(queue_name, key, value)
|
||||||
return SET_QUEUE_ATTRIBUTE_RESPONSE
|
return SET_QUEUE_ATTRIBUTE_RESPONSE
|
||||||
|
|
||||||
def delete_queue(self):
|
def delete_queue(self):
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
queue = self.sqs_backend.delete_queue(queue_name)
|
queue = self.sqs_backend.delete_queue(queue_name)
|
||||||
if not queue:
|
if not queue:
|
||||||
return "A queue with name {0} does not exist".format(queue_name), dict(status=404)
|
return "A queue with name {0} does not exist".format(queue_name), dict(status=404)
|
||||||
@ -113,7 +113,8 @@ class QueueResponse(BaseResponse):
|
|||||||
except MessageAttributesInvalid as e:
|
except MessageAttributesInvalid as e:
|
||||||
return e.description, dict(status=e.status_code)
|
return e.description, dict(status=e.status_code)
|
||||||
|
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
|
|
||||||
message = self.sqs_backend.send_message(
|
message = self.sqs_backend.send_message(
|
||||||
queue_name,
|
queue_name,
|
||||||
message,
|
message,
|
||||||
@ -135,7 +136,7 @@ class QueueResponse(BaseResponse):
|
|||||||
'SendMessageBatchRequestEntry.2.DelaySeconds': ['0'],
|
'SendMessageBatchRequestEntry.2.DelaySeconds': ['0'],
|
||||||
"""
|
"""
|
||||||
|
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
|
|
||||||
messages = []
|
messages = []
|
||||||
for index in range(1, 11):
|
for index in range(1, 11):
|
||||||
@ -164,7 +165,7 @@ class QueueResponse(BaseResponse):
|
|||||||
return template.render(messages=messages)
|
return template.render(messages=messages)
|
||||||
|
|
||||||
def delete_message(self):
|
def delete_message(self):
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
||||||
self.sqs_backend.delete_message(queue_name, receipt_handle)
|
self.sqs_backend.delete_message(queue_name, receipt_handle)
|
||||||
template = self.response_template(DELETE_MESSAGE_RESPONSE)
|
template = self.response_template(DELETE_MESSAGE_RESPONSE)
|
||||||
@ -180,7 +181,7 @@ class QueueResponse(BaseResponse):
|
|||||||
'DeleteMessageBatchRequestEntry.2.ReceiptHandle': ['zxcvfda...'],
|
'DeleteMessageBatchRequestEntry.2.ReceiptHandle': ['zxcvfda...'],
|
||||||
...
|
...
|
||||||
"""
|
"""
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
|
|
||||||
message_ids = []
|
message_ids = []
|
||||||
for index in range(1, 11):
|
for index in range(1, 11):
|
||||||
@ -201,14 +202,17 @@ class QueueResponse(BaseResponse):
|
|||||||
return template.render(message_ids=message_ids)
|
return template.render(message_ids=message_ids)
|
||||||
|
|
||||||
def purge_queue(self):
|
def purge_queue(self):
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
self.sqs_backend.purge_queue(queue_name)
|
self.sqs_backend.purge_queue(queue_name)
|
||||||
template = self.response_template(PURGE_QUEUE_RESPONSE)
|
template = self.response_template(PURGE_QUEUE_RESPONSE)
|
||||||
return template.render()
|
return template.render()
|
||||||
|
|
||||||
def receive_message(self):
|
def receive_message(self):
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self._get_queue_name()
|
||||||
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
|
try:
|
||||||
|
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
|
||||||
|
except TypeError:
|
||||||
|
message_count = DEFAULT_RECEIVED_MESSAGES
|
||||||
messages = self.sqs_backend.receive_messages(queue_name, message_count)
|
messages = self.sqs_backend.receive_messages(queue_name, message_count)
|
||||||
template = self.response_template(RECEIVE_MESSAGE_RESPONSE)
|
template = self.response_template(RECEIVE_MESSAGE_RESPONSE)
|
||||||
output = template.render(messages=messages)
|
output = template.render(messages=messages)
|
||||||
|
@ -1,11 +1,13 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
from .responses import QueueResponse, QueuesResponse
|
from .responses import SQSResponse
|
||||||
|
|
||||||
url_bases = [
|
url_bases = [
|
||||||
"https?://(.*?)(queue|sqs)(.*?).amazonaws.com"
|
"https?://(.*?)(queue|sqs)(.*?).amazonaws.com"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
dispatch = SQSResponse().dispatch
|
||||||
|
|
||||||
url_paths = {
|
url_paths = {
|
||||||
'{0}/$': QueuesResponse.dispatch,
|
'{0}/$': dispatch,
|
||||||
'{0}/(?P<account_id>\d+)/(?P<queue_name>[a-zA-Z0-9\-_]+)': QueueResponse.dispatch,
|
'{0}/(?P<account_id>\d+)/(?P<queue_name>[a-zA-Z0-9\-_]+)': dispatch,
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
import boto
|
import boto
|
||||||
|
import boto3
|
||||||
from boto.exception import SQSError
|
from boto.exception import SQSError
|
||||||
from boto.sqs.message import RawMessage, Message
|
from boto.sqs.message import RawMessage, Message
|
||||||
|
|
||||||
@ -462,3 +463,17 @@ def test_delete_message_after_visibility_timeout():
|
|||||||
m1_retrieved.delete()
|
m1_retrieved.delete()
|
||||||
|
|
||||||
assert new_queue.count() == 0
|
assert new_queue.count() == 0
|
||||||
|
|
||||||
|
"""
|
||||||
|
boto3
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_boto3_message_send():
|
||||||
|
sqs = boto3.resource('sqs', region_name='us-east-1')
|
||||||
|
queue = sqs.create_queue(QueueName="blah")
|
||||||
|
queue.send_message(MessageBody="derp")
|
||||||
|
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
Loading…
Reference in New Issue
Block a user