Merge pull request #222 from ralfas/master

Added basic support for SQS MessageAttributes.
This commit is contained in:
Steve Pulec 2014-10-05 21:10:48 -04:00
commit 8ab284a9d2
5 changed files with 134 additions and 3 deletions

View File

@ -7,3 +7,10 @@ class MessageNotInflight(Exception):
class ReceiptHandleIsInvalid(Exception): class ReceiptHandleIsInvalid(Exception):
description = "The receipt handle provided is not valid." description = "The receipt handle provided is not valid."
status_code = 400 status_code = 400
class MessageAttributesInvalid(Exception):
status_code = 400
def __init__(self, description):
self.description = description

View File

@ -19,6 +19,7 @@ class Message(object):
def __init__(self, message_id, body): def __init__(self, message_id, body):
self.id = message_id self.id = message_id
self.body = body self.body = body
self.message_attributes = {}
self.receipt_handle = None self.receipt_handle = None
self.sender_id = DEFAULT_ACCOUNT_ID self.sender_id = DEFAULT_ACCOUNT_ID
self.sent_timestamp = None self.sent_timestamp = None
@ -188,7 +189,8 @@ class SQSBackend(BaseBackend):
setattr(queue, key, value) setattr(queue, key, value)
return queue return queue
def send_message(self, queue_name, message_body, delay_seconds=None): def send_message(self, queue_name, message_body, message_attributes=None, delay_seconds=None):
queue = self.get_queue(queue_name) queue = self.get_queue(queue_name)
if delay_seconds: if delay_seconds:
@ -199,11 +201,15 @@ class SQSBackend(BaseBackend):
message_id = get_random_message_id() message_id = get_random_message_id()
message = Message(message_id, message_body) message = Message(message_id, message_body)
if message_attributes:
message.message_attributes = message_attributes
message.mark_sent( message.mark_sent(
delay_seconds=delay_seconds delay_seconds=delay_seconds
) )
queue.add_message(message) queue.add_message(message)
return message return message
def receive_messages(self, queue_name, count): def receive_messages(self, queue_name, count):
@ -228,6 +234,7 @@ class SQSBackend(BaseBackend):
result.append(message) result.append(message)
if len(result) >= count: if len(result) >= count:
break break
return result return result
def delete_message(self, queue_name, receipt_handle): def delete_message(self, queue_name, receipt_handle):

View File

