Fix : SQS - Added support for attribute labels for send_message function (#3181)
* Fix : SQS - Added support for attribute labels for send_message function * Add integration test on receive message function * Add send message invalid datetype integration test and fix SQS MessageAttributesInvalid exceptions
This commit is contained in:
parent
126f5a5155
commit
97139d4253
@ -16,11 +16,13 @@ class ReceiptHandleIsInvalid(RESTError):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class MessageAttributesInvalid(Exception):
|
class MessageAttributesInvalid(RESTError):
|
||||||
status_code = 400
|
code = 400
|
||||||
|
|
||||||
def __init__(self, description):
|
def __init__(self, description):
|
||||||
self.description = description
|
super(MessageAttributesInvalid, self).__init__(
|
||||||
|
"MessageAttributesInvalid", description
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class QueueDoesNotExist(RESTError):
|
class QueueDoesNotExist(RESTError):
|
||||||
|
@ -87,7 +87,19 @@ class Message(BaseModel):
|
|||||||
struct_format = "!I".encode("ascii") # ensure it's a bytestring
|
struct_format = "!I".encode("ascii") # ensure it's a bytestring
|
||||||
for name in sorted(self.message_attributes.keys()):
|
for name in sorted(self.message_attributes.keys()):
|
||||||
attr = self.message_attributes[name]
|
attr = self.message_attributes[name]
|
||||||
data_type = attr["data_type"]
|
data_type_parts = attr["data_type"].split(".")
|
||||||
|
data_type = data_type_parts[0]
|
||||||
|
|
||||||
|
if data_type not in [
|
||||||
|
"String",
|
||||||
|
"Binary",
|
||||||
|
"Number",
|
||||||
|
]:
|
||||||
|
raise MessageAttributesInvalid(
|
||||||
|
"The message attribute '{0}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String.".format(
|
||||||
|
name[0]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
encoded = utf8("")
|
encoded = utf8("")
|
||||||
# Each part of each attribute is encoded right after it's
|
# Each part of each attribute is encoded right after it's
|
||||||
@ -243,9 +255,7 @@ class Queue(BaseModel):
|
|||||||
|
|
||||||
# Check some conditions
|
# Check some conditions
|
||||||
if self.fifo_queue and not self.name.endswith(".fifo"):
|
if self.fifo_queue and not self.name.endswith(".fifo"):
|
||||||
raise MessageAttributesInvalid(
|
raise InvalidParameterValue("Queue name must end in .fifo for FIFO queues")
|
||||||
"Queue name must end in .fifo for FIFO queues"
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pending_messages(self):
|
def pending_messages(self):
|
||||||
|
@ -9,7 +9,6 @@ from six.moves.urllib.parse import urlparse
|
|||||||
from .exceptions import (
|
from .exceptions import (
|
||||||
EmptyBatchRequest,
|
EmptyBatchRequest,
|
||||||
InvalidAttributeName,
|
InvalidAttributeName,
|
||||||
MessageAttributesInvalid,
|
|
||||||
MessageNotInflight,
|
MessageNotInflight,
|
||||||
ReceiptHandleIsInvalid,
|
ReceiptHandleIsInvalid,
|
||||||
)
|
)
|
||||||
@ -82,12 +81,7 @@ class SQSResponse(BaseResponse):
|
|||||||
request_url = urlparse(self.uri)
|
request_url = urlparse(self.uri)
|
||||||
queue_name = self._get_param("QueueName")
|
queue_name = self._get_param("QueueName")
|
||||||
|
|
||||||
try:
|
queue = self.sqs_backend.create_queue(queue_name, self.tags, **self.attribute)
|
||||||
queue = self.sqs_backend.create_queue(
|
|
||||||
queue_name, self.tags, **self.attribute
|
|
||||||
)
|
|
||||||
except MessageAttributesInvalid as e:
|
|
||||||
return self._error("InvalidParameterValue", e.description)
|
|
||||||
|
|
||||||
template = self.response_template(CREATE_QUEUE_RESPONSE)
|
template = self.response_template(CREATE_QUEUE_RESPONSE)
|
||||||
return template.render(queue_url=queue.url(request_url))
|
return template.render(queue_url=queue.url(request_url))
|
||||||
@ -225,10 +219,7 @@ class SQSResponse(BaseResponse):
|
|||||||
if len(message) > MAXIMUM_MESSAGE_LENGTH:
|
if len(message) > MAXIMUM_MESSAGE_LENGTH:
|
||||||
return ERROR_TOO_LONG_RESPONSE, dict(status=400)
|
return ERROR_TOO_LONG_RESPONSE, dict(status=400)
|
||||||
|
|
||||||
try:
|
|
||||||
message_attributes = parse_message_attributes(self.querystring)
|
message_attributes = parse_message_attributes(self.querystring)
|
||||||
except MessageAttributesInvalid as e:
|
|
||||||
return e.description, dict(status=e.status_code)
|
|
||||||
|
|
||||||
queue_name = self._get_queue_name()
|
queue_name = self._get_queue_name()
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ def parse_message_attributes(querystring, base="", value_namespace="Value."):
|
|||||||
)
|
)
|
||||||
|
|
||||||
data_type_parts = data_type[0].split(".")
|
data_type_parts = data_type[0].split(".")
|
||||||
if len(data_type_parts) > 2 or data_type_parts[0] not in [
|
if data_type_parts[0] not in [
|
||||||
"String",
|
"String",
|
||||||
"Binary",
|
"Binary",
|
||||||
"Number",
|
"Number",
|
||||||
|
@ -248,6 +248,50 @@ def test_message_with_complex_attributes():
|
|||||||
messages.should.have.length_of(1)
|
messages.should.have.length_of(1)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_message_with_attributes_have_labels():
|
||||||
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
queue = sqs.create_queue(QueueName="blah")
|
||||||
|
msg = queue.send_message(
|
||||||
|
MessageBody="derp",
|
||||||
|
MessageAttributes={
|
||||||
|
"timestamp": {
|
||||||
|
"DataType": "Number.java.lang.Long",
|
||||||
|
"StringValue": "1493147359900",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
msg.get("MD5OfMessageBody").should.equal("58fd9edd83341c29f1aebba81c31e257")
|
||||||
|
msg.get("MD5OfMessageAttributes").should.equal("235c5c510d26fb653d073faed50ae77c")
|
||||||
|
msg.get("MessageId").should_not.contain(" \n")
|
||||||
|
|
||||||
|
messages = queue.receive_messages()
|
||||||
|
messages.should.have.length_of(1)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_message_with_attributes_invalid_datatype():
|
||||||
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
queue = sqs.create_queue(QueueName="blah")
|
||||||
|
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody="derp",
|
||||||
|
MessageAttributes={
|
||||||
|
"timestamp": {
|
||||||
|
"DataType": "InvalidNumber",
|
||||||
|
"StringValue": "149314735990a",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
ex = e.exception
|
||||||
|
ex.response["Error"]["Code"].should.equal("MessageAttributesInvalid")
|
||||||
|
ex.response["Error"]["Message"].should.equal(
|
||||||
|
"The message attribute 'timestamp' has an invalid message attribute type, the set of supported type "
|
||||||
|
"prefixes is Binary, Number, and String."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_send_message_with_message_group_id():
|
def test_send_message_with_message_group_id():
|
||||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
@ -532,6 +576,54 @@ def test_send_receive_message_with_attributes():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_send_receive_message_with_attributes_with_labels():
|
||||||
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
conn = boto3.client("sqs", region_name="us-east-1")
|
||||||
|
conn.create_queue(QueueName="test-queue")
|
||||||
|
queue = sqs.Queue("test-queue")
|
||||||
|
|
||||||
|
body_one = "this is a test message"
|
||||||
|
body_two = "this is another test message"
|
||||||
|
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody=body_one,
|
||||||
|
MessageAttributes={
|
||||||
|
"timestamp": {
|
||||||
|
"StringValue": "1493147359900",
|
||||||
|
"DataType": "Number.java.lang.Long",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
queue.send_message(
|
||||||
|
MessageBody=body_two,
|
||||||
|
MessageAttributes={
|
||||||
|
"timestamp": {
|
||||||
|
"StringValue": "1493147359901",
|
||||||
|
"DataType": "Number.java.lang.Long",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
messages = conn.receive_message(QueueUrl=queue.url, MaxNumberOfMessages=2)[
|
||||||
|
"Messages"
|
||||||
|
]
|
||||||
|
|
||||||
|
message1 = messages[0]
|
||||||
|
message2 = messages[1]
|
||||||
|
|
||||||
|
message1.get("Body").should.equal(body_one)
|
||||||
|
message2.get("Body").should.equal(body_two)
|
||||||
|
|
||||||
|
message1.get("MD5OfMessageAttributes").should.equal(
|
||||||
|
"235c5c510d26fb653d073faed50ae77c"
|
||||||
|
)
|
||||||
|
message2.get("MD5OfMessageAttributes").should.equal(
|
||||||
|
"994258b45346a2cc3f9cbb611aa7af30"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_send_receive_message_timestamps():
|
def test_send_receive_message_timestamps():
|
||||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user