Improve SQS Compatibility with AWS (#1520)

* Return correct error code when fetching a queue that does not exist

* Improve SQS Queue get and set attributes

* Queue creation and set_attributes uses the same code path
    - ensure bool/int values are cast correctly
* RedrivePolicy is handled properly with set_attributes
    - _setup_dlq is called
    - is json decoded, so that returned RedrivePolicy is not json
      encoded twice
* As per AWS not all attributes are returned when they are not set, for
  example RedrivePolicy, FifoQueue, Policy, Kms*
* WaitTimeSeconds is not a queue attribute switch to
  ReceiveMessageWaitTimeSeconds
This commit is contained in:
Iain Bullard 2018-03-21 15:48:08 +00:00 committed by Jack Danger
parent 35d69759ef
commit 6dce7dcb18
3 changed files with 170 additions and 60 deletions

View File

@ -152,64 +152,86 @@ class Message(BaseModel):
class Queue(BaseModel):
camelcase_attributes = ['ApproximateNumberOfMessages',
'ApproximateNumberOfMessagesDelayed',
'ApproximateNumberOfMessagesNotVisible',
'ContentBasedDeduplication',
'CreatedTimestamp',
'DelaySeconds',
'FifoQueue',
'KmsDataKeyReusePeriodSeconds',
'KmsMasterKeyId',
'LastModifiedTimestamp',
'MaximumMessageSize',
'MessageRetentionPeriod',
'QueueArn',
'ReceiveMessageWaitTimeSeconds',
'RedrivePolicy',
'VisibilityTimeout',
'WaitTimeSeconds']
ALLOWED_PERMISSIONS = ('*', 'ChangeMessageVisibility', 'DeleteMessage', 'GetQueueAttributes',
'GetQueueUrl', 'ReceiveMessage', 'SendMessage')
base_attributes = ['ApproximateNumberOfMessages',
'ApproximateNumberOfMessagesDelayed',
'ApproximateNumberOfMessagesNotVisible',
'CreatedTimestamp',
'DelaySeconds',
'LastModifiedTimestamp',
'MaximumMessageSize',
'MessageRetentionPeriod',
'QueueArn',
'ReceiveMessageWaitTimeSeconds',
'VisibilityTimeout']
fifo_attributes = ['FifoQueue',
'ContentBasedDeduplication']
kms_attributes = ['KmsDataKeyReusePeriodSeconds',
'KmsMasterKeyId']
ALLOWED_PERMISSIONS = ('*', 'ChangeMessageVisibility', 'DeleteMessage',
'GetQueueAttributes', 'GetQueueUrl',
'ReceiveMessage', 'SendMessage')
def __init__(self, name, region, **kwargs):
self.name = name
self.visibility_timeout = int(kwargs.get('VisibilityTimeout', 30))
self.region = region
self.tags = {}
self.permissions = {}
self._messages = []
now = unix_time()
# kwargs can also have:
# [Policy, RedrivePolicy]
self.fifo_queue = kwargs.get('FifoQueue', 'false') == 'true'
self.content_based_deduplication = kwargs.get('ContentBasedDeduplication', 'false') == 'true'
self.kms_master_key_id = kwargs.get('KmsMasterKeyId', 'alias/aws/sqs')
self.kms_data_key_reuse_period_seconds = int(kwargs.get('KmsDataKeyReusePeriodSeconds', 300))
self.created_timestamp = now
self.delay_seconds = int(kwargs.get('DelaySeconds', 0))
self.last_modified_timestamp = now
self.maximum_message_size = int(kwargs.get('MaximumMessageSize', 64 << 10))
self.message_retention_period = int(kwargs.get('MessageRetentionPeriod', 86400 * 4)) # four days
self.queue_arn = 'arn:aws:sqs:{0}:123456789012:{1}'.format(self.region, self.name)
self.receive_message_wait_time_seconds = int(kwargs.get('ReceiveMessageWaitTimeSeconds', 0))
self.permissions = {}
# wait_time_seconds will be set to immediate return messages
self.wait_time_seconds = int(kwargs.get('WaitTimeSeconds', 0))
self.redrive_policy = {}
self.queue_arn = 'arn:aws:sqs:{0}:123456789012:{1}'.format(self.region,
self.name)
self.dead_letter_queue = None
if 'RedrivePolicy' in kwargs:
self._setup_dlq(kwargs['RedrivePolicy'])
# default settings for a non fifo queue
defaults = {
'ContentBasedDeduplication': 'false',
'DelaySeconds': 0,
'FifoQueue': 'false',
'KmsDataKeyReusePeriodSeconds': 300, # five minutes
'KmsMasterKeyId': None,
'MaximumMessageSize': int(64 << 10),
'MessageRetentionPeriod': 86400 * 4, # four days
'Policy': None,
'ReceiveMessageWaitTimeSeconds': 0,
'RedrivePolicy': None,
'VisibilityTimeout': 30,
}
defaults.update(kwargs)
self._set_attributes(defaults, now)
# Check some conditions
if self.fifo_queue and not self.name.endswith('.fifo'):
raise MessageAttributesInvalid('Queue name must end in .fifo for FIFO queues')
def _set_attributes(self, attributes, now=None):
if not now:
now = unix_time()
integer_fields = ('DelaySeconds', 'KmsDataKeyreusePeriodSeconds',
'MaximumMessageSize', 'MessageRetentionPeriod',
'ReceiveMessageWaitTime', 'VisibilityTimeout')
bool_fields = ('ContentBasedDeduplication', 'FifoQueue')
for key, value in six.iteritems(attributes):
if key in integer_fields:
value = int(value)
if key in bool_fields:
value = value == "true"
if key == 'RedrivePolicy' and value is not None:
continue
setattr(self, camelcase_to_underscores(key), value)
if attributes.get('RedrivePolicy', None):
self._setup_dlq(attributes['RedrivePolicy'])
self.last_modified_timestamp = now
def _setup_dlq(self, policy_json):
try:
self.redrive_policy = json.loads(policy_json)
@ -252,8 +274,8 @@ class Queue(BaseModel):
if 'VisibilityTimeout' in properties:
queue.visibility_timeout = int(properties['VisibilityTimeout'])
if 'WaitTimeSeconds' in properties:
queue.wait_time_seconds = int(properties['WaitTimeSeconds'])
if 'ReceiveMessageWaitTimeSeconds' in properties:
queue.receive_message_wait_time_seconds = int(properties['ReceiveMessageWaitTimeSeconds'])
return queue
@classmethod
@ -282,13 +304,31 @@ class Queue(BaseModel):
@property
def attributes(self):
result = {}
for attribute in self.camelcase_attributes:
for attribute in self.base_attributes:
attr = getattr(self, camelcase_to_underscores(attribute))
if isinstance(attr, bool):
attr = str(attr).lower()
elif attribute == 'RedrivePolicy':
attr = json.dumps(attr)
result[attribute] = attr
if self.fifo_queue:
for attribute in self.fifo_attributes:
attr = getattr(self, camelcase_to_underscores(attribute))
result[attribute] = attr
if self.kms_master_key_id:
for attribute in self.kms_attributes:
attr = getattr(self, camelcase_to_underscores(attribute))
result[attribute] = attr
if self.policy:
result['Policy'] = self.policy
if self.redrive_policy:
result['RedrivePolicy'] = json.dumps(self.redrive_policy)
for key in result:
if isinstance(result[key], bool):
result[key] = str(result[key]).lower()
return result
def url(self, request_url):
@ -355,9 +395,9 @@ class SQSBackend(BaseBackend):
return self.queues.pop(queue_name)
return False
def set_queue_attribute(self, queue_name, key, value):
def set_queue_attributes(self, queue_name, attributes):
queue = self.get_queue(queue_name)
setattr(queue, key, value)
queue._set_attributes(attributes)
return queue
def send_message(self, queue_name, message_body, message_attributes=None, delay_seconds=None):

View File

@ -4,7 +4,7 @@ import re
from six.moves.urllib.parse import urlparse
from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores, amz_crc32, amzn_request_id
from moto.core.utils import amz_crc32, amzn_request_id
from .utils import parse_message_attributes
from .models import sqs_backends
from .exceptions import (
@ -87,7 +87,8 @@ class SQSResponse(BaseResponse):
try:
queue = self.sqs_backend.get_queue(queue_name)
except QueueDoesNotExist as e:
return self._error('QueueDoesNotExist', e.description)
return self._error('AWS.SimpleQueueService.NonExistentQueue',
e.description)
if queue:
template = self.response_template(GET_QUEUE_URL_RESPONSE)
@ -171,7 +172,8 @@ class SQSResponse(BaseResponse):
try:
queue = self.sqs_backend.get_queue(queue_name)
except QueueDoesNotExist as e:
return self._error('QueueDoesNotExist', e.description)
return self._error('AWS.SimpleQueueService.NonExistentQueue',
e.description)
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
return template.render(queue=queue)
@ -179,9 +181,8 @@ class SQSResponse(BaseResponse):
def set_queue_attributes(self):
# TODO validate self.get_param('QueueUrl')
queue_name = self._get_queue_name()
for key, value in self.attribute.items():
key = camelcase_to_underscores(key)
self.sqs_backend.set_queue_attribute(queue_name, key, value)
self.sqs_backend.set_queue_attributes(queue_name, self.attribute)
return SET_QUEUE_ATTRIBUTE_RESPONSE
def delete_queue(self):
@ -323,7 +324,7 @@ class SQSResponse(BaseResponse):
try:
wait_time = int(self.querystring.get("WaitTimeSeconds")[0])
except TypeError:
wait_time = queue.wait_time_seconds
wait_time = queue.receive_message_wait_time_seconds
try:
visibility_timeout = self._get_validated_visibility_timeout()

View File

@ -72,6 +72,24 @@ def test_create_queue():
queue.attributes.get('VisibilityTimeout').should.equal('30')
@mock_sqs
def test_create_queue_kms():
sqs = boto3.resource('sqs', region_name='us-east-1')
new_queue = sqs.create_queue(
QueueName='test-queue',
Attributes={
'KmsMasterKeyId': 'master-key-id',
'KmsDataKeyReusePeriodSeconds': '600'
})
new_queue.should_not.be.none
queue = sqs.get_queue_by_name(QueueName='test-queue')
queue.attributes.get('KmsMasterKeyId').should.equal('master-key-id')
queue.attributes.get('KmsDataKeyReusePeriodSeconds').should.equal('600')
@mock_sqs
def test_get_nonexistent_queue():
sqs = boto3.resource('sqs', region_name='us-east-1')
@ -79,13 +97,15 @@ def test_get_nonexistent_queue():
sqs.get_queue_by_name(QueueName='nonexisting-queue')
ex = err.exception
ex.operation_name.should.equal('GetQueueUrl')
ex.response['Error']['Code'].should.equal('QueueDoesNotExist')
ex.response['Error']['Code'].should.equal(
'AWS.SimpleQueueService.NonExistentQueue')
with assert_raises(ClientError) as err:
sqs.Queue('http://whatever-incorrect-queue-address').load()
ex = err.exception
ex.operation_name.should.equal('GetQueueAttributes')
ex.response['Error']['Code'].should.equal('QueueDoesNotExist')
ex.response['Error']['Code'].should.equal(
'AWS.SimpleQueueService.NonExistentQueue')
@mock_sqs
@ -890,7 +910,7 @@ def test_create_fifo_queue_with_dlq():
def test_queue_with_dlq():
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
raise SkipTest('Cant manipulate time in server mode')
sqs = boto3.client('sqs', region_name='us-east-1')
with freeze_time("2015-01-01 12:00:00"):
@ -933,6 +953,7 @@ def test_queue_with_dlq():
resp = sqs.list_dead_letter_source_queues(QueueUrl=queue_url1)
resp['queueUrls'][0].should.equal(queue_url2)
@mock_sqs
def test_redrive_policy_available():
sqs = boto3.client('sqs', region_name='us-east-1')
@ -956,3 +977,51 @@ def test_redrive_policy_available():
attributes = sqs.get_queue_attributes(QueueUrl=queue_url2)['Attributes']
assert 'RedrivePolicy' in attributes
assert json.loads(attributes['RedrivePolicy']) == redrive_policy
# Cant have redrive policy without maxReceiveCount
with assert_raises(ClientError):
sqs.create_queue(
QueueName='test-queue2',
Attributes={
'FifoQueue': 'true',
'RedrivePolicy': json.dumps({'deadLetterTargetArn': queue_arn1})
}
)
@mock_sqs
def test_redrive_policy_non_existent_queue():
sqs = boto3.client('sqs', region_name='us-east-1')
redrive_policy = {
'deadLetterTargetArn': 'arn:aws:sqs:us-east-1:123456789012:no-queue',
'maxReceiveCount': 1,
}
with assert_raises(ClientError):
sqs.create_queue(
QueueName='test-queue',
Attributes={
'RedrivePolicy': json.dumps(redrive_policy)
}
)
@mock_sqs
def test_redrive_policy_set_attributes():
sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.create_queue(QueueName='test-queue')
deadletter_queue = sqs.create_queue(QueueName='test-deadletter')
redrive_policy = {
'deadLetterTargetArn': deadletter_queue.attributes['QueueArn'],
'maxReceiveCount': 1,
}
queue.set_attributes(Attributes={
'RedrivePolicy': json.dumps(redrive_policy)})
copy = sqs.get_queue_by_name(QueueName='test-queue')
assert 'RedrivePolicy' in copy.attributes
copy_policy = json.loads(copy.attributes['RedrivePolicy'])
assert copy_policy == redrive_policy