Add error handling for sqs.list_queue_tags

This commit is contained in:
gruebel 2019-10-17 22:38:16 +02:00
parent dbfb319def
commit 19a34ea57a
2 changed files with 25 additions and 1 deletions

View File

@ -474,7 +474,11 @@ class SQSResponse(BaseResponse):
def list_queue_tags(self): def list_queue_tags(self):
queue_name = self._get_queue_name() queue_name = self._get_queue_name()
queue = self.sqs_backend.get_queue(queue_name) try:
queue = self.sqs_backend.get_queue(queue_name)
except QueueDoesNotExist as e:
return self._error('AWS.SimpleQueueService.NonExistentQueue',
e.description)
template = self.response_template(LIST_QUEUE_TAGS_RESPONSE) template = self.response_template(LIST_QUEUE_TAGS_RESPONSE)
return template.render(tags=queue.tags) return template.render(tags=queue.tags)

View File

@ -1108,6 +1108,26 @@ def test_tags():
}) })
@mock_sqs
def test_list_queue_tags_errors():
client = boto3.client('sqs', region_name='us-east-1')
response = client.create_queue(
QueueName='test-queue-with-tags',
tags={
'tag_key_1': 'tag_value_X'
}
)
queue_url = response['QueueUrl']
client.list_queue_tags.when.called_with(
QueueUrl=queue_url + '-not-existing',
).should.throw(
ClientError,
'The specified queue does not exist for this wsdl version.'
)
@mock_sqs @mock_sqs
def test_tag_queue_errors(): def test_tag_queue_errors():
client = boto3.client('sqs', region_name='us-east-1') client = boto3.client('sqs', region_name='us-east-1')