Enhanced SQS support

- Support for delaying messages
- Support for visibility timeouts
- Support for actually deleting messages
- Support for message bodies longer than 27 characters
- Support for message attributes
- Support for accurate queue attributes

Caveats:

- All message attributes are returned regardless of whether or not
attributes were requested when reading messages
- I’m not sure why messages longer than 27 characters were breaking in
my tests. Boto seems to expect the body to be base64 encoded and bodies
less than 27 characters would be fine, but if I attempted to use a
larger body it would mangle the content. I now base64 encode the body
if the raw string is longer than 27 characters and all is fine.
This commit is contained in:
Clint Ecker 2014-06-20 15:00:36 -05:00
parent aec7d8e998
commit c18e0cc82e
5 changed files with 318 additions and 35 deletions

View File

@ -1,18 +1,43 @@
import base64
import hashlib
import time
import re
from moto.core import BaseBackend
from moto.core.utils import camelcase_to_underscores, get_random_message_id
from .utils import generate_receipt_handle
from .utils import generate_receipt_handle, unix_time_millis
DEFAULT_ACCOUNT_ID = 123456789012
class MessageNotInflight(Exception):
description = "The message referred to is not in flight."
status_code = 400
class ReceiptHandleIsInvalid(Exception):
description = "The receipt handle provided is not valid."
status_code = 400
class Message(object):
def __init__(self, message_id, body):
self.id = message_id
self.body = body
self.receipt_handle = generate_receipt_handle()
self.receipt_handle = None
self.sender_id = DEFAULT_ACCOUNT_ID
self.sent_timestamp = None
self.approximate_first_receive_timestamp = None
self.approximate_receive_count = 0
self.visible_at = 0
self.delayed_until = 0
@property
def body_base64(self):
if len(self.body) >= 27:
return base64.b64encode(self.body)
return self.body
@property
def md5(self):
@ -20,6 +45,56 @@ class Message(object):
body_md5.update(self.body)
return body_md5.hexdigest()
def mark_sent(self, delay_seconds=None):
self.sent_timestamp = unix_time_millis()
if delay_seconds:
self.delay(delay_seconds=delay_seconds)
def mark_recieved(self, visibility_timeout=None):
"""
When a message is received we will set the first recieve timestamp,
tap the ``approximate_receive_count`` and the ``visible_at`` time.
"""
if visibility_timeout:
visibility_timeout = int(visibility_timeout)
else:
visibility_timeout = 0
if not self.approximate_first_receive_timestamp:
self.approximate_first_receive_timestamp = unix_time_millis()
self.approximate_receive_count += 1
# Make message visible again in the future unless its
# destroyed.
if visibility_timeout:
self.change_visibility(visibility_timeout)
self.receipt_handle = generate_receipt_handle()
def change_visibility(self, visibility_timeout):
# We're dealing with milliseconds internally
visibility_timeout_msec = int(visibility_timeout) * 1000
self.visible_at = unix_time_millis() + visibility_timeout_msec
def delay(self, delay_seconds):
delay_msec = int(delay_seconds) * 1000
self.delayed_until = unix_time_millis() + delay_msec
@property
def visible(self):
current_time = unix_time_millis()
if current_time > self.visible_at:
return True
return False
@property
def delayed(self):
current_time = unix_time_millis()
if current_time < self.delayed_until:
return True
return False
class Queue(object):
camelcase_attributes = ['ApproximateNumberOfMessages',
@ -37,12 +112,10 @@ class Queue(object):
def __init__(self, name, visibility_timeout):
self.name = name
self.visibility_timeout = visibility_timeout or 30
self.messages = []
self._messages = []
now = time.time()
self.approximate_number_of_messages_delayed = 0
self.approximate_number_of_messages_not_visible = 0
self.created_timestamp = now
self.delay_seconds = 0
self.last_modified_timestamp = now
@ -60,6 +133,18 @@ class Queue(object):
visibility_timeout=properties.get('VisibilityTimeout'),
)
@property
def approximate_number_of_messages_delayed(self):
return len([m for m in self._messages if m.delayed])
@property
def approximate_number_of_messages_not_visible(self):
return len([m for m in self._messages if not m.visible])
@property
def approximate_number_of_messages(self):
return len(self.messages)
@property
def physical_resource_id(self):
return self.name
@ -72,12 +157,14 @@ class Queue(object):
return result
@property
def approximate_number_of_messages(self):
return len(self.messages)
def messages(self):
return [message for message in self._messages if message.visible and not message.delayed]
def add_message(self, message):
self._messages.append(message)
class SQSBackend(BaseBackend):
def __init__(self):
self.queues = {}
super(SQSBackend, self).__init__()
@ -112,28 +199,66 @@ class SQSBackend(BaseBackend):
return queue
def send_message(self, queue_name, message_body, delay_seconds=None):
# TODO impemented delay_seconds
queue = self.get_queue(queue_name)
if delay_seconds:
delay_seconds = int(delay_seconds)
else:
delay_seconds = queue.delay_seconds
message_id = get_random_message_id()
message = Message(message_id, message_body)
queue.messages.append(message)
message.mark_sent(
delay_seconds=delay_seconds
)
queue.add_message(message)
return message
def receive_messages(self, queue_name, count):
"""
Attempt to retrieve visible messages from a queue.
If a message was read by client and not deleted it is considered to be
"inflight" and cannot be read. We make attempts to obtain ``count``
messages but we may return less if messages are in-flight or there
are simple not enough messages in the queue.
:param string queue_name: The name of the queue to read from.
:param int count: The maximum amount of messages to retrieve.
"""
queue = self.get_queue(queue_name)
result = []
for _ in range(count):
if queue.messages:
result.append(queue.messages.pop(0))
# queue.messages only contains visible messages
for message in queue.messages:
message.mark_recieved(
visibility_timeout=queue.visibility_timeout
)
result.append(message)
if len(result) >= count:
break
return result
def delete_message(self, queue_name, receipt_handle):
queue = self.get_queue(queue_name)
new_messages = [
message for message in queue.messages
if message.receipt_handle != receipt_handle
]
queue.message = new_messages
new_messages = []
for message in queue._messages:
# Only delete message if it is not visible and the reciept_handle
# matches.
if not message.visible and message.receipt_handle == receipt_handle:
continue
new_messages.append(message)
queue._messages = new_messages
def change_message_visibility(self, queue_name, receipt_handle, visibility_timeout):
queue = self.get_queue(queue_name)
for message in queue._messages:
if message.receipt_handle == receipt_handle:
if message.visible:
raise MessageNotInflight
message.change_visibility(visibility_timeout)
return
raise ReceiptHandleIsInvalid
sqs_backend = SQSBackend()

