add most of sqs features

This commit is contained in:
Steve Pulec 2013-02-24 11:06:42 -05:00
parent 67261d3125
commit 41890225e6
5 changed files with 306 additions and 4 deletions

View File

@ -1,4 +1,5 @@
import inspect
import random
from urlparse import parse_qs
@ -41,3 +42,12 @@ def camelcase_to_underscores(argument):
def method_names_from_class(clazz):
return [x[0] for x in inspect.getmembers(clazz, predicate=inspect.ismethod)]
def get_random_hex(length=8):
chars = range(10) + ['a', 'b', 'c', 'd', 'e', 'f']
return ''.join(unicode(random.choice(chars)) for x in range(length))
def get_random_message_id():
return '{}-{}-{}-{}-{}'.format(get_random_hex(8), get_random_hex(4), get_random_hex(4), get_random_hex(4), get_random_hex(12))

View File

@ -1,13 +1,35 @@
from moto.core import BaseBackend
from moto.core.utils import camelcase_to_underscores
import base64
import md5
from moto.core import BaseBackend
from moto.core.utils import camelcase_to_underscores, get_random_message_id
from .utils import generate_receipt_handle
class Message(object):
def __init__(self, message_id, body):
self.id = message_id
self._body = body
self.receipt_handle = generate_receipt_handle()
@property
def md5(self):
body_md5 = md5.new()
body_md5.update(self.body)
return body_md5.hexdigest()
@property
def body(self):
# SQS Message bodies are base64 encoded by default
return base64.b64encode(self._body)
class Queue(object):
camelcase_attributes = ['VisibilityTimeout']
camelcase_attributes = ['VisibilityTimeout', 'ApproximateNumberOfMessages']
def __init__(self, name, visibility_timeout):
self.name = name
self.visibility_timeout = visibility_timeout
self.visibility_timeout = visibility_timeout or 30
self.messages = []
@property
def attributes(self):
@ -16,6 +38,11 @@ class Queue(object):
result[attribute] = getattr(self, camelcase_to_underscores(attribute))
return result
@property
def approximate_number_of_messages(self):
return len(self.messages)
class SQSBackend(BaseBackend):
def __init__(self):
@ -43,4 +70,27 @@ class SQSBackend(BaseBackend):
setattr(queue, key, value)
return queue
def send_message(self, queue_name, message_body, delay_seconds=None):
# TODO impemented delay_seconds
queue = self.get_queue(queue_name)
message_id = get_random_message_id()
message = Message(message_id, message_body)
queue.messages.append(message)
return message
def receive_messages(self, queue_name, count):
queue = self.get_queue(queue_name)
result = []
for index in range(count):
if queue.messages:
result.append(queue.messages.pop(0))
return result
def delete_message(self, queue_name, receipt_handle):
queue = self.get_queue(queue_name)
new_messages = [message for message in queue.messages
if message.receipt_handle != receipt_handle]
queue.message = new_messages
sqs_backend = SQSBackend()

View File

