diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 0377b27ed..813f5bf36 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -3149,24 +3149,27 @@ - [ ] put_scaling_policy - [ ] register_scalable_target -## sqs - 41% implemented -- [ ] add_permission +## sqs - 100% implemented +- [X] add_permission - [X] change_message_visibility -- [ ] change_message_visibility_batch +- [X] change_message_visibility_batch - [X] create_queue - [X] delete_message -- [ ] delete_message_batch +- [X] delete_message_batch - [X] delete_queue -- [ ] get_queue_attributes -- [ ] get_queue_url -- [ ] list_dead_letter_source_queues +- [X] get_queue_attributes +- [X] get_queue_url +- [X] list_dead_letter_source_queues - [X] list_queues - [X] purge_queue -- [ ] receive_message -- [ ] remove_permission +- [X] receive_message +- [X] remove_permission - [X] send_message -- [ ] send_message_batch -- [ ] set_queue_attributes +- [X] send_message_batch +- [X] set_queue_attributes +- [X] tag_queue +- [X] untag_queue +- [X] list_queue_tags ## iot - 0% implemented - [ ] accept_certificate_transfer diff --git a/README.md b/README.md index b2ca5a807..9a20bbe15 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,7 @@ It gets even better! Moto isn't just for Python code and it isn't just for S3. L |------------------------------------------------------------------------------| | S3 | @mock_s3 | core endpoints done | |------------------------------------------------------------------------------| -| SES | @mock_ses | core endpoints done | +| SES | @mock_ses | all endpoints done | |------------------------------------------------------------------------------| | SNS | @mock_sns | all endpoints done | |------------------------------------------------------------------------------| diff --git a/moto/sqs/models.py b/moto/sqs/models.py index 22f310228..85b69ab0e 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -2,6 +2,7 @@ from __future__ import unicode_literals import base64 import hashlib +import json import re import six import struct @@ -9,6 +10,7 @@ from xml.sax.saxutils import escape import boto.sqs +from moto.core.exceptions import RESTError from moto.core import BaseBackend, BaseModel from moto.core.utils import camelcase_to_underscores, get_random_message_id, unix_time, unix_time_millis from .utils import generate_receipt_handle @@ -166,11 +168,14 @@ class Queue(BaseModel): 'ReceiveMessageWaitTimeSeconds', 'VisibilityTimeout', 'WaitTimeSeconds'] + ALLOWED_PERMISSIONS = ('*', 'ChangeMessageVisibility', 'DeleteMessage', 'GetQueueAttributes', + 'GetQueueUrl', 'ReceiveMessage', 'SendMessage') def __init__(self, name, region, **kwargs): self.name = name self.visibility_timeout = int(kwargs.get('VisibilityTimeout', 30)) self.region = region + self.tags = {} self._messages = [] @@ -189,14 +194,42 @@ class Queue(BaseModel): self.message_retention_period = int(kwargs.get('MessageRetentionPeriod', 86400 * 4)) # four days self.queue_arn = 'arn:aws:sqs:{0}:123456789012:{1}'.format(self.region, self.name) self.receive_message_wait_time_seconds = int(kwargs.get('ReceiveMessageWaitTimeSeconds', 0)) + self.permissions = {} # wait_time_seconds will be set to immediate return messages self.wait_time_seconds = int(kwargs.get('WaitTimeSeconds', 0)) + self.redrive_policy = {} + self.dead_letter_queue = None + + if 'RedrivePolicy' in kwargs: + self._setup_dlq(kwargs['RedrivePolicy']) + # Check some conditions if self.fifo_queue and not self.name.endswith('.fifo'): raise MessageAttributesInvalid('Queue name must end in .fifo for FIFO queues') + def _setup_dlq(self, policy_json): + try: + self.redrive_policy = json.loads(policy_json) + except ValueError: + raise RESTError('InvalidParameterValue', 'Redrive policy does not contain valid json') + + if 'deadLetterTargetArn' not in self.redrive_policy: + raise RESTError('InvalidParameterValue', 'Redrive policy does not contain deadLetterTargetArn') + if 'maxReceiveCount' not in self.redrive_policy: + raise RESTError('InvalidParameterValue', 'Redrive policy does not contain maxReceiveCount') + + for queue in sqs_backends[self.region].queues.values(): + if queue.queue_arn == self.redrive_policy['deadLetterTargetArn']: + self.dead_letter_queue = queue + + if self.fifo_queue and not queue.fifo_queue: + raise RESTError('InvalidParameterCombination', 'Fifo queues cannot use non fifo dead letter queues') + break + else: + raise RESTError('AWS.SimpleQueueService.NonExistentQueue', 'Could not find DLQ for {0}'.format(self.redrive_policy['deadLetterTargetArn'])) + @classmethod def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): properties = cloudformation_json['Properties'] @@ -382,9 +415,14 @@ class SQSBackend(BaseBackend): time.sleep(0.001) continue + messages_to_dlq = [] for message in queue.messages: if not message.visible: continue + if queue.dead_letter_queue is not None and message.approximate_receive_count >= queue.redrive_policy['maxReceiveCount']: + messages_to_dlq.append(message) + continue + message.mark_received( visibility_timeout=visibility_timeout ) @@ -392,6 +430,10 @@ class SQSBackend(BaseBackend): if len(result) >= count: break + for message in messages_to_dlq: + queue._messages.remove(message) + queue.dead_letter_queue.add_message(message) + return result def delete_message(self, queue_name, receipt_handle): @@ -419,6 +461,49 @@ class SQSBackend(BaseBackend): queue = self.get_queue(queue_name) queue._messages = [] + def list_dead_letter_source_queues(self, queue_name): + dlq = self.get_queue(queue_name) + + queues = [] + for queue in self.queues.values(): + if queue.dead_letter_queue is dlq: + queues.append(queue) + + return queues + + def add_permission(self, queue_name, actions, account_ids, label): + queue = self.get_queue(queue_name) + + if actions is None or len(actions) == 0: + raise RESTError('InvalidParameterValue', 'Need at least one Action') + if account_ids is None or len(account_ids) == 0: + raise RESTError('InvalidParameterValue', 'Need at least one Account ID') + + if not all([item in Queue.ALLOWED_PERMISSIONS for item in actions]): + raise RESTError('InvalidParameterValue', 'Invalid permissions') + + queue.permissions[label] = (account_ids, actions) + + def remove_permission(self, queue_name, label): + queue = self.get_queue(queue_name) + + if label not in queue.permissions: + raise RESTError('InvalidParameterValue', 'Permission doesnt exist for the given label') + + del queue.permissions[label] + + def tag_queue(self, queue_name, tags): + queue = self.get_queue(queue_name) + queue.tags.update(tags) + + def untag_queue(self, queue_name, tag_keys): + queue = self.get_queue(queue_name) + for key in tag_keys: + try: + del queue.tags[key] + except KeyError: + pass + sqs_backends = {} for region in boto.sqs.regions(): diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py index 63a5036d6..bb21c1e2a 100644 --- a/moto/sqs/responses.py +++ b/moto/sqs/responses.py @@ -40,12 +40,15 @@ class SQSResponse(BaseResponse): queue_name = self.path.split("/")[-1] return queue_name - def _get_validated_visibility_timeout(self): + def _get_validated_visibility_timeout(self, timeout=None): """ :raises ValueError: If specified visibility timeout exceeds MAXIMUM_VISIBILTY_TIMEOUT :raises TypeError: If visibility timeout was not specified """ - visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0]) + if timeout is not None: + visibility_timeout = int(timeout) + else: + visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0]) if visibility_timeout > MAXIMUM_VISIBILTY_TIMEOUT: raise ValueError @@ -119,6 +122,49 @@ class SQSResponse(BaseResponse): template = self.response_template(CHANGE_MESSAGE_VISIBILITY_RESPONSE) return template.render() + def change_message_visibility_batch(self): + queue_name = self._get_queue_name() + entries = self._get_list_prefix('ChangeMessageVisibilityBatchRequestEntry') + + success = [] + error = [] + for entry in entries: + try: + visibility_timeout = self._get_validated_visibility_timeout(entry['visibility_timeout']) + except ValueError: + error.append({ + 'Id': entry['id'], + 'SenderFault': 'true', + 'Code': 'InvalidParameterValue', + 'Message': 'Visibility timeout invalid' + }) + continue + + try: + self.sqs_backend.change_message_visibility( + queue_name=queue_name, + receipt_handle=entry['receipt_handle'], + visibility_timeout=visibility_timeout + ) + success.append(entry['id']) + except ReceiptHandleIsInvalid as e: + error.append({ + 'Id': entry['id'], + 'SenderFault': 'true', + 'Code': 'ReceiptHandleIsInvalid', + 'Message': e.description + }) + except MessageNotInflight as e: + error.append({ + 'Id': entry['id'], + 'SenderFault': 'false', + 'Code': 'AWS.SimpleQueueService.MessageNotInflight', + 'Message': e.description + }) + + template = self.response_template(CHANGE_MESSAGE_VISIBILITY_BATCH_RESPONSE) + return template.render(success=success, errors=error) + def get_queue_attributes(self): queue_name = self._get_queue_name() try: @@ -288,8 +334,62 @@ class SQSResponse(BaseResponse): messages = self.sqs_backend.receive_messages( queue_name, message_count, wait_time, visibility_timeout) template = self.response_template(RECEIVE_MESSAGE_RESPONSE) - output = template.render(messages=messages) - return output + return template.render(messages=messages) + + def list_dead_letter_source_queues(self): + request_url = urlparse(self.uri) + queue_name = self._get_queue_name() + + source_queue_urls = self.sqs_backend.list_dead_letter_source_queues(queue_name) + + template = self.response_template(LIST_DEAD_LETTER_SOURCE_QUEUES_RESPONSE) + return template.render(queues=source_queue_urls, request_url=request_url) + + def add_permission(self): + queue_name = self._get_queue_name() + actions = self._get_multi_param('ActionName') + account_ids = self._get_multi_param('AWSAccountId') + label = self._get_param('Label') + + self.sqs_backend.add_permission(queue_name, actions, account_ids, label) + + template = self.response_template(ADD_PERMISSION_RESPONSE) + return template.render() + + def remove_permission(self): + queue_name = self._get_queue_name() + label = self._get_param('Label') + + self.sqs_backend.remove_permission(queue_name, label) + + template = self.response_template(REMOVE_PERMISSION_RESPONSE) + return template.render() + + def tag_queue(self): + queue_name = self._get_queue_name() + tags = self._get_map_prefix('Tag', key_end='.Key', value_end='.Value') + + self.sqs_backend.tag_queue(queue_name, tags) + + template = self.response_template(TAG_QUEUE_RESPONSE) + return template.render() + + def untag_queue(self): + queue_name = self._get_queue_name() + tag_keys = self._get_multi_param('TagKey') + + self.sqs_backend.untag_queue(queue_name, tag_keys) + + template = self.response_template(UNTAG_QUEUE_RESPONSE) + return template.render() + + def list_queue_tags(self): + queue_name = self._get_queue_name() + + queue = self.sqs_backend.get_queue(queue_name) + + template = self.response_template(LIST_QUEUE_TAGS_RESPONSE) + return template.render(tags=queue.tags) CREATE_QUEUE_RESPONSE = """ @@ -307,7 +407,7 @@ GET_QUEUE_URL_RESPONSE = """ {{ queue.url(request_url) }} - 470a6f13-2ed9-4181-ad8a-2fdea142988e + {{ requestid }} """ @@ -318,13 +418,13 @@ LIST_QUEUES_RESPONSE = """ {% endfor %} - 725275ae-0b9b-4762-b238-436d7c65a1ac + {{ requestid }} """ DELETE_QUEUE_RESPONSE = """ - 6fde8d1e-52cd-4581-8cd9-c512f4c64223 + {{ requestid }} """ @@ -338,13 +438,13 @@ GET_QUEUE_ATTRIBUTES_RESPONSE = """ {% endfor %} - 1ea71be5-b5a2-4f9d-b85a-945d8d08cd0b + {{ requestid }} """ SET_QUEUE_ATTRIBUTE_RESPONSE = """ - e5cca473-4fc0-4198-a451-8abb94d02c75 + {{ requestid }} """ @@ -361,7 +461,7 @@ SEND_MESSAGE_RESPONSE = """ - 27daac76-34dd-47df-bd01-1f6e873584a0 + {{ requestid }} """ @@ -409,7 +509,7 @@ RECEIVE_MESSAGE_RESPONSE = """ {% endfor %} - b6633655-283d-45b4-aee4-4e84e0ae6afa + {{ requestid }} """ @@ -427,13 +527,13 @@ SEND_MESSAGE_BATCH_RESPONSE = """ {% endfor %} - ca1ad5d0-8271-408b-8d0f-1351bf547e74 + {{ requestid }} """ DELETE_MESSAGE_RESPONSE = """ - b5293cb5-d306-4a17-9048-b263635abe42 + {{ requestid }} """ @@ -446,22 +546,92 @@ DELETE_MESSAGE_BATCH_RESPONSE = """ {% endfor %} - d6f86b7a-74d1-4439-b43f-196a1e29cd85 + {{ requestid }} """ CHANGE_MESSAGE_VISIBILITY_RESPONSE = """ - 6a7a282a-d013-4a59-aba9-335b0fa48bed + {{ requestid }} """ +CHANGE_MESSAGE_VISIBILITY_BATCH_RESPONSE = """ + + {% for success_id in success %} + + {{ success_id }} + + {% endfor %} + {% for error_dict in errors %} + + {{ error_dict['Id'] }} + {{ error_dict['Code'] }} + {{ error_dict['Message'] }} + {{ error_dict['SenderFault'] }} + + {% endfor %} + + + {{ request_id }} + +""" + PURGE_QUEUE_RESPONSE = """ - 6fde8d1e-52cd-4581-8cd9-c512f4c64223 + {{ requestid }} """ +LIST_DEAD_LETTER_SOURCE_QUEUES_RESPONSE = """ + + {% for queue in queues %} + {{ queue.url(request_url) }} + {% endfor %} + + + 8ffb921f-b85e-53d9-abcf-d8d0057f38fc + +""" + +ADD_PERMISSION_RESPONSE = """ + + {{ request_id }} + +""" + +REMOVE_PERMISSION_RESPONSE = """ + + {{ request_id }} + +""" + +TAG_QUEUE_RESPONSE = """ + + {{ request_id }} + +""" + +UNTAG_QUEUE_RESPONSE = """ + + {{ request_id }} + +""" + +LIST_QUEUE_TAGS_RESPONSE = """ + + {% for key, value in tags.items() %} + + {{ key }} + {{ value }} + + {% endfor %} + + + {{ request_id }} + +""" + ERROR_TOO_LONG_RESPONSE = """ Sender diff --git a/tests/test_sqs/test_sqs.py b/tests/test_sqs/test_sqs.py index 536261504..9d191d6e3 100644 --- a/tests/test_sqs/test_sqs.py +++ b/tests/test_sqs/test_sqs.py @@ -8,9 +8,12 @@ from botocore.exceptions import ClientError from boto.exception import SQSError from boto.sqs.message import RawMessage, Message +from freezegun import freeze_time import base64 +import json import sure # noqa import time +import uuid from moto import settings, mock_sqs, mock_sqs_deprecated from tests.helpers import requires_boto_gte @@ -93,8 +96,6 @@ def test_message_send_without_attributes(): msg.get('MD5OfMessageBody').should.equal( '58fd9edd83341c29f1aebba81c31e257') msg.shouldnt.have.key('MD5OfMessageAttributes') - msg.get('ResponseMetadata', {}).get('RequestId').should.equal( - '27daac76-34dd-47df-bd01-1f6e873584a0') msg.get('MessageId').should_not.contain(' \n') messages = queue.receive_messages() @@ -118,8 +119,6 @@ def test_message_send_with_attributes(): '58fd9edd83341c29f1aebba81c31e257') msg.get('MD5OfMessageAttributes').should.equal( '235c5c510d26fb653d073faed50ae77c') - msg.get('ResponseMetadata', {}).get('RequestId').should.equal( - '27daac76-34dd-47df-bd01-1f6e873584a0') msg.get('MessageId').should_not.contain(' \n') messages = queue.receive_messages() @@ -143,8 +142,6 @@ def test_message_with_complex_attributes(): '58fd9edd83341c29f1aebba81c31e257') msg.get('MD5OfMessageAttributes').should.equal( '8ae21a7957029ef04146b42aeaa18a22') - msg.get('ResponseMetadata', {}).get('RequestId').should.equal( - '27daac76-34dd-47df-bd01-1f6e873584a0') msg.get('MessageId').should_not.contain(' \n') messages = queue.receive_messages() @@ -755,3 +752,177 @@ def test_delete_message_after_visibility_timeout(): m1_retrieved.delete() assert new_queue.count() == 0 + + + + +@mock_sqs +def test_change_message_visibility(): + with freeze_time("2015-01-01 12:00:00"): + sqs = boto3.client('sqs', region_name='us-east-1') + resp = sqs.create_queue( + QueueName='test-dlr-queue.fifo', + Attributes={'FifoQueue': 'true'} + ) + queue_url = resp['QueueUrl'] + + sqs.send_message(QueueUrl=queue_url, MessageBody='msg1') + sqs.send_message(QueueUrl=queue_url, MessageBody='msg2') + sqs.send_message(QueueUrl=queue_url, MessageBody='msg3') + + with freeze_time("2015-01-01 12:01:00"): + receive_resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=2) + len(receive_resp['Messages']).should.equal(2) + + handles = [item['ReceiptHandle'] for item in receive_resp['Messages']] + entries = [{'Id': str(uuid.uuid4()), 'ReceiptHandle': handle, 'VisibilityTimeout': 43200} for handle in handles] + + resp = sqs.change_message_visibility_batch(QueueUrl=queue_url, Entries=entries) + len(resp['Successful']).should.equal(2) + + with freeze_time("2015-01-01 14:00:00"): + resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=3) + len(resp['Messages']).should.equal(1) + + with freeze_time("2015-01-01 16:00:00"): + resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=3) + len(resp['Messages']).should.equal(1) + + with freeze_time("2015-01-02 12:00:00"): + resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=3) + len(resp['Messages']).should.equal(3) + + +@mock_sqs +def test_permissions(): + client = boto3.client('sqs', region_name='us-east-1') + + resp = client.create_queue( + QueueName='test-dlr-queue.fifo', + Attributes={'FifoQueue': 'true'} + ) + queue_url = resp['QueueUrl'] + + client.add_permission(QueueUrl=queue_url, Label='account1', AWSAccountIds=['111111111111'], Actions=['*']) + client.add_permission(QueueUrl=queue_url, Label='account2', AWSAccountIds=['222211111111'], Actions=['SendMessage']) + + with assert_raises(ClientError): + client.add_permission(QueueUrl=queue_url, Label='account2', AWSAccountIds=['222211111111'], Actions=['SomeRubbish']) + + client.remove_permission(QueueUrl=queue_url, Label='account2') + + with assert_raises(ClientError): + client.remove_permission(QueueUrl=queue_url, Label='non_existant') + + +@mock_sqs +def test_tags(): + client = boto3.client('sqs', region_name='us-east-1') + + resp = client.create_queue( + QueueName='test-dlr-queue.fifo', + Attributes={'FifoQueue': 'true'} + ) + queue_url = resp['QueueUrl'] + + client.tag_queue( + QueueUrl=queue_url, + Tags={ + 'test1': 'value1', + 'test2': 'value2', + } + ) + + resp = client.list_queue_tags(QueueUrl=queue_url) + resp['Tags'].should.contain('test1') + resp['Tags'].should.contain('test2') + + client.untag_queue( + QueueUrl=queue_url, + TagKeys=['test2'] + ) + + resp = client.list_queue_tags(QueueUrl=queue_url) + resp['Tags'].should.contain('test1') + resp['Tags'].should_not.contain('test2') + + +@mock_sqs +def test_create_fifo_queue_with_dlq(): + sqs = boto3.client('sqs', region_name='us-east-1') + resp = sqs.create_queue( + QueueName='test-dlr-queue.fifo', + Attributes={'FifoQueue': 'true'} + ) + queue_url1 = resp['QueueUrl'] + queue_arn1 = sqs.get_queue_attributes(QueueUrl=queue_url1)['Attributes']['QueueArn'] + + resp = sqs.create_queue( + QueueName='test-dlr-queue', + Attributes={'FifoQueue': 'false'} + ) + queue_url2 = resp['QueueUrl'] + queue_arn2 = sqs.get_queue_attributes(QueueUrl=queue_url2)['Attributes']['QueueArn'] + + sqs.create_queue( + QueueName='test-queue.fifo', + Attributes={ + 'FifoQueue': 'true', + 'RedrivePolicy': json.dumps({'deadLetterTargetArn': queue_arn1, 'maxReceiveCount': 2}) + } + ) + + # Cant have fifo queue with non fifo DLQ + with assert_raises(ClientError): + sqs.create_queue( + QueueName='test-queue2.fifo', + Attributes={ + 'FifoQueue': 'true', + 'RedrivePolicy': json.dumps({'deadLetterTargetArn': queue_arn2, 'maxReceiveCount': 2}) + } + ) + + +@mock_sqs +def test_queue_with_dlq(): + sqs = boto3.client('sqs', region_name='us-east-1') + + with freeze_time("2015-01-01 12:00:00"): + resp = sqs.create_queue( + QueueName='test-dlr-queue.fifo', + Attributes={'FifoQueue': 'true'} + ) + queue_url1 = resp['QueueUrl'] + queue_arn1 = sqs.get_queue_attributes(QueueUrl=queue_url1)['Attributes']['QueueArn'] + + resp = sqs.create_queue( + QueueName='test-queue.fifo', + Attributes={ + 'FifoQueue': 'true', + 'RedrivePolicy': json.dumps({'deadLetterTargetArn': queue_arn1, 'maxReceiveCount': 2}) + } + ) + queue_url2 = resp['QueueUrl'] + + sqs.send_message(QueueUrl=queue_url2, MessageBody='msg1') + sqs.send_message(QueueUrl=queue_url2, MessageBody='msg2') + + with freeze_time("2015-01-01 13:00:00"): + resp = sqs.receive_message(QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0) + resp['Messages'][0]['Body'].should.equal('msg1') + + with freeze_time("2015-01-01 13:01:00"): + resp = sqs.receive_message(QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0) + resp['Messages'][0]['Body'].should.equal('msg1') + + with freeze_time("2015-01-01 13:02:00"): + resp = sqs.receive_message(QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0) + len(resp['Messages']).should.equal(1) + + resp = sqs.receive_message(QueueUrl=queue_url1, VisibilityTimeout=30, WaitTimeSeconds=0) + resp['Messages'][0]['Body'].should.equal('msg1') + + # Might as well test list source queues + + resp = sqs.list_dead_letter_source_queues(QueueUrl=queue_url1) + resp['queueUrls'][0].should.equal(queue_url2)