From 4a286c4bc288933bb023396e2784a6fdbb966bc9 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Fri, 26 Apr 2019 20:52:24 +0100 Subject: [PATCH] KMS generate_data_key (#2071) * Added KMS.generate_data_key and KMS.generate_date_key_without_plaintext Increase test coverage to cover Key not found * Added test for kms.put_key_policy key not found --- moto/kms/exceptions.py | 36 ++++++ moto/kms/models.py | 42 ++++--- moto/kms/responses.py | 153 +++++++++++++++---------- tests/test_kms/test_kms.py | 227 +++++++++++++++++++++++++++++++++++-- 4 files changed, 370 insertions(+), 88 deletions(-) create mode 100644 moto/kms/exceptions.py diff --git a/moto/kms/exceptions.py b/moto/kms/exceptions.py new file mode 100644 index 000000000..70edd3dcd --- /dev/null +++ b/moto/kms/exceptions.py @@ -0,0 +1,36 @@ +from __future__ import unicode_literals +from moto.core.exceptions import JsonRESTError + + +class NotFoundException(JsonRESTError): + code = 400 + + def __init__(self, message): + super(NotFoundException, self).__init__( + "NotFoundException", message) + + +class ValidationException(JsonRESTError): + code = 400 + + def __init__(self, message): + super(ValidationException, self).__init__( + "ValidationException", message) + + +class AlreadyExistsException(JsonRESTError): + code = 400 + + def __init__(self, message): + super(AlreadyExistsException, self).__init__( + "AlreadyExistsException", message) + + +class NotAuthorizedException(JsonRESTError): + code = 400 + + def __init__(self): + super(NotAuthorizedException, self).__init__( + "NotAuthorizedException", None) + + self.description = '{"__type":"NotAuthorizedException"}' diff --git a/moto/kms/models.py b/moto/kms/models.py index 9fbb2b587..b49e9dd09 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +import os import boto.kms from moto.core import BaseBackend, BaseModel from moto.core.utils import iso_8601_datetime_without_milliseconds @@ -159,27 +160,38 @@ class KmsBackend(BaseBackend): return self.keys[self.get_key_id(key_id)].policy def disable_key(self, key_id): - if key_id in self.keys: - self.keys[key_id].enabled = False - self.keys[key_id].key_state = 'Disabled' + self.keys[key_id].enabled = False + self.keys[key_id].key_state = 'Disabled' def enable_key(self, key_id): - if key_id in self.keys: - self.keys[key_id].enabled = True - self.keys[key_id].key_state = 'Enabled' + self.keys[key_id].enabled = True + self.keys[key_id].key_state = 'Enabled' def cancel_key_deletion(self, key_id): - if key_id in self.keys: - self.keys[key_id].key_state = 'Disabled' - self.keys[key_id].deletion_date = None + self.keys[key_id].key_state = 'Disabled' + self.keys[key_id].deletion_date = None def schedule_key_deletion(self, key_id, pending_window_in_days): - if key_id in self.keys: - if 7 <= pending_window_in_days <= 30: - self.keys[key_id].enabled = False - self.keys[key_id].key_state = 'PendingDeletion' - self.keys[key_id].deletion_date = datetime.now() + timedelta(days=pending_window_in_days) - return iso_8601_datetime_without_milliseconds(self.keys[key_id].deletion_date) + if 7 <= pending_window_in_days <= 30: + self.keys[key_id].enabled = False + self.keys[key_id].key_state = 'PendingDeletion' + self.keys[key_id].deletion_date = datetime.now() + timedelta(days=pending_window_in_days) + return iso_8601_datetime_without_milliseconds(self.keys[key_id].deletion_date) + + def generate_data_key(self, key_id, encryption_context, number_of_bytes, key_spec, grant_tokens): + key = self.keys[self.get_key_id(key_id)] + + if key_spec: + if key_spec == 'AES_128': + bytes = 16 + else: + bytes = 32 + else: + bytes = number_of_bytes + + plaintext = os.urandom(bytes) + + return plaintext, key.arn kms_backends = {} diff --git a/moto/kms/responses.py b/moto/kms/responses.py index ed6accc78..92195ed6b 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -5,11 +5,9 @@ import json import re import six -from boto.exception import JSONResponseError -from boto.kms.exceptions import AlreadyExistsException, NotFoundException - from moto.core.responses import BaseResponse from .models import kms_backends +from .exceptions import NotFoundException, ValidationException, AlreadyExistsException, NotAuthorizedException reserved_aliases = [ 'alias/aws/ebs', @@ -88,36 +86,28 @@ class KmsResponse(BaseResponse): def create_alias(self): alias_name = self.parameters['AliasName'] target_key_id = self.parameters['TargetKeyId'] - region = self.region if not alias_name.startswith('alias/'): - raise JSONResponseError(400, 'Bad Request', - body={'message': 'Invalid identifier', '__type': 'ValidationException'}) + raise ValidationException('Invalid identifier') if alias_name in reserved_aliases: - raise JSONResponseError(400, 'Bad Request', body={ - '__type': 'NotAuthorizedException'}) + raise NotAuthorizedException() if ':' in alias_name: - raise JSONResponseError(400, 'Bad Request', body={ - 'message': '{alias_name} contains invalid characters for an alias'.format(**locals()), - '__type': 'ValidationException'}) + raise ValidationException('{alias_name} contains invalid characters for an alias'.format(alias_name=alias_name)) if not re.match(r'^[a-zA-Z0-9:/_-]+$', alias_name): - raise JSONResponseError(400, 'Bad Request', body={ - 'message': "1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$" - .format(**locals()), - '__type': 'ValidationException'}) + raise ValidationException("1 validation error detected: Value '{alias_name}' at 'aliasName' " + "failed to satisfy constraint: Member must satisfy regular " + "expression pattern: ^[a-zA-Z0-9:/_-]+$" + .format(alias_name=alias_name)) if self.kms_backend.alias_exists(target_key_id): - raise JSONResponseError(400, 'Bad Request', body={ - 'message': 'Aliases must refer to keys. Not aliases', - '__type': 'ValidationException'}) + raise ValidationException('Aliases must refer to keys. Not aliases') if self.kms_backend.alias_exists(alias_name): - raise AlreadyExistsException(400, 'Bad Request', body={ - 'message': 'An alias with the name arn:aws:kms:{region}:012345678912:{alias_name} already exists' - .format(**locals()), '__type': 'AlreadyExistsException'}) + raise AlreadyExistsException('An alias with the name arn:aws:kms:{region}:012345678912:{alias_name} ' + 'already exists'.format(region=self.region, alias_name=alias_name)) self.kms_backend.add_alias(target_key_id, alias_name) @@ -125,16 +115,13 @@ class KmsResponse(BaseResponse): def delete_alias(self): alias_name = self.parameters['AliasName'] - region = self.region if not alias_name.startswith('alias/'): - raise JSONResponseError(400, 'Bad Request', - body={'message': 'Invalid identifier', '__type': 'ValidationException'}) + raise ValidationException('Invalid identifier') if not self.kms_backend.alias_exists(alias_name): - raise NotFoundException(400, 'Bad Request', body={ - 'message': 'Alias arn:aws:kms:{region}:012345678912:{alias_name} is not found.'.format(**locals()), - '__type': 'NotFoundException'}) + raise NotFoundException('Alias arn:aws:kms:{region}:012345678912:' + '{alias_name} is not found.'.format(region=self.region, alias_name=alias_name)) self.kms_backend.delete_alias(alias_name) @@ -172,9 +159,8 @@ class KmsResponse(BaseResponse): try: self.kms_backend.enable_key_rotation(key_id) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) return json.dumps(None) @@ -184,9 +170,8 @@ class KmsResponse(BaseResponse): try: self.kms_backend.disable_key_rotation(key_id) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) return json.dumps(None) def get_key_rotation_status(self): @@ -195,9 +180,8 @@ class KmsResponse(BaseResponse): try: rotation_enabled = self.kms_backend.get_key_rotation_status(key_id) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) return json.dumps({'KeyRotationEnabled': rotation_enabled}) def put_key_policy(self): @@ -210,9 +194,8 @@ class KmsResponse(BaseResponse): try: self.kms_backend.put_key_policy(key_id, policy) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) return json.dumps(None) @@ -225,9 +208,8 @@ class KmsResponse(BaseResponse): try: return json.dumps({'Policy': self.kms_backend.get_key_policy(key_id)}) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) def list_key_policies(self): key_id = self.parameters.get('KeyId') @@ -235,9 +217,8 @@ class KmsResponse(BaseResponse): try: self.kms_backend.describe_key(key_id) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) return json.dumps({'Truncated': False, 'PolicyNames': ['default']}) @@ -252,8 +233,14 @@ class KmsResponse(BaseResponse): return json.dumps({"CiphertextBlob": base64.b64encode(value).decode("utf-8"), 'KeyId': 'key_id'}) def decrypt(self): + # TODO refuse decode if EncryptionContext is not the same as when it was encrypted / generated + value = self.parameters.get("CiphertextBlob") - return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8"), 'KeyId': 'key_id'}) + try: + return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8")}) + except UnicodeDecodeError: + # Generate data key will produce random bytes which when decrypted is still returned as base64 + return json.dumps({"Plaintext": value}) def disable_key(self): key_id = self.parameters.get('KeyId') @@ -261,9 +248,8 @@ class KmsResponse(BaseResponse): try: self.kms_backend.disable_key(key_id) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) return json.dumps(None) def enable_key(self): @@ -272,9 +258,8 @@ class KmsResponse(BaseResponse): try: self.kms_backend.enable_key(key_id) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) return json.dumps(None) def cancel_key_deletion(self): @@ -283,9 +268,8 @@ class KmsResponse(BaseResponse): try: self.kms_backend.cancel_key_deletion(key_id) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) return json.dumps({'KeyId': key_id}) def schedule_key_deletion(self): @@ -301,19 +285,62 @@ class KmsResponse(BaseResponse): 'DeletionDate': self.kms_backend.schedule_key_deletion(key_id, pending_window_in_days) }) except KeyError: - raise JSONResponseError(404, 'Not Found', body={ - 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), - '__type': 'NotFoundException'}) + raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/" + "{key_id}' does not exist".format(region=self.region, key_id=key_id)) + + def generate_data_key(self): + key_id = self.parameters.get('KeyId') + encryption_context = self.parameters.get('EncryptionContext') + number_of_bytes = self.parameters.get('NumberOfBytes') + key_spec = self.parameters.get('KeySpec') + grant_tokens = self.parameters.get('GrantTokens') + + # Param validation + if key_id.startswith('alias'): + if self.kms_backend.get_key_id_from_alias(key_id) is None: + raise NotFoundException('Alias arn:aws:kms:{region}:012345678912:{alias_name} is not found.'.format( + region=self.region, alias_name=key_id)) + else: + if self.kms_backend.get_key_id(key_id) not in self.kms_backend.keys: + raise NotFoundException('Invalid keyId') + + if number_of_bytes and (number_of_bytes > 1024 or number_of_bytes < 0): + raise ValidationException("1 validation error detected: Value '2048' at 'numberOfBytes' failed " + "to satisfy constraint: Member must have value less than or " + "equal to 1024") + + if key_spec and key_spec not in ('AES_256', 'AES_128'): + raise ValidationException("1 validation error detected: Value 'AES_257' at 'keySpec' failed " + "to satisfy constraint: Member must satisfy enum value set: " + "[AES_256, AES_128]") + if not key_spec and not number_of_bytes: + raise ValidationException("Please specify either number of bytes or key spec.") + if key_spec and number_of_bytes: + raise ValidationException("Please specify either number of bytes or key spec.") + + plaintext, key_arn = self.kms_backend.generate_data_key(key_id, encryption_context, + number_of_bytes, key_spec, grant_tokens) + + plaintext = base64.b64encode(plaintext).decode() + + return json.dumps({ + 'CiphertextBlob': plaintext, + 'Plaintext': plaintext, + 'KeyId': key_arn # not alias + }) + + def generate_data_key_without_plaintext(self): + result = json.loads(self.generate_data_key()) + del result['Plaintext'] + + return json.dumps(result) def _assert_valid_key_id(key_id): if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE): - raise JSONResponseError(404, 'Not Found', body={ - 'message': ' Invalid keyId', '__type': 'NotFoundException'}) + raise NotFoundException('Invalid keyId') def _assert_default_policy(policy_name): if policy_name != 'default': - raise JSONResponseError(404, 'Not Found', body={ - 'message': "No such policy exists", - '__type': 'NotFoundException'}) + raise NotFoundException("No such policy exists") diff --git a/tests/test_kms/test_kms.py b/tests/test_kms/test_kms.py index 79f95fa1b..e7ce9f74b 100644 --- a/tests/test_kms/test_kms.py +++ b/tests/test_kms/test_kms.py @@ -2,8 +2,11 @@ from __future__ import unicode_literals import os, re import boto3 import boto.kms +import botocore.exceptions from boto.exception import JSONResponseError from boto.kms.exceptions import AlreadyExistsException, NotFoundException + +from moto.kms.exceptions import NotFoundException as MotoNotFoundException import sure # noqa from moto import mock_kms, mock_kms_deprecated from nose.tools import assert_raises @@ -127,7 +130,7 @@ def test_enable_key_rotation_via_arn(): def test_enable_key_rotation_with_missing_key(): conn = boto.kms.connect_to_region("us-west-2") conn.enable_key_rotation.when.called_with( - "not-a-key").should.throw(JSONResponseError) + "not-a-key").should.throw(NotFoundException) @mock_kms_deprecated @@ -142,7 +145,7 @@ def test_enable_key_rotation_with_alias_name_should_fail(): alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn']) conn.enable_key_rotation.when.called_with( - 'alias/my-alias').should.throw(JSONResponseError) + 'alias/my-alias').should.throw(NotFoundException) @mock_kms_deprecated @@ -179,21 +182,20 @@ def test_decrypt(): conn = boto.kms.connect_to_region('us-west-2') response = conn.decrypt('ZW5jcnlwdG1l'.encode('utf-8')) response['Plaintext'].should.equal(b'encryptme') - response['KeyId'].should.equal('key_id') @mock_kms_deprecated def test_disable_key_rotation_with_missing_key(): conn = boto.kms.connect_to_region("us-west-2") conn.disable_key_rotation.when.called_with( - "not-a-key").should.throw(JSONResponseError) + "not-a-key").should.throw(NotFoundException) @mock_kms_deprecated def test_get_key_rotation_status_with_missing_key(): conn = boto.kms.connect_to_region("us-west-2") conn.get_key_rotation_status.when.called_with( - "not-a-key").should.throw(JSONResponseError) + "not-a-key").should.throw(NotFoundException) @mock_kms_deprecated @@ -279,7 +281,7 @@ def test_put_key_policy_via_alias_should_not_update(): target_key_id=key['KeyMetadata']['KeyId']) conn.put_key_policy.when.called_with( - 'alias/my-key-alias', 'default', 'new policy').should.throw(JSONResponseError) + 'alias/my-key-alias', 'default', 'new policy').should.throw(NotFoundException) policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default') policy['Policy'].should.equal('my policy') @@ -599,9 +601,9 @@ def test__assert_valid_key_id(): import uuid _assert_valid_key_id.when.called_with( - "not-a-key").should.throw(JSONResponseError) + "not-a-key").should.throw(MotoNotFoundException) _assert_valid_key_id.when.called_with( - str(uuid.uuid4())).should_not.throw(JSONResponseError) + str(uuid.uuid4())).should_not.throw(MotoNotFoundException) @mock_kms_deprecated @@ -609,9 +611,9 @@ def test__assert_default_policy(): from moto.kms.responses import _assert_default_policy _assert_default_policy.when.called_with( - "not-default").should.throw(JSONResponseError) + "not-default").should.throw(MotoNotFoundException) _assert_default_policy.when.called_with( - "default").should_not.throw(JSONResponseError) + "default").should_not.throw(MotoNotFoundException) @mock_kms @@ -775,3 +777,208 @@ def test_list_resource_tags(): response = client.list_resource_tags(KeyId=keyid) assert response['Tags'][0]['TagKey'] == 'string' assert response['Tags'][0]['TagValue'] == 'string' + + +@mock_kms +def test_generate_data_key_sizes(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(Description='generate-data-key-size') + + resp1 = client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'], + KeySpec='AES_256' + ) + resp2 = client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'], + KeySpec='AES_128' + ) + resp3 = client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'], + NumberOfBytes=64 + ) + + assert len(resp1['Plaintext']) == 32 + assert len(resp2['Plaintext']) == 16 + assert len(resp3['Plaintext']) == 64 + + +@mock_kms +def test_generate_data_key_decrypt(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(Description='generate-data-key-decrypt') + + resp1 = client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'], + KeySpec='AES_256' + ) + resp2 = client.decrypt( + CiphertextBlob=resp1['CiphertextBlob'] + ) + + assert resp1['Plaintext'] == resp2['Plaintext'] + + +@mock_kms +def test_generate_data_key_invalid_size_params(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(Description='generate-data-key-size') + + with assert_raises(botocore.exceptions.ClientError) as err: + client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'], + KeySpec='AES_257' + ) + + with assert_raises(botocore.exceptions.ClientError) as err: + client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'], + KeySpec='AES_128', + NumberOfBytes=16 + ) + + with assert_raises(botocore.exceptions.ClientError) as err: + client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'], + NumberOfBytes=2048 + ) + + with assert_raises(botocore.exceptions.ClientError) as err: + client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'] + ) + + +@mock_kms +def test_generate_data_key_invalid_key(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(Description='generate-data-key-size') + + with assert_raises(client.exceptions.NotFoundException): + client.generate_data_key( + KeyId='alias/randomnonexistantkey', + KeySpec='AES_256' + ) + + with assert_raises(client.exceptions.NotFoundException): + client.generate_data_key( + KeyId=key['KeyMetadata']['KeyId'] + '4', + KeySpec='AES_256' + ) + + +@mock_kms +def test_generate_data_key_without_plaintext_decrypt(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(Description='generate-data-key-decrypt') + + resp1 = client.generate_data_key_without_plaintext( + KeyId=key['KeyMetadata']['KeyId'], + KeySpec='AES_256' + ) + + assert 'Plaintext' not in resp1 + + +@mock_kms +def test_enable_key_rotation_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.enable_key_rotation( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02' + ) + + +@mock_kms +def test_disable_key_rotation_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.disable_key_rotation( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02' + ) + + +@mock_kms +def test_enable_key_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.enable_key( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02' + ) + + +@mock_kms +def test_disable_key_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.disable_key( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02' + ) + + +@mock_kms +def test_cancel_key_deletion_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.cancel_key_deletion( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02' + ) + + +@mock_kms +def test_schedule_key_deletion_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.schedule_key_deletion( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02' + ) + + +@mock_kms +def test_get_key_rotation_status_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.get_key_rotation_status( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02' + ) + + +@mock_kms +def test_get_key_policy_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.get_key_policy( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02', + PolicyName='default' + ) + + +@mock_kms +def test_list_key_policies_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.list_key_policies( + KeyId='12366f9b-1230-123d-123e-123e6ae60c02' + ) + + +@mock_kms +def test_put_key_policy_key_not_found(): + client = boto3.client('kms', region_name='us-east-1') + + with assert_raises(client.exceptions.NotFoundException): + client.put_key_policy( + KeyId='00000000-0000-0000-0000-000000000000', + PolicyName='default', + Policy='new policy' + ) + +