diff --git a/moto/sqs/models.py b/moto/sqs/models.py index d3e1ded3b..447c0f01d 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -1,18 +1,43 @@ +import base64 import hashlib import time import re from moto.core import BaseBackend from moto.core.utils import camelcase_to_underscores, get_random_message_id -from .utils import generate_receipt_handle +from .utils import generate_receipt_handle, unix_time_millis + + +DEFAULT_ACCOUNT_ID = 123456789012 + + +class MessageNotInflight(Exception): + description = "The message referred to is not in flight." + status_code = 400 + + +class ReceiptHandleIsInvalid(Exception): + description = "The receipt handle provided is not valid." + status_code = 400 class Message(object): - def __init__(self, message_id, body): self.id = message_id self.body = body - self.receipt_handle = generate_receipt_handle() + self.receipt_handle = None + self.sender_id = DEFAULT_ACCOUNT_ID + self.sent_timestamp = None + self.approximate_first_receive_timestamp = None + self.approximate_receive_count = 0 + self.visible_at = 0 + self.delayed_until = 0 + + @property + def body_base64(self): + if len(self.body) >= 27: + return base64.b64encode(self.body) + return self.body @property def md5(self): @@ -20,6 +45,56 @@ class Message(object): body_md5.update(self.body) return body_md5.hexdigest() + def mark_sent(self, delay_seconds=None): + self.sent_timestamp = unix_time_millis() + if delay_seconds: + self.delay(delay_seconds=delay_seconds) + + def mark_recieved(self, visibility_timeout=None): + """ + When a message is received we will set the first recieve timestamp, + tap the ``approximate_receive_count`` and the ``visible_at`` time. + """ + if visibility_timeout: + visibility_timeout = int(visibility_timeout) + else: + visibility_timeout = 0 + + if not self.approximate_first_receive_timestamp: + self.approximate_first_receive_timestamp = unix_time_millis() + + self.approximate_receive_count += 1 + + # Make message visible again in the future unless its + # destroyed. + if visibility_timeout: + self.change_visibility(visibility_timeout) + + self.receipt_handle = generate_receipt_handle() + + def change_visibility(self, visibility_timeout): + # We're dealing with milliseconds internally + visibility_timeout_msec = int(visibility_timeout) * 1000 + self.visible_at = unix_time_millis() + visibility_timeout_msec + + def delay(self, delay_seconds): + delay_msec = int(delay_seconds) * 1000 + self.delayed_until = unix_time_millis() + delay_msec + + @property + def visible(self): + current_time = unix_time_millis() + if current_time > self.visible_at: + return True + return False + + @property + def delayed(self): + current_time = unix_time_millis() + if current_time < self.delayed_until: + return True + return False + class Queue(object): camelcase_attributes = ['ApproximateNumberOfMessages', @@ -37,12 +112,10 @@ class Queue(object): def __init__(self, name, visibility_timeout): self.name = name self.visibility_timeout = visibility_timeout or 30 - self.messages = [] + self._messages = [] now = time.time() - self.approximate_number_of_messages_delayed = 0 - self.approximate_number_of_messages_not_visible = 0 self.created_timestamp = now self.delay_seconds = 0 self.last_modified_timestamp = now @@ -60,6 +133,18 @@ class Queue(object): visibility_timeout=properties.get('VisibilityTimeout'), ) + @property + def approximate_number_of_messages_delayed(self): + return len([m for m in self._messages if m.delayed]) + + @property + def approximate_number_of_messages_not_visible(self): + return len([m for m in self._messages if not m.visible]) + + @property + def approximate_number_of_messages(self): + return len(self.messages) + @property def physical_resource_id(self): return self.name @@ -72,12 +157,14 @@ class Queue(object): return result @property - def approximate_number_of_messages(self): - return len(self.messages) + def messages(self): + return [message for message in self._messages if message.visible and not message.delayed] + + def add_message(self, message): + self._messages.append(message) class SQSBackend(BaseBackend): - def __init__(self): self.queues = {} super(SQSBackend, self).__init__() @@ -112,28 +199,66 @@ class SQSBackend(BaseBackend): return queue def send_message(self, queue_name, message_body, delay_seconds=None): - # TODO impemented delay_seconds queue = self.get_queue(queue_name) + + if delay_seconds: + delay_seconds = int(delay_seconds) + else: + delay_seconds = queue.delay_seconds + message_id = get_random_message_id() message = Message(message_id, message_body) - queue.messages.append(message) + + message.mark_sent( + delay_seconds=delay_seconds + ) + + queue.add_message(message) return message def receive_messages(self, queue_name, count): + """ + Attempt to retrieve visible messages from a queue. + + If a message was read by client and not deleted it is considered to be + "inflight" and cannot be read. We make attempts to obtain ``count`` + messages but we may return less if messages are in-flight or there + are simple not enough messages in the queue. + + :param string queue_name: The name of the queue to read from. + :param int count: The maximum amount of messages to retrieve. + """ queue = self.get_queue(queue_name) result = [] - for _ in range(count): - if queue.messages: - result.append(queue.messages.pop(0)) + # queue.messages only contains visible messages + for message in queue.messages: + message.mark_recieved( + visibility_timeout=queue.visibility_timeout + ) + result.append(message) + if len(result) >= count: + break return result def delete_message(self, queue_name, receipt_handle): queue = self.get_queue(queue_name) - new_messages = [ - message for message in queue.messages - if message.receipt_handle != receipt_handle - ] - queue.message = new_messages + new_messages = [] + for message in queue._messages: + # Only delete message if it is not visible and the reciept_handle + # matches. + if not message.visible and message.receipt_handle == receipt_handle: + continue + new_messages.append(message) + queue._messages = new_messages + def change_message_visibility(self, queue_name, receipt_handle, visibility_timeout): + queue = self.get_queue(queue_name) + for message in queue._messages: + if message.receipt_handle == receipt_handle: + if message.visible: + raise MessageNotInflight + message.change_visibility(visibility_timeout) + return + raise ReceiptHandleIsInvalid sqs_backend = SQSBackend() diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index fc69e7f05..53acc3264 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -4,6 +4,8 @@ from moto.core.responses import BaseResponse from moto.core.utils import camelcase_to_underscores from .models import sqs_backend +MAXIMUM_VISIBILTY_TIMEOUT = 43200 + class QueuesResponse(BaseResponse): @@ -34,6 +36,25 @@ class QueuesResponse(BaseResponse): class QueueResponse(BaseResponse): + def change_message_visibility(self): + queue_name = self.path.split("/")[-1] + receipt_handle = self.querystring.get("ReceiptHandle")[0] + visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0]) + + if visibility_timeout > MAXIMUM_VISIBILTY_TIMEOUT: + return "Invalid request, maximum visibility timeout is {}".format( + MAXIMUM_VISIBILTY_TIMEOUT + ), dict(status=400) + + sqs_backend.change_message_visibility( + queue_name=queue_name, + receipt_handle=receipt_handle, + visibility_timeout=visibility_timeout + ) + + template = Template(CHANGE_MESSAGE_VISIBILITY_RESPONSE) + return template.render() + def get_queue_attributes(self): queue_name = self.path.split("/")[-1] queue = sqs_backend.get_queue(queue_name) @@ -57,8 +78,19 @@ class QueueResponse(BaseResponse): def send_message(self): message = self.querystring.get("MessageBody")[0] + delay_seconds = self.querystring.get('DelaySeconds') + + if delay_seconds: + delay_seconds = int(delay_seconds[0]) + else: + delay_seconds = 0 + queue_name = self.path.split("/")[-1] - message = sqs_backend.send_message(queue_name, message) + message = sqs_backend.send_message( + queue_name, + message, + delay_seconds=delay_seconds + ) template = Template(SEND_MESSAGE_RESPONSE) return template.render(message=message) @@ -138,7 +170,8 @@ class QueueResponse(BaseResponse): message_count = int(self.querystring.get("MaxNumberOfMessages")[0]) messages = sqs_backend.receive_messages(queue_name, message_count) template = Template(RECEIVE_MESSAGE_RESPONSE) - return template.render(messages=messages) + output = template.render(messages=messages) + return output CREATE_QUEUE_RESPONSE = """ @@ -226,18 +259,26 @@ RECEIVE_MESSAGE_RESPONSE = """ {% for message in messages %} - - {{ message.id }} - - - MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw - Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE - auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= - - - {{ message.md5 }} - - {{ message.body }} + {{ message.id }} + {{ message.receipt_handle }} + {{ message.md5 }} + {{ message.body_base64 }} + + SenderId + {{ message.sender_id }} + + + SentTimestamp + {{ message.sent_timestamp }} + + + ApproximateReceiveCount + {{ message.approximate_receive_count }} + + + ApproximateFirstReceiveTimestamp + {{ message.approximate_first_receive_timestamp }} + {% endfor %} @@ -283,3 +324,11 @@ DELETE_MESSAGE_BATCH_RESPONSE = """ d6f86b7a-74d1-4439-b43f-196a1e29cd85 """ + +CHANGE_MESSAGE_VISIBILITY_RESPONSE = """ + + + 6a7a282a-d013-4a59-aba9-335b0fa48bed + + +""" diff --git a/moto/sqs/utils.py b/moto/sqs/utils.py index 0d92e10a7..e98f3b0c7 100644 --- a/moto/sqs/utils.py +++ b/moto/sqs/utils.py @@ -1,3 +1,4 @@ +import datetime import random import string @@ -6,3 +7,14 @@ def generate_receipt_handle(): # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles length = 185 return ''.join(random.choice(string.lowercase) for x in range(length)) + + +def unix_time(dt=None): + dt = dt or datetime.datetime.utcnow() + epoch = datetime.datetime.utcfromtimestamp(0) + delta = dt - epoch + return delta.total_seconds() + + +def unix_time_millis(dt=None): + return unix_time(dt) * 1000.0 diff --git a/tests/test_sqs/test_server.py b/tests/test_sqs/test_server.py index 59b81bea7..2a098150a 100644 --- a/tests/test_sqs/test_server.py +++ b/tests/test_sqs/test_server.py @@ -26,5 +26,6 @@ def test_sqs_list_identities(): res = test_client.get( '/123/testqueue?Action=ReceiveMessage&MaxNumberOfMessages=1') + message = re.search("(.*?)", res.data).groups()[0] message.should.equal('test-message') diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 7c84a04db..9b8380885 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -3,6 +3,7 @@ from boto.exception import SQSError from boto.sqs.message import RawMessage import requests import sure # noqa +import time from moto import mock_sqs @@ -79,8 +80,97 @@ def test_send_message(): conn.send_message(queue, 'this is a test message') conn.send_message(queue, 'this is another test message') - messages = conn.receive_message(queue, number_messages=1) + messages = conn.receive_message(queue, number_messages=2) messages[0].get_body().should.equal('this is a test message') + messages[1].get_body().should.equal('this is another test message') + + +@mock_sqs +def test_send_message_with_delay(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=60) + + conn.send_message(queue, 'this is a test message', delay_seconds=60) + conn.send_message(queue, 'this is another test message') + + queue.count().should.equal(1) + + messages = conn.receive_message(queue, number_messages=2) + assert len(messages) == 1 + message = messages[0] + assert message.get_body().should.equal('this is another test message') + queue.count().should.equal(0) + + +@mock_sqs +def test_message_becomes_inflight_when_recieved(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=2) + + conn.send_message(queue, 'this is another test message') + queue.count().should.equal(1) + + messages = conn.receive_message(queue, number_messages=1) + queue.count().should.equal(0) + + assert len(messages) == 1 + + # Wait + time.sleep(3) + + queue.count().should.equal(1) + + +@mock_sqs +def test_change_message_visibility(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=2) + + conn.send_message(queue, 'this is another test message') + queue.count().should.equal(1) + messages = conn.receive_message(queue, number_messages=1) + + assert len(messages) == 1 + + queue.count().should.equal(0) + + messages[0].change_visibility(2) + + # Wait + time.sleep(1) + + # Message is not visible + queue.count().should.equal(0) + + time.sleep(2) + + # Message now becomes visible + queue.count().should.equal(1) + + messages = conn.receive_message(queue, number_messages=1) + messages[0].delete() + queue.count().should.equal(0) + + +@mock_sqs +def test_message_attributes(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=2) + + conn.send_message(queue, 'this is another test message') + queue.count().should.equal(1) + + messages = conn.receive_message(queue, number_messages=1) + queue.count().should.equal(0) + + assert len(messages) == 1 + + message_attributes = messages[0].attributes + + assert message_attributes.get('ApproximateFirstReceiveTimestamp') + assert int(message_attributes.get('ApproximateReceiveCount')) == 1 + assert message_attributes.get('SentTimestamp') + assert message_attributes.get('SenderId') @mock_sqs @@ -109,12 +199,18 @@ def test_delete_message(): conn.send_message(queue, 'this is a test message') conn.send_message(queue, 'this is another test message') + queue.count().should.equal(2) messages = conn.receive_message(queue, number_messages=1) + assert len(messages) == 1 messages[0].delete() - queue.count().should.equal(1) + messages = conn.receive_message(queue, number_messages=1) + assert len(messages) == 1 + messages[0].delete() + queue.count().should.equal(0) + @mock_sqs def test_send_batch_operation():