@ -1,3 +1,4 @@
import re
from urlparse import parse_qs
from jinja2 import Template
@ -66,6 +67,90 @@ class QueueResponse(BaseResponse):
template = Template(DELETE_QUEUE_RESPONSE)
return template.render(queue=queue)
def send_message(self):
message = self.querystring.get("MessageBody")[0]
queue_name = self.path.split("/")[-1]
message = sqs_backend.send_message(queue_name, message)
template = Template(SEND_MESSAGE_RESPONSE)
return template.render(message=message)
def send_message_batch(self):
"""
The querystring comes like this
'SendMessageBatchRequestEntry.1.DelaySeconds': ['0'],
'SendMessageBatchRequestEntry.1.MessageBody': ['test message 1'],
'SendMessageBatchRequestEntry.1.Id': ['6d0f122d-4b13-da2c-378f-e74244d8ad11']
'SendMessageBatchRequestEntry.2.Id': ['ff8cbf59-70a2-c1cb-44c7-b7469f1ba390'],
'SendMessageBatchRequestEntry.2.MessageBody': ['test message 2'],
'SendMessageBatchRequestEntry.2.DelaySeconds': ['0'],
"""
queue_name = self.path.split("/")[-1]
messages = []
for index in range(1, 11):
# Loop through looking for messages
message_key = 'SendMessageBatchRequestEntry.{}.MessageBody'.format(index)
message_body = self.querystring.get(message_key)
if not message_body:
# Found all messages
break
message_user_id_key = 'SendMessageBatchRequestEntry.{}.Id'.format(index)
message_user_id = self.querystring.get(message_user_id_key)[0]
delay_key = 'SendMessageBatchRequestEntry.{}.DelaySeconds'.format(index)
delay_seconds = self.querystring.get(delay_key, [None])[0]
message = sqs_backend.send_message(queue_name, message_body[0], delay_seconds=delay_seconds)
message.user_id = message_user_id
messages.append(message)
template = Template(SEND_MESSAGE_BATCH_RESPONSE)
return template.render(messages=messages)
def delete_message(self):
queue_name = self.path.split("/")[-1]
receipt_handle = self.querystring.get("ReceiptHandle")[0]
sqs_backend.delete_message(queue_name, receipt_handle)
template = Template(DELETE_MESSAGE_RESPONSE)
return template.render()
def delete_message_batch(self):
"""
The querystring comes like this
'DeleteMessageBatchRequestEntry.1.Id': ['message_1'],
'DeleteMessageBatchRequestEntry.1.ReceiptHandle': ['asdfsfs...'],
'DeleteMessageBatchRequestEntry.2.Id': ['message_2'],
'DeleteMessageBatchRequestEntry.2.ReceiptHandle': ['zxcvfda...'],
...
"""
queue_name = self.path.split("/")[-1]
message_ids = []
for index in range(1, 11):
# Loop through looking for messages
receipt_key = 'DeleteMessageBatchRequestEntry.{}.ReceiptHandle'.format(index)
receipt_handle = self.querystring.get(receipt_key)
if not receipt_handle:
# Found all messages
break
message = sqs_backend.delete_message(queue_name, receipt_handle[0])
message_user_id_key = 'DeleteMessageBatchRequestEntry.{}.Id'.format(index)
message_user_id = self.querystring.get(message_user_id_key)[0]
message_ids.append(message_user_id)
template = Template(DELETE_MESSAGE_BATCH_RESPONSE)
return template.render(message_ids=message_ids)
def receive_message(self):
queue_name = self.path.split("/")[-1]
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
messages = sqs_backend.receive_messages(queue_name, message_count)
template = Template(RECEIVE_MESSAGE_RESPONSE)
return template.render(messages=messages)
CREATE_QUEUE_RESPONSE = """<CreateQueueResponse>
@ -123,3 +208,81 @@ SET_QUEUE_ATTRIBUTE_RESPONSE = """<SetQueueAttributesResponse>
</RequestId>
</ResponseMetadata>
</SetQueueAttributesResponse>"""
SEND_MESSAGE_RESPONSE = """<SendMessageResponse>
<SendMessageResult>
<MD5OfMessageBody>
{{ message.md5 }}
</MD5OfMessageBody>
<MessageId>
{{ message.id }}
</MessageId>
</SendMessageResult>
<ResponseMetadata>
<RequestId>
27daac76-34dd-47df-bd01-1f6e873584a0
</RequestId>
</ResponseMetadata>
</SendMessageResponse>"""
RECEIVE_MESSAGE_RESPONSE = """<ReceiveMessageResponse>
<ReceiveMessageResult>
{% for message in messages %}
<Message>
<MessageId>
{{ message.id }}
</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>
{{ message.md5 }}
</MD5OfBody>
<Body>{{ message.body }}</Body>
</Message>
{% endfor %}
</ReceiveMessageResult>
<ResponseMetadata>
<RequestId>
b6633655-283d-45b4-aee4-4e84e0ae6afa
</RequestId>
</ResponseMetadata>
</ReceiveMessageResponse>"""
SEND_MESSAGE_BATCH_RESPONSE = """<SendMessageBatchResponse>
<SendMessageBatchResult>
{% for message in messages %}
<SendMessageBatchResultEntry>
<Id>{{ message.user_id }}</Id>
<MessageId>{{ message.id }}</MessageId>
<MD5OfMessageBody>{{ message.md5 }}</MD5OfMessageBody>
</SendMessageBatchResultEntry>
{% endfor %}
</SendMessageBatchResult>
<ResponseMetadata>
<RequestId>ca1ad5d0-8271-408b-8d0f-1351bf547e74</RequestId>
</ResponseMetadata>
</SendMessageBatchResponse>"""
DELETE_MESSAGE_RESPONSE = """<DeleteMessageResponse>
<ResponseMetadata>
<RequestId>
b5293cb5-d306-4a17-9048-b263635abe42
</RequestId>
</ResponseMetadata>
</DeleteMessageResponse>"""
DELETE_MESSAGE_BATCH_RESPONSE = """<DeleteMessageBatchResponse>
<DeleteMessageBatchResult>
{% for message_id in message_ids %}
<DeleteMessageBatchResultEntry>
<Id>{{ message_id }}</Id>
</DeleteMessageBatchResultEntry>
{% endfor %}
</DeleteMessageBatchResult>
<ResponseMetadata>
<RequestId>d6f86b7a-74d1-4439-b43f-196a1e29cd85</RequestId>
</ResponseMetadata>
</DeleteMessageBatchResponse>"""

8
moto/sqs/utils.py Normal file
View File

@ -0,0 +1,8 @@
import random
import string
def generate_receipt_handle():
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
length = 185
return ''.join(random.choice(string.lowercase) for x in range(length))

View File

@ -41,3 +41,74 @@ def test_set_queue_attribute():
queue.set_attribute("VisibilityTimeout", 45)
queue = conn.get_all_queues()[0]
queue.get_timeout().should.equal(45)
@mock_sqs
def test_send_message():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
conn.send_message(queue, 'this is a test message')
conn.send_message(queue, 'this is another test message')
messages = conn.receive_message(queue, number_messages=1)
messages[0].get_body().should.equal('this is a test message')
@mock_sqs
def test_queue_length():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
conn.send_message(queue, 'this is a test message')
conn.send_message(queue, 'this is another test message')
queue.count().should.equal(2)
@mock_sqs
def test_delete_message():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
conn.send_message(queue, 'this is a test message')
conn.send_message(queue, 'this is another test message')
messages = conn.receive_message(queue, number_messages=1)
messages[0].delete()
queue.count().should.equal(1)
@mock_sqs
def test_send_batch_operation():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
conn.send_message_batch(queue, [
("my_first_message", 'test message 1', 0),
("my_second_message", 'test message 2', 0),
("my_third_message", 'test message 3', 0),
])
messages = queue.get_messages(3)
messages[0].get_body().should.equal("test message 1")
# Test that pulling more messages doesn't break anything
messages = queue.get_messages(2)
@mock_sqs
def test_delete_batch_operation():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
conn.send_message_batch(queue, [
("my_first_message", 'test message 1', 0),
("my_second_message", 'test message 2', 0),
("my_third_message", 'test message 3', 0),
])
messages = queue.get_messages(2)
queue.delete_message_batch(messages)
queue.count().should.equal(1)