@ -3,8 +3,10 @@ from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores from moto.core.utils import camelcase_to_underscores
from .utils import parse_message_attributes
from .models import sqs_backend from .models import sqs_backend
from .exceptions import ( from .exceptions import (
MessageAttributesInvalid,
MessageNotInflight, MessageNotInflight,
ReceiptHandleIsInvalid ReceiptHandleIsInvalid
) )
@ -93,14 +95,20 @@ class QueueResponse(BaseResponse):
else: else:
delay_seconds = 0 delay_seconds = 0
try:
message_attributes = parse_message_attributes(self.querystring)
except MessageAttributesInvalid as e:
return e.description, dict(status=e.status_code)
queue_name = self.path.split("/")[-1] queue_name = self.path.split("/")[-1]
message = sqs_backend.send_message( message = sqs_backend.send_message(
queue_name, queue_name,
message, message,
message_attributes=message_attributes,
delay_seconds=delay_seconds delay_seconds=delay_seconds
) )
template = Template(SEND_MESSAGE_RESPONSE) template = Template(SEND_MESSAGE_RESPONSE)
return template.render(message=message) return template.render(message=message, message_attributes=message_attributes)
def send_message_batch(self): def send_message_batch(self):
""" """
@ -131,6 +139,12 @@ class QueueResponse(BaseResponse):
delay_seconds = self.querystring.get(delay_key, [None])[0] delay_seconds = self.querystring.get(delay_key, [None])[0]
message = sqs_backend.send_message(queue_name, message_body[0], delay_seconds=delay_seconds) message = sqs_backend.send_message(queue_name, message_body[0], delay_seconds=delay_seconds)
message.user_id = message_user_id message.user_id = message_user_id
message_attributes = parse_message_attributes(self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index), value_namespace='')
if type(message_attributes) == tuple:
return message_attributes[0], message_attributes[1]
message.message_attributes = message_attributes
messages.append(message) messages.append(message)
template = Template(SEND_MESSAGE_BATCH_RESPONSE) template = Template(SEND_MESSAGE_BATCH_RESPONSE)
@ -252,6 +266,9 @@ SEND_MESSAGE_RESPONSE = """<SendMessageResponse>
<MD5OfMessageBody> <MD5OfMessageBody>
{{ message.md5 }} {{ message.md5 }}
</MD5OfMessageBody> </MD5OfMessageBody>
{% if message.message_attributes.items()|count > 0 %}
<MD5OfMessageAttributes>324758f82d026ac6ec5b31a3b192d1e3</MD5OfMessageAttributes>
{% endif %}
<MessageId> <MessageId>
{{ message.id }} {{ message.id }}
</MessageId> </MessageId>
@ -287,6 +304,22 @@ RECEIVE_MESSAGE_RESPONSE = """<ReceiveMessageResponse>
<Name>ApproximateFirstReceiveTimestamp</Name> <Name>ApproximateFirstReceiveTimestamp</Name>
<Value>{{ message.approximate_first_receive_timestamp }}</Value> <Value>{{ message.approximate_first_receive_timestamp }}</Value>
</Attribute> </Attribute>
{% if message.message_attributes.items()|count > 0 %}
<MD5OfMessageAttributes>324758f82d026ac6ec5b31a3b192d1e3</MD5OfMessageAttributes>
{% endif %}
{% for name, value in message.message_attributes.items() %}
<MessageAttribute>
<Name>{{ name }}</Name>
<Value>
<DataType>{{ value.data_type }}</DataType>
{% if 'Binary' in value.data_type %}
<BinaryValue>{{ value.binary_value }}</BinaryValue>
{% else %}
<StringValue>{{ value.string_value }}</StringValue>
{% endif %}
</Value>
</MessageAttribute>
{% endfor %}
</Message> </Message>
{% endfor %} {% endfor %}
</ReceiveMessageResult> </ReceiveMessageResult>
@ -304,6 +337,9 @@ SEND_MESSAGE_BATCH_RESPONSE = """<SendMessageBatchResponse>
<Id>{{ message.user_id }}</Id> <Id>{{ message.user_id }}</Id>
<MessageId>{{ message.id }}</MessageId> <MessageId>{{ message.id }}</MessageId>
<MD5OfMessageBody>{{ message.md5 }}</MD5OfMessageBody> <MD5OfMessageBody>{{ message.md5 }}</MD5OfMessageBody>
{% if message.message_attributes.items()|count > 0 %}
<MD5OfMessageAttributes>324758f82d026ac6ec5b31a3b192d1e3</MD5OfMessageAttributes>
{% endif %}
</SendMessageBatchResultEntry> </SendMessageBatchResultEntry>
{% endfor %} {% endfor %}
</SendMessageBatchResult> </SendMessageBatchResult>

View File

@ -3,6 +3,8 @@ import datetime
import random import random
import string import string
from .exceptions import MessageAttributesInvalid
def generate_receipt_handle(): def generate_receipt_handle():
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
@ -19,3 +21,39 @@ def unix_time(dt=None):
def unix_time_millis(dt=None): def unix_time_millis(dt=None):
return unix_time(dt) * 1000.0 return unix_time(dt) * 1000.0
def parse_message_attributes(querystring, base='', value_namespace='Value.'):
message_attributes = {}
index = 1
while True:
# Loop through looking for message attributes
name_key = base + 'MessageAttribute.{0}.Name'.format(index)
name = querystring.get(name_key)
if not name:
# Found all attributes
break
data_type_key = base + 'MessageAttribute.{0}.{1}DataType'.format(index, value_namespace)
data_type = querystring.get(data_type_key)
if not data_type:
raise MessageAttributesInvalid("The message attribute '{0}' must contain non-empty message attribute value.".format(name[0]))
data_type_parts = data_type[0].split('.')
if len(data_type_parts) > 2 or data_type_parts[0] not in ['String', 'Binary', 'Number']:
raise MessageAttributesInvalid("The message attribute '{0}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String.".format(name[0]))
type_prefix = 'String'
if data_type_parts[0] == 'Binary':
type_prefix = 'Binary'
value_key = base + 'MessageAttribute.{0}.{1}{2}Value'.format(index, value_namespace, type_prefix)
value = querystring.get(value_key)
if not value:
raise MessageAttributesInvalid("The message attribute '{0}' must contain non-empty message attribute value for message attribute type '{1}'.".format(name[0], data_type[0]))
message_attributes[name[0]] = {'data_type' : data_type[0], type_prefix.lower() + '_value' : value[0]}
index += 1
return message_attributes

View File

@ -8,7 +8,7 @@ import sure # noqa
import time import time
from moto import mock_sqs from moto import mock_sqs
from tests.helpers import requires_boto_gte
@mock_sqs @mock_sqs
def test_create_queue(): def test_create_queue():
@ -92,6 +92,32 @@ def test_send_message():
messages[1].get_body().should.equal(body_two) messages[1].get_body().should.equal(body_two)
@requires_boto_gte("2.28")
@mock_sqs
def test_send_message_with_attributes():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
queue.set_message_class(RawMessage)
body = 'this is a test message'
message = queue.new_message(body)
message_attributes = {
'test.attribute_name' : {'data_type' : 'String', 'string_value' : 'attribute value'},
'test.binary_attribute' : {'data_type' : 'Binary', 'binary_value' : 'binary value'},
'test.number_attribute' : {'data_type' : 'Number', 'string_value' : 'string value'}
}
message.message_attributes = message_attributes
queue.write(message)
messages = conn.receive_message(queue)
messages[0].get_body().should.equal(body)
for name, value in message_attributes.items():
dict(messages[0].message_attributes[name]).should.equal(value)
@mock_sqs @mock_sqs
def test_send_message_with_delay(): def test_send_message_with_delay():
conn = boto.connect_sqs('the_key', 'the_secret') conn = boto.connect_sqs('the_key', 'the_secret')
@ -257,6 +283,23 @@ def test_send_batch_operation():
messages = queue.get_messages(2) messages = queue.get_messages(2)
@requires_boto_gte("2.28")
@mock_sqs
def test_send_batch_operation_with_message_attributes():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
queue.set_message_class(RawMessage)
message_tuple = ("my_first_message", 'test message 1', 0, {'name1': {'data_type': 'String', 'string_value': 'foo'}})
queue.write_batch([message_tuple])
messages = queue.get_messages()
messages[0].get_body().should.equal("test message 1")
for name, value in message_tuple[3].items():
dict(messages[0].message_attributes[name]).should.equal(value)
@mock_sqs @mock_sqs
def test_delete_batch_operation(): def test_delete_batch_operation():
conn = boto.connect_sqs('the_key', 'the_secret') conn = boto.connect_sqs('the_key', 'the_secret')