From c18e0cc82eb1135e8f30247ae1df2a4cc3be1c2b Mon Sep 17 00:00:00 2001 From: Clint Ecker Date: Fri, 20 Jun 2014 15:00:36 -0500 Subject: [PATCH] Enhanced SQS support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Support for delaying messages - Support for visibility timeouts - Support for actually deleting messages - Support for message bodies longer than 27 characters - Support for message attributes - Support for accurate queue attributes Caveats: - All message attributes are returned regardless of whether or not attributes were requested when reading messages - I’m not sure why messages longer than 27 characters were breaking in my tests. Boto seems to expect the body to be base64 encoded and bodies less than 27 characters would be fine, but if I attempted to use a larger body it would mangle the content. I now base64 encode the body if the raw string is longer than 27 characters and all is fine. --- moto/sqs/models.py | 163 ++++++++++++++++++++++++++++++---- moto/sqs/responses.py | 77 +++++++++++++--- moto/sqs/utils.py | 12 +++ tests/test_sqs/test_server.py | 1 + tests/test_sqs/test_sqs.py | 100 ++++++++++++++++++++- 5 files changed, 318 insertions(+), 35 deletions(-) diff --git a/moto/sqs/models.py b/moto/sqs/models.py index d3e1ded3b..447c0f01d 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -1,18 +1,43 @@ +import base64 import hashlib import time import re from moto.core import BaseBackend from moto.core.utils import camelcase_to_underscores, get_random_message_id -from .utils import generate_receipt_handle +from .utils import generate_receipt_handle, unix_time_millis + + +DEFAULT_ACCOUNT_ID = 123456789012 + + +class MessageNotInflight(Exception): + description = "The message referred to is not in flight." + status_code = 400 + + +class ReceiptHandleIsInvalid(Exception): + description = "The receipt handle provided is not valid." + status_code = 400 class Message(object): - def __init__(self, message_id, body): self.id = message_id self.body = body - self.receipt_handle = generate_receipt_handle() + self.receipt_handle = None + self.sender_id = DEFAULT_ACCOUNT_ID + self.sent_timestamp = None + self.approximate_first_receive_timestamp = None + self.approximate_receive_count = 0 + self.visible_at = 0 + self.delayed_until = 0 + + @property + def body_base64(self): + if len(self.body) >= 27: + return base64.b64encode(self.body) + return self.body @property def md5(self): @@ -20,6 +45,56 @@ class Message(object): body_md5.update(self.body) return body_md5.hexdigest() + def mark_sent(self, delay_seconds=None): + self.sent_timestamp = unix_time_millis() + if delay_seconds: + self.delay(delay_seconds=delay_seconds) + + def mark_recieved(self, visibility_timeout=None): + """ + When a message is received we will set the first recieve timestamp, + tap the ``approximate_receive_count`` and the ``visible_at`` time. + """ + if visibility_timeout: + visibility_timeout = int(visibility_timeout) + else: + visibility_timeout = 0 + + if not self.approximate_first_receive_timestamp: + self.approximate_first_receive_timestamp = unix_time_millis() + + self.approximate_receive_count += 1 + + # Make message visible again in the future unless its + # destroyed. + if visibility_timeout: + self.change_visibility(visibility_timeout) + + self.receipt_handle = generate_receipt_handle() + + def change_visibility(self, visibility_timeout): + # We're dealing with milliseconds internally + visibility_timeout_msec = int(visibility_timeout) * 1000 + self.visible_at = unix_time_millis() + visibility_timeout_msec + + def delay(self, delay_seconds): + delay_msec = int(delay_seconds) * 1000 + self.delayed_until = unix_time_millis() + delay_msec + + @property + def visible(self): + current_time = unix_time_millis() + if current_time > self.visible_at: + return True + return False + + @property + def delayed(self): + current_time = unix_time_millis() + if current_time < self.delayed_until: + return True + return False + class Queue(object): camelcase_attributes = ['ApproximateNumberOfMessages', @@ -37,12 +112,10 @@ class Queue(object): def __init__(self, name, visibility_timeout): self.name = name self.visibility_timeout = visibility_timeout or 30 - self.messages = [] + self._messages = [] now = time.time() - self.approximate_number_of_messages_delayed = 0 - self.approximate_number_of_messages_not_visible = 0 self.created_timestamp = now self.delay_seconds = 0 self.last_modified_timestamp = now @@ -60,6 +133,18 @@ class Queue(object): visibility_timeout=properties.get('VisibilityTimeout'), ) + @property + def approximate_number_of_messages_delayed(self): + return len([m for m in self._messages if m.delayed]) + + @property + def approximate_number_of_messages_not_visible(self): + return len([m for m in self._messages if not m.visible]) + + @property + def approximate_number_of_messages(self): + return len(self.messages) + @property def physical_resource_id(self): return self.name @@ -72,12 +157,14 @@ class Queue(object): return result @property - def approximate_number_of_messages(self): - return len(self.messages) + def messages(self): + return [message for message in self._messages if message.visible and not message.delayed] + + def add_message(self, message): + self._messages.append(message) class SQSBackend(BaseBackend): - def __init__(self): self.queues = {} super(SQSBackend, self).__init__() @@ -112,28 +199,66 @@ class SQSBackend(BaseBackend): return queue def send_message(self, queue_name, message_body, delay_seconds=None): - # TODO impemented delay_seconds queue = self.get_queue(queue_name) + + if delay_seconds: + delay_seconds = int(delay_seconds) + else: + delay_seconds = queue.delay_seconds + message_id = get_random_message_id() message = Message(message_id, message_body) - queue.messages.append(message) + + message.mark_sent( + delay_seconds=delay_seconds + ) + + queue.add_message(message) return message def receive_messages(self, queue_name, count): + """ + Attempt to retrieve visible messages from a queue. + + If a message was read by client and not deleted it is considered to be + "inflight" and cannot be read. We make attempts to obtain ``count`` + messages but we may return less if messages are in-flight or there + are simple not enough messages in the queue. + + :param string queue_name: The name of the queue to read from. + :param int count: The maximum amount of messages to retrieve. + """ queue = self.get_queue(queue_name) result = [] - for _ in range(count): - if queue.messages: - result.append(queue.messages.pop(0)) + # queue.messages only contains visible messages + for message in queue.messages: + message.mark_recieved( + visibility_timeout=queue.visibility_timeout + ) + result.append(message) + if len(result) >= count: + break return result def delete_message(self, queue_name, receipt_handle): queue = self.get_queue(queue_name) - new_messages = [ - message for message in queue.messages - if message.receipt_handle != receipt_handle - ] - queue.message = new_messages + new_messages = [] + for message in queue._messages: + # Only delete message if it is not visible and the reciept_handle + # matches. + if not message.visible and message.receipt_handle == receipt_handle: + continue + new_messages.append(message) + queue._messages = new_messages + def change_message_visibility(self, queue_name, receipt_handle, visibility_timeout): + queue = self.get_queue(queue_name) + for message in queue._messages: + if message.receipt_handle == receipt_handle: + if message.visible: + raise MessageNotInflight + message.change_visibility(visibility_timeout) + return + raise ReceiptHandleIsInvalid sqs_backend = SQSBackend() diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index fc69e7f05..53acc3264 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -4,6 +4,8 @@ from moto.core.responses import BaseResponse from moto.core.utils import camelcase_to_underscores from .models import sqs_backend +MAXIMUM_VISIBILTY_TIMEOUT = 43200 + class QueuesResponse(BaseResponse): @@ -34,6 +36,25 @@ class QueuesResponse(BaseResponse): class QueueResponse(BaseResponse): + def change_message_visibility(self): + queue_name = self.path.split("/")[-1] + receipt_handle = self.querystring.get("ReceiptHandle")[0] + visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0]) + + if visibility_timeout > MAXIMUM_VISIBILTY_TIMEOUT: + return "Invalid request, maximum visibility timeout is {}".format( + MAXIMUM_VISIBILTY_TIMEOUT + ), dict(status=400) + + sqs_backend.change_message_visibility( + queue_name=queue_name, + receipt_handle=receipt_handle, + visibility_timeout=visibility_timeout + ) + + template = Template(CHANGE_MESSAGE_VISIBILITY_RESPONSE) + return template.render() + def get_queue_attributes(self): queue_name = self.path.split("/")[-1] queue = sqs_backend.get_queue(queue_name) @@ -57,8 +78,19 @@ class QueueResponse(BaseResponse): def send_message(self): message = self.querystring.get("MessageBody")[0] + delay_seconds = self.querystring.get('DelaySeconds') + + if delay_seconds: + delay_seconds = int(delay_seconds[0]) + else: + delay_seconds = 0 + queue_name = self.path.split("/")[-1] - message = sqs_backend.send_message(queue_name, message) + message = sqs_backend.send_message( + queue_name, + message, + delay_seconds=delay_seconds + ) template = Template(SEND_MESSAGE_RESPONSE) return template.render(message=message) @@ -138,7 +170,8 @@ class QueueResponse(BaseResponse): message_count = int(self.querystring.get("MaxNumberOfMessages")[0]) messages = sqs_backend.receive_messages(queue_name, message_count) template = Template(RECEIVE_MESSAGE_RESPONSE) - return template.render(messages=messages) + output = template.render(messages=messages) + return output CREATE_QUEUE_RESPONSE = """ @@ -226,18 +259,26 @@ RECEIVE_MESSAGE_RESPONSE = """ {% for message in messages %} - - {{ message.id }} - - - MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw - Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE - auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= - - - {{ message.md5 }} - - {{ message.body }} + {{ message.id }} + {{ message.receipt_handle }} + {{ message.md5 }} + {{ message.body_base64 }} + + SenderId + {{ message.sender_id }} + + + SentTimestamp + {{ message.sent_timestamp }} + + + ApproximateReceiveCount + {{ message.approximate_receive_count }} + + + ApproximateFirstReceiveTimestamp + {{ message.approximate_first_receive_timestamp }} + {% endfor %} @@ -283,3 +324,11 @@ DELETE_MESSAGE_BATCH_RESPONSE = """ d6f86b7a-74d1-4439-b43f-196a1e29cd85 """ + +CHANGE_MESSAGE_VISIBILITY_RESPONSE = """ + + + 6a7a282a-d013-4a59-aba9-335b0fa48bed + + +""" diff --git a/moto/sqs/utils.py b/moto/sqs/utils.py index 0d92e10a7..e98f3b0c7 100644 --- a/moto/sqs/utils.py +++ b/moto/sqs/utils.py @@ -1,3 +1,4 @@ +import datetime import random import string @@ -6,3 +7,14 @@ def generate_receipt_handle(): # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles length = 185 return ''.join(random.choice(string.lowercase) for x in range(length)) + + +def unix_time(dt=None): + dt = dt or datetime.datetime.utcnow() + epoch = datetime.datetime.utcfromtimestamp(0) + delta = dt - epoch + return delta.total_seconds() + + +def unix_time_millis(dt=None): + return unix_time(dt) * 1000.0 diff --git a/tests/test_sqs/test_server.py b/tests/test_sqs/test_server.py index 59b81bea7..2a098150a 100644 --- a/tests/test_sqs/test_server.py +++ b/tests/test_sqs/test_server.py @@ -26,5 +26,6 @@ def test_sqs_list_identities(): res = test_client.get( '/123/testqueue?Action=ReceiveMessage&MaxNumberOfMessages=1') + message = re.search("(.*?)", res.data).groups()[0] message.should.equal('test-message') diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 7c84a04db..9b8380885 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -3,6 +3,7 @@ from boto.exception import SQSError from boto.sqs.message import RawMessage import requests import sure # noqa +import time from moto import mock_sqs @@ -79,8 +80,97 @@ def test_send_message(): conn.send_message(queue, 'this is a test message') conn.send_message(queue, 'this is another test message') - messages = conn.receive_message(queue, number_messages=1) + messages = conn.receive_message(queue, number_messages=2) messages[0].get_body().should.equal('this is a test message') + messages[1].get_body().should.equal('this is another test message') + + +@mock_sqs +def test_send_message_with_delay(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=60) + + conn.send_message(queue, 'this is a test message', delay_seconds=60) + conn.send_message(queue, 'this is another test message') + + queue.count().should.equal(1) + + messages = conn.receive_message(queue, number_messages=2) + assert len(messages) == 1 + message = messages[0] + assert message.get_body().should.equal('this is another test message') + queue.count().should.equal(0) + + +@mock_sqs +def test_message_becomes_inflight_when_recieved(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=2) + + conn.send_message(queue, 'this is another test message') + queue.count().should.equal(1) + + messages = conn.receive_message(queue, number_messages=1) + queue.count().should.equal(0) + + assert len(messages) == 1 + + # Wait + time.sleep(3) + + queue.count().should.equal(1) + + +@mock_sqs +def test_change_message_visibility(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=2) + + conn.send_message(queue, 'this is another test message') + queue.count().should.equal(1) + messages = conn.receive_message(queue, number_messages=1) + + assert len(messages) == 1 + + queue.count().should.equal(0) + + messages[0].change_visibility(2) + + # Wait + time.sleep(1) + + # Message is not visible + queue.count().should.equal(0) + + time.sleep(2) + + # Message now becomes visible + queue.count().should.equal(1) + + messages = conn.receive_message(queue, number_messages=1) + messages[0].delete() + queue.count().should.equal(0) + + +@mock_sqs +def test_message_attributes(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=2) + + conn.send_message(queue, 'this is another test message') + queue.count().should.equal(1) + + messages = conn.receive_message(queue, number_messages=1) + queue.count().should.equal(0) + + assert len(messages) == 1 + + message_attributes = messages[0].attributes + + assert message_attributes.get('ApproximateFirstReceiveTimestamp') + assert int(message_attributes.get('ApproximateReceiveCount')) == 1 + assert message_attributes.get('SentTimestamp') + assert message_attributes.get('SenderId') @mock_sqs @@ -109,12 +199,18 @@ def test_delete_message(): conn.send_message(queue, 'this is a test message') conn.send_message(queue, 'this is another test message') + queue.count().should.equal(2) messages = conn.receive_message(queue, number_messages=1) + assert len(messages) == 1 messages[0].delete() - queue.count().should.equal(1) + messages = conn.receive_message(queue, number_messages=1) + assert len(messages) == 1 + messages[0].delete() + queue.count().should.equal(0) + @mock_sqs def test_send_batch_operation():