diff --git a/moto/kms/models.py b/moto/kms/models.py index 8808565a5..a5adaede8 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import boto.kms from moto.core import BaseBackend from .utils import generate_key_id +from collections import defaultdict class Key(object): @@ -37,6 +38,7 @@ class KmsBackend(BaseBackend): def __init__(self): self.keys = {} + self.key_to_aliases = defaultdict(set) def create_key(self, policy, key_usage, description, region): key = Key(policy, key_usage, description, region) @@ -49,6 +51,23 @@ class KmsBackend(BaseBackend): def list_keys(self): return self.keys.values() + def alias_exists(self, alias_name): + for aliases in self.key_to_aliases.values(): + if alias_name in aliases: + return True + + return False + + def add_alias(self, target_key_id, alias_name): + self.key_to_aliases[target_key_id].add(alias_name) + + def delete_alias(self, alias_name): + for aliases in self.key_to_aliases.values(): + aliases.remove(alias_name) + + def get_all_aliases(self): + return self.key_to_aliases + kms_backends = {} for region in boto.kms.regions(): kms_backends[region.name] = KmsBackend() diff --git a/moto/kms/responses.py b/moto/kms/responses.py index 647594e6b..0c4563f07 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -1,10 +1,20 @@ from __future__ import unicode_literals import json +import re + +from boto.exception import JSONResponseError +from boto.kms.exceptions import AlreadyExistsException, NotFoundException from moto.core.responses import BaseResponse from .models import kms_backends +reserved_aliases = [ + 'alias/aws/ebs', + 'alias/aws/s3', + 'alias/aws/redshift', + 'alias/aws/rds', +] class KmsResponse(BaseResponse): @@ -46,3 +56,83 @@ class KmsResponse(BaseResponse): "NextMarker": None, "Truncated": False, }) + + def create_alias(self): + alias_name = self.parameters['AliasName'] + target_key_id = self.parameters['TargetKeyId'] + region = self.region + + if not alias_name.startswith('alias/'): + raise JSONResponseError(400, 'Bad Request', + body={'message': 'Invalid identifier', '__type': 'ValidationException'}) + + if alias_name in reserved_aliases: + raise JSONResponseError(400, 'Bad Request', body={'__type': 'NotAuthorizedException'}) + + if ':' in alias_name: + raise JSONResponseError(400, 'Bad Request', body={ + 'message': '{alias_name} contains invalid characters for an alias'.format(**locals()), + '__type': 'ValidationException'}) + + if not re.match(r'^[a-zA-Z0-9:/_-]+$', alias_name): + raise JSONResponseError(400, 'Bad Request', body={ + 'message': "1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$" + .format(**locals()), + '__type': 'ValidationException'}) + + if self.kms_backend.alias_exists(target_key_id): + raise JSONResponseError(400, 'Bad Request', body={ + 'message': 'Aliases must refer to keys. Not aliases', + '__type': 'ValidationException'}) + + if self.kms_backend.alias_exists(alias_name): + raise AlreadyExistsException(400, 'Bad Request', body={ + 'message': 'An alias with the name arn:aws:kms:{region}:012345678912:{alias_name} already exists' + .format(**locals()), '__type': 'AlreadyExistsException'}) + + self.kms_backend.add_alias(target_key_id, alias_name) + + return json.dumps(None) + + def delete_alias(self): + alias_name = self.parameters['AliasName'] + region = self.region + + if not alias_name.startswith('alias/'): + raise JSONResponseError(400, 'Bad Request', + body={'message': 'Invalid identifier', '__type': 'ValidationException'}) + + if not self.kms_backend.alias_exists(alias_name): + raise NotFoundException(400, 'Bad Request', body={ + 'message': 'Alias arn:aws:kms:{region}:012345678912:{alias_name} is not found.'.format(**locals()), + '__type': 'NotFoundException'}) + + self.kms_backend.delete_alias(alias_name) + + return json.dumps(None) + + def list_aliases(self): + region = self.region + + response_aliases = [ + { + 'AliasArn': u'arn:aws:kms:{region}:012345678912:{reserved_alias}'.format(region=region, + reserved_alias=reserved_alias), + 'AliasName': reserved_alias + } for reserved_alias in reserved_aliases + ] + + backend_aliases = self.kms_backend.get_all_aliases() + for target_key_id, aliases in backend_aliases.items(): + for alias_name in aliases: + response_aliases.append({ + 'AliasArn': u'arn:aws:kms:{region}:012345678912:{alias_name}'.format(region=region, + alias_name=alias_name), + 'AliasName': alias_name, + 'TargetKeyId': target_key_id, + }) + + return json.dumps({ + 'Truncated': False, + 'Aliases': response_aliases, + }) diff --git a/tests/test_kms/test_kms.py b/tests/test_kms/test_kms.py index f913d840d..b68d9538f 100644 --- a/tests/test_kms/test_kms.py +++ b/tests/test_kms/test_kms.py @@ -1,11 +1,12 @@ from __future__ import unicode_literals +import re import boto.kms from boto.exception import JSONResponseError +from boto.kms.exceptions import AlreadyExistsException, NotFoundException import sure # noqa - from moto import mock_kms - +from nose.tools import assert_raises @mock_kms def test_create_key(): @@ -44,3 +45,271 @@ def test_list_keys(): keys = conn.list_keys() keys['Keys'].should.have.length_of(2) + + +@mock_kms +def test__create_alias__returns_none_if_correct(): + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + + resp = kms.create_alias('alias/my-alias', key_id) + + resp.should.be.none + + +@mock_kms +def test__create_alias__raises_if_reserved_alias(): + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + + reserved_aliases = [ + 'alias/aws/ebs', + 'alias/aws/s3', + 'alias/aws/redshift', + 'alias/aws/rds', + ] + + for alias_name in reserved_aliases: + with assert_raises(JSONResponseError) as err: + kms.create_alias(alias_name, key_id) + + ex = err.exception + ex.error_message.should.be.none + ex.error_code.should.equal('NotAuthorizedException') + ex.body.should.equal({'__type': 'NotAuthorizedException'}) + ex.reason.should.equal('Bad Request') + ex.status.should.equal(400) + + +@mock_kms +def test__create_alias__can_create_multiple_aliases_for_same_key_id(): + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + + kms.create_alias('alias/my-alias3', key_id).should.be.none + kms.create_alias('alias/my-alias4', key_id).should.be.none + kms.create_alias('alias/my-alias5', key_id).should.be.none + + +@mock_kms +def test__create_alias__raises_if_wrong_prefix(): + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + + with assert_raises(JSONResponseError) as err: + kms.create_alias('wrongprefix/my-alias', key_id) + + ex = err.exception + ex.error_message.should.equal('Invalid identifier') + ex.error_code.should.equal('ValidationException') + ex.body.should.equal({'message': 'Invalid identifier', '__type': 'ValidationException'}) + ex.reason.should.equal('Bad Request') + ex.status.should.equal(400) + + +@mock_kms +def test__create_alias__raises_if_duplicate(): + region = 'us-west-2' + kms = boto.kms.connect_to_region(region) + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + alias = 'alias/my-alias' + + kms.create_alias(alias, key_id) + + with assert_raises(AlreadyExistsException) as err: + kms.create_alias(alias, key_id) + + ex = err.exception + ex.error_message.should.match(r'An alias with the name arn:aws:kms:{region}:\d{{12}}:{alias} already exists' + .format(**locals())) + ex.error_code.should.be.none + ex.box_usage.should.be.none + ex.request_id.should.be.none + ex.body['message'].should.match(r'An alias with the name arn:aws:kms:{region}:\d{{12}}:{alias} already exists' + .format(**locals())) + ex.body['__type'].should.equal('AlreadyExistsException') + ex.reason.should.equal('Bad Request') + ex.status.should.equal(400) + + +@mock_kms +def test__create_alias__raises_if_alias_has_restricted_characters(): + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + + alias_names_with_restricted_characters = [ + 'alias/my-alias!', + 'alias/my-alias$', + 'alias/my-alias@', + ] + + for alias_name in alias_names_with_restricted_characters: + with assert_raises(JSONResponseError) as err: + kms.create_alias(alias_name, key_id) + ex = err.exception + ex.body['__type'].should.equal('ValidationException') + ex.body['message'].should.equal("1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$".format(**locals())) + ex.error_code.should.equal('ValidationException') + ex.message.should.equal("1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$".format(**locals())) + ex.reason.should.equal('Bad Request') + ex.status.should.equal(400) + + +@mock_kms +def test__create_alias__raises_if_alias_has_colon_character(): + # For some reason, colons are not accepted for an alias, even though they are accepted by regex ^[a-zA-Z0-9:/_-]+$ + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + + alias_names_with_restricted_characters = [ + 'alias/my:alias', + ] + + for alias_name in alias_names_with_restricted_characters: + with assert_raises(JSONResponseError) as err: + kms.create_alias(alias_name, key_id) + ex = err.exception + ex.body['__type'].should.equal('ValidationException') + ex.body['message'].should.equal("{alias_name} contains invalid characters for an alias".format(**locals())) + ex.error_code.should.equal('ValidationException') + ex.message.should.equal("{alias_name} contains invalid characters for an alias".format(**locals())) + ex.reason.should.equal('Bad Request') + ex.status.should.equal(400) + + +@mock_kms +def test__create_alias__accepted_characters(): + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + + alias_names_with_accepted_characters = [ + 'alias/my-alias_/', + 'alias/my_alias-/', + ] + + for alias_name in alias_names_with_accepted_characters: + kms.create_alias(alias_name, key_id) + + +@mock_kms +def test__create_alias__raises_if_target_key_id_is_existing_alias(): + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + alias = 'alias/my-alias' + + kms.create_alias(alias, key_id) + + with assert_raises(JSONResponseError) as err: + kms.create_alias(alias, alias) + + ex = err.exception + ex.body['__type'].should.equal('ValidationException') + ex.body['message'].should.equal('Aliases must refer to keys. Not aliases') + ex.error_code.should.equal('ValidationException') + ex.message.should.equal('Aliases must refer to keys. Not aliases') + ex.reason.should.equal('Bad Request') + ex.status.should.equal(400) + + +@mock_kms +def test__delete_alias(): + kms = boto.connect_kms() + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + alias = 'alias/my-alias' + + kms.create_alias(alias, key_id) + + resp = kms.delete_alias(alias) + + resp.should.be.none + + # we can create the alias again, since it has been deleted + kms.create_alias(alias, key_id) + + +@mock_kms +def test__delete_alias__raises_if_wrong_prefix(): + kms = boto.connect_kms() + + with assert_raises(JSONResponseError) as err: + kms.delete_alias('wrongprefix/my-alias') + + ex = err.exception + ex.body['__type'].should.equal('ValidationException') + ex.body['message'].should.equal('Invalid identifier') + ex.error_code.should.equal('ValidationException') + ex.message.should.equal('Invalid identifier') + ex.reason.should.equal('Bad Request') + ex.status.should.equal(400) + + +@mock_kms +def test__delete_alias__raises_if_alias_is_not_found(): + region = 'us-west-2' + kms = boto.kms.connect_to_region(region) + alias_name = 'alias/unexisting-alias' + + with assert_raises(NotFoundException) as err: + kms.delete_alias(alias_name) + + ex = err.exception + ex.body['__type'].should.equal('NotFoundException') + ex.body['message'].should.match(r'Alias arn:aws:kms:{region}:\d{{12}}:{alias_name} is not found.'.format(**locals())) + ex.box_usage.should.be.none + ex.error_code.should.be.none + ex.message.should.match(r'Alias arn:aws:kms:{region}:\d{{12}}:{alias_name} is not found.'.format(**locals())) + ex.reason.should.equal('Bad Request') + ex.request_id.should.be.none + ex.status.should.equal(400) + + +@mock_kms +def test__list_aliases(): + region = "eu-west-1" + kms = boto.kms.connect_to_region(region) + + create_resp = kms.create_key() + key_id = create_resp['KeyMetadata']['KeyId'] + kms.create_alias('alias/my-alias1', key_id) + kms.create_alias('alias/my-alias2', key_id) + kms.create_alias('alias/my-alias3', key_id) + + resp = kms.list_aliases() + + resp['Truncated'].should.be.false + + aliases = resp['Aliases'] + + def has_correct_arn(alias_obj): + alias_name = alias_obj['AliasName'] + alias_arn = alias_obj['AliasArn'] + return re.match(r'arn:aws:kms:{region}:\d{{12}}:{alias_name}'.format(region=region, alias_name=alias_name), + alias_arn) + + len([alias for alias in aliases if + has_correct_arn(alias) and 'alias/aws/ebs' == alias['AliasName']]).should.equal(1) + len([alias for alias in aliases if + has_correct_arn(alias) and 'alias/aws/rds' == alias['AliasName']]).should.equal(1) + len([alias for alias in aliases if + has_correct_arn(alias) and 'alias/aws/redshift' == alias['AliasName']]).should.equal(1) + len([alias for alias in aliases if + has_correct_arn(alias) and 'alias/aws/s3' == alias['AliasName']]).should.equal(1) + + len([alias for alias in aliases if + has_correct_arn(alias) and 'alias/my-alias1' == alias['AliasName']]).should.equal(1) + len([alias for alias in aliases if + has_correct_arn(alias) and 'alias/my-alias2' == alias['AliasName']]).should.equal(1) + + len([alias for alias in aliases if 'TargetKeyId' in alias and key_id == alias['TargetKeyId']]).should.equal(3) + + len(aliases).should.equal(7)