View File

@ -4,6 +4,8 @@ from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from .models import sqs_backend
MAXIMUM_VISIBILTY_TIMEOUT = 43200
class QueuesResponse(BaseResponse):
@ -34,6 +36,25 @@ class QueuesResponse(BaseResponse):
class QueueResponse(BaseResponse):
def change_message_visibility(self):
queue_name = self.path.split("/")[-1]
receipt_handle = self.querystring.get("ReceiptHandle")[0]
visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0])
if visibility_timeout > MAXIMUM_VISIBILTY_TIMEOUT:
return "Invalid request, maximum visibility timeout is {}".format(
MAXIMUM_VISIBILTY_TIMEOUT
), dict(status=400)
sqs_backend.change_message_visibility(
queue_name=queue_name,
receipt_handle=receipt_handle,
visibility_timeout=visibility_timeout
)
template = Template(CHANGE_MESSAGE_VISIBILITY_RESPONSE)
return template.render()
def get_queue_attributes(self):
queue_name = self.path.split("/")[-1]
queue = sqs_backend.get_queue(queue_name)
@ -57,8 +78,19 @@ class QueueResponse(BaseResponse):
def send_message(self):
message = self.querystring.get("MessageBody")[0]
delay_seconds = self.querystring.get('DelaySeconds')
if delay_seconds:
delay_seconds = int(delay_seconds[0])
else:
delay_seconds = 0
queue_name = self.path.split("/")[-1]
message = sqs_backend.send_message(queue_name, message)
message = sqs_backend.send_message(
queue_name,
message,
delay_seconds=delay_seconds
)
template = Template(SEND_MESSAGE_RESPONSE)
return template.render(message=message)
@ -138,7 +170,8 @@ class QueueResponse(BaseResponse):
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
messages = sqs_backend.receive_messages(queue_name, message_count)
template = Template(RECEIVE_MESSAGE_RESPONSE)
return template.render(messages=messages)
output = template.render(messages=messages)
return output
CREATE_QUEUE_RESPONSE = """<CreateQueueResponse>
@ -226,18 +259,26 @@ RECEIVE_MESSAGE_RESPONSE = """<ReceiveMessageResponse>
<ReceiveMessageResult>
{% for message in messages %}
<Message>
<MessageId>
{{ message.id }}
</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>
{{ message.md5 }}
</MD5OfBody>
<Body>{{ message.body }}</Body>
<MessageId>{{ message.id }}</MessageId>
<ReceiptHandle>{{ message.receipt_handle }}</ReceiptHandle>
<MD5OfBody>{{ message.md5 }}</MD5OfBody>
<Body>{{ message.body_base64 }}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>{{ message.sender_id }}</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>{{ message.sent_timestamp }}</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>{{ message.approximate_receive_count }}</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>{{ message.approximate_first_receive_timestamp }}</Value>
</Attribute>
</Message>
{% endfor %}
</ReceiveMessageResult>
@ -283,3 +324,11 @@ DELETE_MESSAGE_BATCH_RESPONSE = """<DeleteMessageBatchResponse>
<RequestId>d6f86b7a-74d1-4439-b43f-196a1e29cd85</RequestId>
</ResponseMetadata>
</DeleteMessageBatchResponse>"""
CHANGE_MESSAGE_VISIBILITY_RESPONSE = """<ChangeMessageVisibilityResponse>
<ResponseMetadata>
<RequestId>
6a7a282a-d013-4a59-aba9-335b0fa48bed
</RequestId>
</ResponseMetadata>
</ChangeMessageVisibilityResponse>"""

