From 76aa9a8b221e7754e0d4fea86cefdb80e94581aa Mon Sep 17 00:00:00 2001 From: Ralfas Date: Sun, 28 Sep 2014 20:59:14 +0100 Subject: [PATCH] Added basic support for SQS MessageAttributes. --- moto/sqs/exceptions.py | 7 ++++++ moto/sqs/models.py | 9 +++++++- moto/sqs/responses.py | 38 +++++++++++++++++++++++++++++++- moto/sqs/utils.py | 38 ++++++++++++++++++++++++++++++++ tests/test_sqs/test_sqs.py | 45 +++++++++++++++++++++++++++++++++++++- 5 files changed, 134 insertions(+), 3 deletions(-) diff --git a/moto/sqs/exceptions.py b/moto/sqs/exceptions.py index a6367827c..7b2c121dc 100644 --- a/moto/sqs/exceptions.py +++ b/moto/sqs/exceptions.py @@ -7,3 +7,10 @@ class MessageNotInflight(Exception): class ReceiptHandleIsInvalid(Exception): description = "The receipt handle provided is not valid." status_code = 400 + + +class MessageAttributesInvalid(Exception): + status_code = 400 + + def __init__(self, description): + self.description = description diff --git a/moto/sqs/models.py b/moto/sqs/models.py index 2fa1a13f8..8bb92b710 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -19,6 +19,7 @@ class Message(object): def __init__(self, message_id, body): self.id = message_id self.body = body + self.message_attributes = {} self.receipt_handle = None self.sender_id = DEFAULT_ACCOUNT_ID self.sent_timestamp = None @@ -188,7 +189,8 @@ class SQSBackend(BaseBackend): setattr(queue, key, value) return queue - def send_message(self, queue_name, message_body, delay_seconds=None): + def send_message(self, queue_name, message_body, message_attributes=None, delay_seconds=None): + queue = self.get_queue(queue_name) if delay_seconds: @@ -199,11 +201,15 @@ class SQSBackend(BaseBackend): message_id = get_random_message_id() message = Message(message_id, message_body) + if message_attributes: + message.message_attributes = message_attributes + message.mark_sent( delay_seconds=delay_seconds ) queue.add_message(message) + return message def receive_messages(self, queue_name, count): @@ -228,6 +234,7 @@ class SQSBackend(BaseBackend): result.append(message) if len(result) >= count: break + return result def delete_message(self, queue_name, receipt_handle): diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index 1219e6789..71d68c72f 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -3,8 +3,10 @@ from jinja2 import Template from moto.core.responses import BaseResponse from moto.core.utils import camelcase_to_underscores +from .utils import parse_message_attributes from .models import sqs_backend from .exceptions import ( + MessageAttributesInvalid, MessageNotInflight, ReceiptHandleIsInvalid ) @@ -93,14 +95,20 @@ class QueueResponse(BaseResponse): else: delay_seconds = 0 + try: + message_attributes = parse_message_attributes(self.querystring) + except MessageAttributesInvalid as e: + return e.description, dict(status=e.status_code) + queue_name = self.path.split("/")[-1] message = sqs_backend.send_message( queue_name, message, + message_attributes=message_attributes, delay_seconds=delay_seconds ) template = Template(SEND_MESSAGE_RESPONSE) - return template.render(message=message) + return template.render(message=message, message_attributes=message_attributes) def send_message_batch(self): """ @@ -131,6 +139,12 @@ class QueueResponse(BaseResponse): delay_seconds = self.querystring.get(delay_key, [None])[0] message = sqs_backend.send_message(queue_name, message_body[0], delay_seconds=delay_seconds) message.user_id = message_user_id + + message_attributes = parse_message_attributes(self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index), value_namespace='') + if type(message_attributes) == tuple: + return message_attributes[0], message_attributes[1] + message.message_attributes = message_attributes + messages.append(message) template = Template(SEND_MESSAGE_BATCH_RESPONSE) @@ -252,6 +266,9 @@ SEND_MESSAGE_RESPONSE = """ {{ message.md5 }} + {% if message.message_attributes.items()|count > 0 %} + 324758f82d026ac6ec5b31a3b192d1e3 + {% endif %} {{ message.id }} @@ -287,6 +304,22 @@ RECEIVE_MESSAGE_RESPONSE = """ ApproximateFirstReceiveTimestamp {{ message.approximate_first_receive_timestamp }} + {% if message.message_attributes.items()|count > 0 %} + 324758f82d026ac6ec5b31a3b192d1e3 + {% endif %} + {% for name, value in message.message_attributes.items() %} + + {{ name }} + + {{ value.data_type }} + {% if 'Binary' in value.data_type %} + {{ value.binary_value }} + {% else %} + {{ value.string_value }} + {% endif %} + + + {% endfor %} {% endfor %} @@ -304,6 +337,9 @@ SEND_MESSAGE_BATCH_RESPONSE = """ {{ message.user_id }} {{ message.id }} {{ message.md5 }} + {% if message.message_attributes.items()|count > 0 %} + 324758f82d026ac6ec5b31a3b192d1e3 + {% endif %} {% endfor %} diff --git a/moto/sqs/utils.py b/moto/sqs/utils.py index fafc1824d..294611a05 100644 --- a/moto/sqs/utils.py +++ b/moto/sqs/utils.py @@ -3,6 +3,8 @@ import datetime import random import string +from .exceptions import MessageAttributesInvalid + def generate_receipt_handle(): # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles @@ -19,3 +21,39 @@ def unix_time(dt=None): def unix_time_millis(dt=None): return unix_time(dt) * 1000.0 + + +def parse_message_attributes(querystring, base='', value_namespace='Value.'): + message_attributes = {} + index = 1 + while True: + # Loop through looking for message attributes + name_key = base + 'MessageAttribute.{0}.Name'.format(index) + name = querystring.get(name_key) + if not name: + # Found all attributes + break + + data_type_key = base + 'MessageAttribute.{0}.{1}DataType'.format(index, value_namespace) + data_type = querystring.get(data_type_key) + if not data_type: + raise MessageAttributesInvalid("The message attribute '{0}' must contain non-empty message attribute value.".format(name[0])) + + data_type_parts = data_type[0].split('.') + if len(data_type_parts) > 2 or data_type_parts[0] not in ['String', 'Binary', 'Number']: + raise MessageAttributesInvalid("The message attribute '{0}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String.".format(name[0])) + + type_prefix = 'String' + if data_type_parts[0] == 'Binary': + type_prefix = 'Binary' + + value_key = base + 'MessageAttribute.{0}.{1}{2}Value'.format(index, value_namespace, type_prefix) + value = querystring.get(value_key) + if not value: + raise MessageAttributesInvalid("The message attribute '{0}' must contain non-empty message attribute value for message attribute type '{1}'.".format(name[0], data_type[0])) + + message_attributes[name[0]] = {'data_type' : data_type[0], type_prefix.lower() + '_value' : value[0]} + + index += 1 + + return message_attributes diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 5362535d1..d616e0496 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -8,7 +8,7 @@ import sure # noqa import time from moto import mock_sqs - +from tests.helpers import requires_boto_gte @mock_sqs def test_create_queue(): @@ -92,6 +92,32 @@ def test_send_message(): messages[1].get_body().should.equal(body_two) +@requires_boto_gte("2.28") +@mock_sqs +def test_send_message_with_attributes(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=60) + queue.set_message_class(RawMessage) + + body = 'this is a test message' + message = queue.new_message(body) + message_attributes = { + 'test.attribute_name' : {'data_type' : 'String', 'string_value' : 'attribute value'}, + 'test.binary_attribute' : {'data_type' : 'Binary', 'binary_value' : 'binary value'}, + 'test.number_attribute' : {'data_type' : 'Number', 'string_value' : 'string value'} + } + message.message_attributes = message_attributes + + queue.write(message) + + messages = conn.receive_message(queue) + + messages[0].get_body().should.equal(body) + + for name, value in message_attributes.items(): + dict(messages[0].message_attributes[name]).should.equal(value) + + @mock_sqs def test_send_message_with_delay(): conn = boto.connect_sqs('the_key', 'the_secret') @@ -257,6 +283,23 @@ def test_send_batch_operation(): messages = queue.get_messages(2) +@requires_boto_gte("2.28") +@mock_sqs +def test_send_batch_operation_with_message_attributes(): + conn = boto.connect_sqs('the_key', 'the_secret') + queue = conn.create_queue("test-queue", visibility_timeout=60) + queue.set_message_class(RawMessage) + + message_tuple = ("my_first_message", 'test message 1', 0, {'name1': {'data_type': 'String', 'string_value': 'foo'}}) + queue.write_batch([message_tuple]) + + messages = queue.get_messages() + messages[0].get_body().should.equal("test message 1") + + for name, value in message_tuple[3].items(): + dict(messages[0].message_attributes[name]).should.equal(value) + + @mock_sqs def test_delete_batch_operation(): conn = boto.connect_sqs('the_key', 'the_secret')