Fix retrieving inexistent queue errors when using boto3.

Handle 404 errors when trying to retrieve a SQS queue that does not
exist. Add get_queue and get_inexistent_queue tests for boto3.
This commit is contained in:
Nuno Santos 2016-06-01 16:39:06 +02:00
parent 1f618a3cb5
commit 8ffd4e51ec
2 changed files with 37 additions and 0 deletions

View File

@ -50,6 +50,12 @@ class SQSResponse(BaseResponse):
return visibility_timeout
def call_action(self):
status_code, headers, body = super(SQSResponse, self).call_action()
if status_code == 404:
return 404, headers, ERROR_INEXISTENT_QUEUE
return status_code, headers, body
def create_queue(self):
queue_name = self.querystring.get("QueueName")[0]
queue = self.sqs_backend.create_queue(queue_name, visibility_timeout=self.attribute.get('VisibilityTimeout'),
@ -438,3 +444,13 @@ ERROR_TOO_LONG_RESPONSE = """<ErrorResponse xmlns="http://queue.amazonaws.com/do
</ErrorResponse>"""
ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE = "Invalid request, maximum visibility timeout is {0}".format(MAXIMUM_VISIBILTY_TIMEOUT)
ERROR_INEXISTENT_QUEUE = """<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
<Error>
<Type>Sender</Type>
<Code>AWS.SimpleQueueService.NonExistentQueue</Code>
<Message>The specified queue does not exist for this wsdl version.</Message>
<Detail/>
</Error>
<RequestId>b8bc806b-fa6b-53b5-8be8-cfa2f9836bc3</RequestId>
</ErrorResponse>"""

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import boto
import boto3
import botocore.exceptions
from boto.exception import SQSError
from boto.sqs.message import RawMessage, Message
@ -500,6 +501,26 @@ boto3
"""
@mock_sqs
def test_boto3_get_queue():
sqs = boto3.resource('sqs', region_name='us-east-1')
new_queue = sqs.create_queue(QueueName='test-queue')
new_queue.should_not.be.none
new_queue.should.have.property('url').should.contain('test-queue')
queue = sqs.get_queue_by_name(QueueName='test-queue')
queue.attributes.get('QueueArn').should_not.be.none
queue.attributes.get('QueueArn').split(':')[-1].should.equal('test-queue')
queue.attributes.get('VisibilityTimeout').should_not.be.none
queue.attributes.get('VisibilityTimeout').should.equal('30')
@mock_sqs
def test_boto3_get_inexistent_queue():
sqs = boto3.resource('sqs', region_name='us-east-1')
sqs.get_queue_by_name.when.called_with(QueueName='nonexisting-queue').should.throw(botocore.exceptions.ClientError)
@mock_sqs
def test_boto3_message_send():
sqs = boto3.resource('sqs', region_name='us-east-1')