View File

@ -1,3 +1,4 @@
import datetime
import random
import string
@ -6,3 +7,14 @@ def generate_receipt_handle():
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
length = 185
return ''.join(random.choice(string.lowercase) for x in range(length))
def unix_time(dt=None):
dt = dt or datetime.datetime.utcnow()
epoch = datetime.datetime.utcfromtimestamp(0)
delta = dt - epoch
return delta.total_seconds()
def unix_time_millis(dt=None):
return unix_time(dt) * 1000.0

View File

@ -26,5 +26,6 @@ def test_sqs_list_identities():
res = test_client.get(
'/123/testqueue?Action=ReceiveMessage&MaxNumberOfMessages=1')
message = re.search("<Body>(.*?)</Body>", res.data).groups()[0]
message.should.equal('test-message')

View File

@ -3,6 +3,7 @@ from boto.exception import SQSError
from boto.sqs.message import RawMessage
import requests
import sure # noqa
import time
from moto import mock_sqs
@ -79,8 +80,97 @@ def test_send_message():
conn.send_message(queue, 'this is a test message')
conn.send_message(queue, 'this is another test message')
messages = conn.receive_message(queue, number_messages=1)
messages = conn.receive_message(queue, number_messages=2)
messages[0].get_body().should.equal('this is a test message')
messages[1].get_body().should.equal('this is another test message')
@mock_sqs
def test_send_message_with_delay():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
conn.send_message(queue, 'this is a test message', delay_seconds=60)
conn.send_message(queue, 'this is another test message')
queue.count().should.equal(1)
messages = conn.receive_message(queue, number_messages=2)
assert len(messages) == 1
message = messages[0]
assert message.get_body().should.equal('this is another test message')
queue.count().should.equal(0)
@mock_sqs
def test_message_becomes_inflight_when_recieved():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=2)
conn.send_message(queue, 'this is another test message')
queue.count().should.equal(1)
messages = conn.receive_message(queue, number_messages=1)
queue.count().should.equal(0)
assert len(messages) == 1
# Wait
time.sleep(3)
queue.count().should.equal(1)
@mock_sqs
def test_change_message_visibility():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=2)
conn.send_message(queue, 'this is another test message')
queue.count().should.equal(1)
messages = conn.receive_message(queue, number_messages=1)
assert len(messages) == 1
queue.count().should.equal(0)
messages[0].change_visibility(2)
# Wait
time.sleep(1)
# Message is not visible
queue.count().should.equal(0)
time.sleep(2)
# Message now becomes visible
queue.count().should.equal(1)
messages = conn.receive_message(queue, number_messages=1)
messages[0].delete()
queue.count().should.equal(0)
@mock_sqs
def test_message_attributes():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=2)
conn.send_message(queue, 'this is another test message')
queue.count().should.equal(1)
messages = conn.receive_message(queue, number_messages=1)
queue.count().should.equal(0)
assert len(messages) == 1
message_attributes = messages[0].attributes
assert message_attributes.get('ApproximateFirstReceiveTimestamp')
assert int(message_attributes.get('ApproximateReceiveCount')) == 1
assert message_attributes.get('SentTimestamp')
assert message_attributes.get('SenderId')
@mock_sqs
@ -109,12 +199,18 @@ def test_delete_message():
conn.send_message(queue, 'this is a test message')
conn.send_message(queue, 'this is another test message')
queue.count().should.equal(2)
messages = conn.receive_message(queue, number_messages=1)
assert len(messages) == 1
messages[0].delete()
queue.count().should.equal(1)
messages = conn.receive_message(queue, number_messages=1)
assert len(messages) == 1
messages[0].delete()
queue.count().should.equal(0)
@mock_sqs
def test_send_batch_operation():