Basic plumbing to preserve MessageGroupId and MessageDeduplicationID, if they are provided.

This commit is contained in:
Darien Hager 2018-04-19 00:22:35 -07:00
parent 6332ed9df9
commit 6556ba89cd
2 changed files with 26 additions and 2 deletions

View File

@ -38,6 +38,8 @@ class Message(BaseModel):
self.sent_timestamp = None self.sent_timestamp = None
self.approximate_first_receive_timestamp = None self.approximate_first_receive_timestamp = None
self.approximate_receive_count = 0 self.approximate_receive_count = 0
self.deduplication_id = None
self.group_id = None
self.visible_at = 0 self.visible_at = 0
self.delayed_until = 0 self.delayed_until = 0
@ -400,7 +402,7 @@ class SQSBackend(BaseBackend):
queue._set_attributes(attributes) queue._set_attributes(attributes)
return queue return queue
def send_message(self, queue_name, message_body, message_attributes=None, delay_seconds=None): def send_message(self, queue_name, message_body, message_attributes=None, delay_seconds=None, deduplication_id=None, group_id=None):
queue = self.get_queue(queue_name) queue = self.get_queue(queue_name)
@ -412,6 +414,12 @@ class SQSBackend(BaseBackend):
message_id = get_random_message_id() message_id = get_random_message_id()
message = Message(message_id, message_body) message = Message(message_id, message_body)
# Attributes, but not *message* attributes
if deduplication_id is not None:
message.deduplication_id = deduplication_id
if group_id is not None:
message.group_id = group_id
if message_attributes: if message_attributes:
message.message_attributes = message_attributes message.message_attributes = message_attributes

View File

@ -198,6 +198,8 @@ class SQSResponse(BaseResponse):
def send_message(self): def send_message(self):
message = self._get_param('MessageBody') message = self._get_param('MessageBody')
delay_seconds = int(self._get_param('DelaySeconds', 0)) delay_seconds = int(self._get_param('DelaySeconds', 0))
message_group_id = self._get_param("MessageGroupId")
message_dedupe_id = self._get_param("MessageDeduplicationId")
if len(message) > MAXIMUM_MESSAGE_LENGTH: if len(message) > MAXIMUM_MESSAGE_LENGTH:
return ERROR_TOO_LONG_RESPONSE, dict(status=400) return ERROR_TOO_LONG_RESPONSE, dict(status=400)
@ -213,7 +215,9 @@ class SQSResponse(BaseResponse):
queue_name, queue_name,
message, message,
message_attributes=message_attributes, message_attributes=message_attributes,
delay_seconds=delay_seconds delay_seconds=delay_seconds,
deduplication_id=message_dedupe_id,
group_id=message_group_id
) )
template = self.response_template(SEND_MESSAGE_RESPONSE) template = self.response_template(SEND_MESSAGE_RESPONSE)
return template.render(message=message, message_attributes=message_attributes) return template.render(message=message, message_attributes=message_attributes)
@ -491,6 +495,18 @@ RECEIVE_MESSAGE_RESPONSE = """<ReceiveMessageResponse>
<Name>ApproximateFirstReceiveTimestamp</Name> <Name>ApproximateFirstReceiveTimestamp</Name>
<Value>{{ message.approximate_first_receive_timestamp }}</Value> <Value>{{ message.approximate_first_receive_timestamp }}</Value>
</Attribute> </Attribute>
{% if message.deduplication_id is not none %}
<Attribute>
<Name>MessageDeduplicationId</Name>
<Value>{{ message.deduplication_id }}</Value>
</Attribute>
{% endif %}
{% if message.group_id is not none %}
<Attribute>
<Name>MessageGroupId</Name>
<Value>{{ message.group_id }}</Value>
</Attribute>
{% endif %}
{% if message.message_attributes.items()|count > 0 %} {% if message.message_attributes.items()|count > 0 %}
<MD5OfMessageAttributes>{{- message.attribute_md5 -}}</MD5OfMessageAttributes> <MD5OfMessageAttributes>{{- message.attribute_md5 -}}</MD5OfMessageAttributes>
{% endif %} {% endif %}