diff --git a/moto/sqs/models.py b/moto/sqs/models.py index 0a268e9eb..044759e4f 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -152,64 +152,86 @@ class Message(BaseModel): class Queue(BaseModel): - camelcase_attributes = ['ApproximateNumberOfMessages', - 'ApproximateNumberOfMessagesDelayed', - 'ApproximateNumberOfMessagesNotVisible', - 'ContentBasedDeduplication', - 'CreatedTimestamp', - 'DelaySeconds', - 'FifoQueue', - 'KmsDataKeyReusePeriodSeconds', - 'KmsMasterKeyId', - 'LastModifiedTimestamp', - 'MaximumMessageSize', - 'MessageRetentionPeriod', - 'QueueArn', - 'ReceiveMessageWaitTimeSeconds', - 'RedrivePolicy', - 'VisibilityTimeout', - 'WaitTimeSeconds'] - ALLOWED_PERMISSIONS = ('*', 'ChangeMessageVisibility', 'DeleteMessage', 'GetQueueAttributes', - 'GetQueueUrl', 'ReceiveMessage', 'SendMessage') + base_attributes = ['ApproximateNumberOfMessages', + 'ApproximateNumberOfMessagesDelayed', + 'ApproximateNumberOfMessagesNotVisible', + 'CreatedTimestamp', + 'DelaySeconds', + 'LastModifiedTimestamp', + 'MaximumMessageSize', + 'MessageRetentionPeriod', + 'QueueArn', + 'ReceiveMessageWaitTimeSeconds', + 'VisibilityTimeout'] + fifo_attributes = ['FifoQueue', + 'ContentBasedDeduplication'] + kms_attributes = ['KmsDataKeyReusePeriodSeconds', + 'KmsMasterKeyId'] + ALLOWED_PERMISSIONS = ('*', 'ChangeMessageVisibility', 'DeleteMessage', + 'GetQueueAttributes', 'GetQueueUrl', + 'ReceiveMessage', 'SendMessage') def __init__(self, name, region, **kwargs): self.name = name - self.visibility_timeout = int(kwargs.get('VisibilityTimeout', 30)) self.region = region self.tags = {} + self.permissions = {} self._messages = [] now = unix_time() - - # kwargs can also have: - # [Policy, RedrivePolicy] - self.fifo_queue = kwargs.get('FifoQueue', 'false') == 'true' - self.content_based_deduplication = kwargs.get('ContentBasedDeduplication', 'false') == 'true' - self.kms_master_key_id = kwargs.get('KmsMasterKeyId', 'alias/aws/sqs') - self.kms_data_key_reuse_period_seconds = int(kwargs.get('KmsDataKeyReusePeriodSeconds', 300)) self.created_timestamp = now - self.delay_seconds = int(kwargs.get('DelaySeconds', 0)) - self.last_modified_timestamp = now - self.maximum_message_size = int(kwargs.get('MaximumMessageSize', 64 << 10)) - self.message_retention_period = int(kwargs.get('MessageRetentionPeriod', 86400 * 4)) # four days - self.queue_arn = 'arn:aws:sqs:{0}:123456789012:{1}'.format(self.region, self.name) - self.receive_message_wait_time_seconds = int(kwargs.get('ReceiveMessageWaitTimeSeconds', 0)) - self.permissions = {} - - # wait_time_seconds will be set to immediate return messages - self.wait_time_seconds = int(kwargs.get('WaitTimeSeconds', 0)) - - self.redrive_policy = {} + self.queue_arn = 'arn:aws:sqs:{0}:123456789012:{1}'.format(self.region, + self.name) self.dead_letter_queue = None - if 'RedrivePolicy' in kwargs: - self._setup_dlq(kwargs['RedrivePolicy']) + # default settings for a non fifo queue + defaults = { + 'ContentBasedDeduplication': 'false', + 'DelaySeconds': 0, + 'FifoQueue': 'false', + 'KmsDataKeyReusePeriodSeconds': 300, # five minutes + 'KmsMasterKeyId': None, + 'MaximumMessageSize': int(64 << 10), + 'MessageRetentionPeriod': 86400 * 4, # four days + 'Policy': None, + 'ReceiveMessageWaitTimeSeconds': 0, + 'RedrivePolicy': None, + 'VisibilityTimeout': 30, + } + + defaults.update(kwargs) + self._set_attributes(defaults, now) # Check some conditions if self.fifo_queue and not self.name.endswith('.fifo'): raise MessageAttributesInvalid('Queue name must end in .fifo for FIFO queues') + def _set_attributes(self, attributes, now=None): + if not now: + now = unix_time() + + integer_fields = ('DelaySeconds', 'KmsDataKeyreusePeriodSeconds', + 'MaximumMessageSize', 'MessageRetentionPeriod', + 'ReceiveMessageWaitTime', 'VisibilityTimeout') + bool_fields = ('ContentBasedDeduplication', 'FifoQueue') + + for key, value in six.iteritems(attributes): + if key in integer_fields: + value = int(value) + if key in bool_fields: + value = value == "true" + + if key == 'RedrivePolicy' and value is not None: + continue + + setattr(self, camelcase_to_underscores(key), value) + + if attributes.get('RedrivePolicy', None): + self._setup_dlq(attributes['RedrivePolicy']) + + self.last_modified_timestamp = now + def _setup_dlq(self, policy_json): try: self.redrive_policy = json.loads(policy_json) @@ -252,8 +274,8 @@ class Queue(BaseModel): if 'VisibilityTimeout' in properties: queue.visibility_timeout = int(properties['VisibilityTimeout']) - if 'WaitTimeSeconds' in properties: - queue.wait_time_seconds = int(properties['WaitTimeSeconds']) + if 'ReceiveMessageWaitTimeSeconds' in properties: + queue.receive_message_wait_time_seconds = int(properties['ReceiveMessageWaitTimeSeconds']) return queue @classmethod @@ -282,13 +304,31 @@ class Queue(BaseModel): @property def attributes(self): result = {} - for attribute in self.camelcase_attributes: + + for attribute in self.base_attributes: attr = getattr(self, camelcase_to_underscores(attribute)) - if isinstance(attr, bool): - attr = str(attr).lower() - elif attribute == 'RedrivePolicy': - attr = json.dumps(attr) result[attribute] = attr + + if self.fifo_queue: + for attribute in self.fifo_attributes: + attr = getattr(self, camelcase_to_underscores(attribute)) + result[attribute] = attr + + if self.kms_master_key_id: + for attribute in self.kms_attributes: + attr = getattr(self, camelcase_to_underscores(attribute)) + result[attribute] = attr + + if self.policy: + result['Policy'] = self.policy + + if self.redrive_policy: + result['RedrivePolicy'] = json.dumps(self.redrive_policy) + + for key in result: + if isinstance(result[key], bool): + result[key] = str(result[key]).lower() + return result def url(self, request_url): @@ -355,9 +395,9 @@ class SQSBackend(BaseBackend): return self.queues.pop(queue_name) return False - def set_queue_attribute(self, queue_name, key, value): + def set_queue_attributes(self, queue_name, attributes): queue = self.get_queue(queue_name) - setattr(queue, key, value) + queue._set_attributes(attributes) return queue def send_message(self, queue_name, message_body, message_attributes=None, delay_seconds=None): diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index b31681f16..71aab9a58 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -4,7 +4,7 @@ import re from six.moves.urllib.parse import urlparse from moto.core.responses import BaseResponse -from moto.core.utils import camelcase_to_underscores, amz_crc32, amzn_request_id +from moto.core.utils import amz_crc32, amzn_request_id from .utils import parse_message_attributes from .models import sqs_backends from .exceptions import ( @@ -87,7 +87,8 @@ class SQSResponse(BaseResponse): try: queue = self.sqs_backend.get_queue(queue_name) except QueueDoesNotExist as e: - return self._error('QueueDoesNotExist', e.description) + return self._error('AWS.SimpleQueueService.NonExistentQueue', + e.description) if queue: template = self.response_template(GET_QUEUE_URL_RESPONSE) @@ -171,7 +172,8 @@ class SQSResponse(BaseResponse): try: queue = self.sqs_backend.get_queue(queue_name) except QueueDoesNotExist as e: - return self._error('QueueDoesNotExist', e.description) + return self._error('AWS.SimpleQueueService.NonExistentQueue', + e.description) template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE) return template.render(queue=queue) @@ -179,9 +181,8 @@ class SQSResponse(BaseResponse): def set_queue_attributes(self): # TODO validate self.get_param('QueueUrl') queue_name = self._get_queue_name() - for key, value in self.attribute.items(): - key = camelcase_to_underscores(key) - self.sqs_backend.set_queue_attribute(queue_name, key, value) + self.sqs_backend.set_queue_attributes(queue_name, self.attribute) + return SET_QUEUE_ATTRIBUTE_RESPONSE def delete_queue(self): @@ -323,7 +324,7 @@ class SQSResponse(BaseResponse): try: wait_time = int(self.querystring.get("WaitTimeSeconds")[0]) except TypeError: - wait_time = queue.wait_time_seconds + wait_time = queue.receive_message_wait_time_seconds try: visibility_timeout = self._get_validated_visibility_timeout() diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index b91fd7bc7..05936ab39 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -72,6 +72,24 @@ def test_create_queue(): queue.attributes.get('VisibilityTimeout').should.equal('30') +@mock_sqs +def test_create_queue_kms(): + sqs = boto3.resource('sqs', region_name='us-east-1') + + new_queue = sqs.create_queue( + QueueName='test-queue', + Attributes={ + 'KmsMasterKeyId': 'master-key-id', + 'KmsDataKeyReusePeriodSeconds': '600' + }) + new_queue.should_not.be.none + + queue = sqs.get_queue_by_name(QueueName='test-queue') + + queue.attributes.get('KmsMasterKeyId').should.equal('master-key-id') + queue.attributes.get('KmsDataKeyReusePeriodSeconds').should.equal('600') + + @mock_sqs def test_get_nonexistent_queue(): sqs = boto3.resource('sqs', region_name='us-east-1') @@ -79,13 +97,15 @@ def test_get_nonexistent_queue(): sqs.get_queue_by_name(QueueName='nonexisting-queue') ex = err.exception ex.operation_name.should.equal('GetQueueUrl') - ex.response['Error']['Code'].should.equal('QueueDoesNotExist') + ex.response['Error']['Code'].should.equal( + 'AWS.SimpleQueueService.NonExistentQueue') with assert_raises(ClientError) as err: sqs.Queue('http://whatever-incorrect-queue-address').load() ex = err.exception ex.operation_name.should.equal('GetQueueAttributes') - ex.response['Error']['Code'].should.equal('QueueDoesNotExist') + ex.response['Error']['Code'].should.equal( + 'AWS.SimpleQueueService.NonExistentQueue') @mock_sqs @@ -890,7 +910,7 @@ def test_create_fifo_queue_with_dlq(): def test_queue_with_dlq(): if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true': raise SkipTest('Cant manipulate time in server mode') - + sqs = boto3.client('sqs', region_name='us-east-1') with freeze_time("2015-01-01 12:00:00"): @@ -933,6 +953,7 @@ def test_queue_with_dlq(): resp = sqs.list_dead_letter_source_queues(QueueUrl=queue_url1) resp['queueUrls'][0].should.equal(queue_url2) + @mock_sqs def test_redrive_policy_available(): sqs = boto3.client('sqs', region_name='us-east-1') @@ -956,3 +977,51 @@ def test_redrive_policy_available(): attributes = sqs.get_queue_attributes(QueueUrl=queue_url2)['Attributes'] assert 'RedrivePolicy' in attributes assert json.loads(attributes['RedrivePolicy']) == redrive_policy + + # Cant have redrive policy without maxReceiveCount + with assert_raises(ClientError): + sqs.create_queue( + QueueName='test-queue2', + Attributes={ + 'FifoQueue': 'true', + 'RedrivePolicy': json.dumps({'deadLetterTargetArn': queue_arn1}) + } + ) + + +@mock_sqs +def test_redrive_policy_non_existent_queue(): + sqs = boto3.client('sqs', region_name='us-east-1') + redrive_policy = { + 'deadLetterTargetArn': 'arn:aws:sqs:us-east-1:123456789012:no-queue', + 'maxReceiveCount': 1, + } + + with assert_raises(ClientError): + sqs.create_queue( + QueueName='test-queue', + Attributes={ + 'RedrivePolicy': json.dumps(redrive_policy) + } + ) + + +@mock_sqs +def test_redrive_policy_set_attributes(): + sqs = boto3.resource('sqs', region_name='us-east-1') + + queue = sqs.create_queue(QueueName='test-queue') + deadletter_queue = sqs.create_queue(QueueName='test-deadletter') + + redrive_policy = { + 'deadLetterTargetArn': deadletter_queue.attributes['QueueArn'], + 'maxReceiveCount': 1, + } + + queue.set_attributes(Attributes={ + 'RedrivePolicy': json.dumps(redrive_policy)}) + + copy = sqs.get_queue_by_name(QueueName='test-queue') + assert 'RedrivePolicy' in copy.attributes + copy_policy = json.loads(copy.attributes['RedrivePolicy']) + assert copy_policy == redrive_policy