Merge pull request #2830 from gruebel/fix-sqs-raw-message-delivery

Fix missing MessageAttributes when using RawMessageDelivery
This commit is contained in:
Bert Blommers 2020-03-22 08:18:22 +00:00 committed by GitHub
commit fc9cecc154
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 28 deletions

View File

@ -146,7 +146,9 @@ class Subscription(BaseModel):
queue_name = self.endpoint.split(":")[-1] queue_name = self.endpoint.split(":")[-1]
region = self.endpoint.split(":")[3] region = self.endpoint.split(":")[3]
if self.attributes.get("RawMessageDelivery") != "true": if self.attributes.get("RawMessageDelivery") != "true":
enveloped_message = json.dumps( sqs_backends[region].send_message(
queue_name,
json.dumps(
self.get_post_data( self.get_post_data(
message, message,
message_id, message_id,
@ -156,10 +158,26 @@ class Subscription(BaseModel):
sort_keys=True, sort_keys=True,
indent=2, indent=2,
separators=(",", ": "), separators=(",", ": "),
),
) )
else: else:
enveloped_message = message raw_message_attributes = {}
sqs_backends[region].send_message(queue_name, enveloped_message) for key, value in message_attributes.items():
type = "string_value"
type_value = value["Value"]
if value["Type"].startswith("Binary"):
type = "binary_value"
elif value["Type"].startswith("Number"):
type_value = "{0:g}".format(value["Value"])
raw_message_attributes[key] = {
"data_type": value["Type"],
type: type_value,
}
sqs_backends[region].send_message(
queue_name, message, message_attributes=raw_message_attributes
)
elif self.protocol in ["http", "https"]: elif self.protocol in ["http", "https"]:
post_data = self.get_post_data(message, message_id, subject) post_data = self.get_post_data(message, message_id, subject)
requests.post( requests.post(

View File

@ -148,34 +148,42 @@ def test_publish_to_sqs_msg_attr_byte_value():
conn.create_topic(Name="some-topic") conn.create_topic(Name="some-topic")
response = conn.list_topics() response = conn.list_topics()
topic_arn = response["Topics"][0]["TopicArn"] topic_arn = response["Topics"][0]["TopicArn"]
sqs = boto3.resource("sqs", region_name="us-east-1")
sqs_conn = boto3.resource("sqs", region_name="us-east-1") queue = sqs.create_queue(QueueName="test-queue")
queue = sqs_conn.create_queue(QueueName="test-queue") conn.subscribe(
TopicArn=topic_arn, Protocol="sqs", Endpoint=queue.attributes["QueueArn"],
)
queue_raw = sqs.create_queue(QueueName="test-queue-raw")
conn.subscribe( conn.subscribe(
TopicArn=topic_arn, TopicArn=topic_arn,
Protocol="sqs", Protocol="sqs",
Endpoint="arn:aws:sqs:us-east-1:{}:test-queue".format(ACCOUNT_ID), Endpoint=queue_raw.attributes["QueueArn"],
Attributes={"RawMessageDelivery": "true"},
) )
message = "my message"
conn.publish( conn.publish(
TopicArn=topic_arn, TopicArn=topic_arn,
Message=message, Message="my message",
MessageAttributes={ MessageAttributes={
"store": {"DataType": "Binary", "BinaryValue": b"\x02\x03\x04"} "store": {"DataType": "Binary", "BinaryValue": b"\x02\x03\x04"}
}, },
) )
messages = queue.receive_messages(MaxNumberOfMessages=5)
message_attributes = [json.loads(m.body)["MessageAttributes"] for m in messages] message = json.loads(queue.receive_messages()[0].body)
message_attributes.should.equal( message["Message"].should.equal("my message")
[ message["MessageAttributes"].should.equal(
{ {
"store": { "store": {
"Type": "Binary", "Type": "Binary",
"Value": base64.b64encode(b"\x02\x03\x04").decode(), "Value": base64.b64encode(b"\x02\x03\x04").decode(),
} }
} }
] )
message = queue_raw.receive_messages()[0]
message.body.should.equal("my message")
message.message_attributes.should.equal(
{"store": {"DataType": "Binary", "BinaryValue": b"\x02\x03\x04"}}
) )
@ -187,6 +195,12 @@ def test_publish_to_sqs_msg_attr_number_type():
sqs = boto3.resource("sqs", region_name="us-east-1") sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(QueueName="test-queue") queue = sqs.create_queue(QueueName="test-queue")
topic.subscribe(Protocol="sqs", Endpoint=queue.attributes["QueueArn"]) topic.subscribe(Protocol="sqs", Endpoint=queue.attributes["QueueArn"])
queue_raw = sqs.create_queue(QueueName="test-queue-raw")
topic.subscribe(
Protocol="sqs",
Endpoint=queue_raw.attributes["QueueArn"],
Attributes={"RawMessageDelivery": "true"},
)
topic.publish( topic.publish(
Message="test message", Message="test message",
@ -199,6 +213,12 @@ def test_publish_to_sqs_msg_attr_number_type():
{"retries": {"Type": "Number", "Value": 0}} {"retries": {"Type": "Number", "Value": 0}}
) )
message = queue_raw.receive_messages()[0]
message.body.should.equal("test message")
message.message_attributes.should.equal(
{"retries": {"DataType": "Number", "StringValue": "0"}}
)
@mock_sns @mock_sns
def test_publish_sms(): def test_publish_sms():