KMS generate_data_key (#2071)

* Added KMS.generate_data_key and KMS.generate_date_key_without_plaintext

Increase test coverage to cover Key not found

* Added test for kms.put_key_policy key not found
This commit is contained in:
Terry Cain 2019-04-26 20:52:24 +01:00 committed by GitHub
parent 603f7c58a2
commit 4a286c4bc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 370 additions and 88 deletions

36
moto/kms/exceptions.py Normal file
View File

@ -0,0 +1,36 @@
from __future__ import unicode_literals
from moto.core.exceptions import JsonRESTError
class NotFoundException(JsonRESTError):
code = 400
def __init__(self, message):
super(NotFoundException, self).__init__(
"NotFoundException", message)
class ValidationException(JsonRESTError):
code = 400
def __init__(self, message):
super(ValidationException, self).__init__(
"ValidationException", message)
class AlreadyExistsException(JsonRESTError):
code = 400
def __init__(self, message):
super(AlreadyExistsException, self).__init__(
"AlreadyExistsException", message)
class NotAuthorizedException(JsonRESTError):
code = 400
def __init__(self):
super(NotAuthorizedException, self).__init__(
"NotAuthorizedException", None)
self.description = '{"__type":"NotAuthorizedException"}'

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import os
import boto.kms import boto.kms
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_without_milliseconds from moto.core.utils import iso_8601_datetime_without_milliseconds
@ -159,27 +160,38 @@ class KmsBackend(BaseBackend):
return self.keys[self.get_key_id(key_id)].policy return self.keys[self.get_key_id(key_id)].policy
def disable_key(self, key_id): def disable_key(self, key_id):
if key_id in self.keys: self.keys[key_id].enabled = False
self.keys[key_id].enabled = False self.keys[key_id].key_state = 'Disabled'
self.keys[key_id].key_state = 'Disabled'
def enable_key(self, key_id): def enable_key(self, key_id):
if key_id in self.keys: self.keys[key_id].enabled = True
self.keys[key_id].enabled = True self.keys[key_id].key_state = 'Enabled'
self.keys[key_id].key_state = 'Enabled'
def cancel_key_deletion(self, key_id): def cancel_key_deletion(self, key_id):
if key_id in self.keys: self.keys[key_id].key_state = 'Disabled'
self.keys[key_id].key_state = 'Disabled' self.keys[key_id].deletion_date = None
self.keys[key_id].deletion_date = None
def schedule_key_deletion(self, key_id, pending_window_in_days): def schedule_key_deletion(self, key_id, pending_window_in_days):
if key_id in self.keys: if 7 <= pending_window_in_days <= 30:
if 7 <= pending_window_in_days <= 30: self.keys[key_id].enabled = False
self.keys[key_id].enabled = False self.keys[key_id].key_state = 'PendingDeletion'
self.keys[key_id].key_state = 'PendingDeletion' self.keys[key_id].deletion_date = datetime.now() + timedelta(days=pending_window_in_days)
self.keys[key_id].deletion_date = datetime.now() + timedelta(days=pending_window_in_days) return iso_8601_datetime_without_milliseconds(self.keys[key_id].deletion_date)
return iso_8601_datetime_without_milliseconds(self.keys[key_id].deletion_date)
def generate_data_key(self, key_id, encryption_context, number_of_bytes, key_spec, grant_tokens):
key = self.keys[self.get_key_id(key_id)]
if key_spec:
if key_spec == 'AES_128':
bytes = 16
else:
bytes = 32
else:
bytes = number_of_bytes
plaintext = os.urandom(bytes)
return plaintext, key.arn
kms_backends = {} kms_backends = {}

View File

@ -5,11 +5,9 @@ import json
import re import re
import six import six
from boto.exception import JSONResponseError
from boto.kms.exceptions import AlreadyExistsException, NotFoundException
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import kms_backends from .models import kms_backends
from .exceptions import NotFoundException, ValidationException, AlreadyExistsException, NotAuthorizedException
reserved_aliases = [ reserved_aliases = [
'alias/aws/ebs', 'alias/aws/ebs',
@ -88,36 +86,28 @@ class KmsResponse(BaseResponse):
def create_alias(self): def create_alias(self):
alias_name = self.parameters['AliasName'] alias_name = self.parameters['AliasName']
target_key_id = self.parameters['TargetKeyId'] target_key_id = self.parameters['TargetKeyId']
region = self.region
if not alias_name.startswith('alias/'): if not alias_name.startswith('alias/'):
raise JSONResponseError(400, 'Bad Request', raise ValidationException('Invalid identifier')
body={'message': 'Invalid identifier', '__type': 'ValidationException'})
if alias_name in reserved_aliases: if alias_name in reserved_aliases:
raise JSONResponseError(400, 'Bad Request', body={ raise NotAuthorizedException()
'__type': 'NotAuthorizedException'})
if ':' in alias_name: if ':' in alias_name:
raise JSONResponseError(400, 'Bad Request', body={ raise ValidationException('{alias_name} contains invalid characters for an alias'.format(alias_name=alias_name))
'message': '{alias_name} contains invalid characters for an alias'.format(**locals()),
'__type': 'ValidationException'})
if not re.match(r'^[a-zA-Z0-9:/_-]+$', alias_name): if not re.match(r'^[a-zA-Z0-9:/_-]+$', alias_name):
raise JSONResponseError(400, 'Bad Request', body={ raise ValidationException("1 validation error detected: Value '{alias_name}' at 'aliasName' "
'message': "1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$" "failed to satisfy constraint: Member must satisfy regular "
.format(**locals()), "expression pattern: ^[a-zA-Z0-9:/_-]+$"
'__type': 'ValidationException'}) .format(alias_name=alias_name))
if self.kms_backend.alias_exists(target_key_id): if self.kms_backend.alias_exists(target_key_id):
raise JSONResponseError(400, 'Bad Request', body={ raise ValidationException('Aliases must refer to keys. Not aliases')
'message': 'Aliases must refer to keys. Not aliases',
'__type': 'ValidationException'})
if self.kms_backend.alias_exists(alias_name): if self.kms_backend.alias_exists(alias_name):
raise AlreadyExistsException(400, 'Bad Request', body={ raise AlreadyExistsException('An alias with the name arn:aws:kms:{region}:012345678912:{alias_name} '
'message': 'An alias with the name arn:aws:kms:{region}:012345678912:{alias_name} already exists' 'already exists'.format(region=self.region, alias_name=alias_name))
.format(**locals()), '__type': 'AlreadyExistsException'})
self.kms_backend.add_alias(target_key_id, alias_name) self.kms_backend.add_alias(target_key_id, alias_name)
@ -125,16 +115,13 @@ class KmsResponse(BaseResponse):
def delete_alias(self): def delete_alias(self):
alias_name = self.parameters['AliasName'] alias_name = self.parameters['AliasName']
region = self.region
if not alias_name.startswith('alias/'): if not alias_name.startswith('alias/'):
raise JSONResponseError(400, 'Bad Request', raise ValidationException('Invalid identifier')
body={'message': 'Invalid identifier', '__type': 'ValidationException'})
if not self.kms_backend.alias_exists(alias_name): if not self.kms_backend.alias_exists(alias_name):
raise NotFoundException(400, 'Bad Request', body={ raise NotFoundException('Alias arn:aws:kms:{region}:012345678912:'
'message': 'Alias arn:aws:kms:{region}:012345678912:{alias_name} is not found.'.format(**locals()), '{alias_name} is not found.'.format(region=self.region, alias_name=alias_name))
'__type': 'NotFoundException'})
self.kms_backend.delete_alias(alias_name) self.kms_backend.delete_alias(alias_name)
@ -172,9 +159,8 @@ class KmsResponse(BaseResponse):
try: try:
self.kms_backend.enable_key_rotation(key_id) self.kms_backend.enable_key_rotation(key_id)
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
return json.dumps(None) return json.dumps(None)
@ -184,9 +170,8 @@ class KmsResponse(BaseResponse):
try: try:
self.kms_backend.disable_key_rotation(key_id) self.kms_backend.disable_key_rotation(key_id)
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
return json.dumps(None) return json.dumps(None)
def get_key_rotation_status(self): def get_key_rotation_status(self):
@ -195,9 +180,8 @@ class KmsResponse(BaseResponse):
try: try:
rotation_enabled = self.kms_backend.get_key_rotation_status(key_id) rotation_enabled = self.kms_backend.get_key_rotation_status(key_id)
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
return json.dumps({'KeyRotationEnabled': rotation_enabled}) return json.dumps({'KeyRotationEnabled': rotation_enabled})
def put_key_policy(self): def put_key_policy(self):
@ -210,9 +194,8 @@ class KmsResponse(BaseResponse):
try: try:
self.kms_backend.put_key_policy(key_id, policy) self.kms_backend.put_key_policy(key_id, policy)
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
return json.dumps(None) return json.dumps(None)
@ -225,9 +208,8 @@ class KmsResponse(BaseResponse):
try: try:
return json.dumps({'Policy': self.kms_backend.get_key_policy(key_id)}) return json.dumps({'Policy': self.kms_backend.get_key_policy(key_id)})
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
def list_key_policies(self): def list_key_policies(self):
key_id = self.parameters.get('KeyId') key_id = self.parameters.get('KeyId')
@ -235,9 +217,8 @@ class KmsResponse(BaseResponse):
try: try:
self.kms_backend.describe_key(key_id) self.kms_backend.describe_key(key_id)
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
return json.dumps({'Truncated': False, 'PolicyNames': ['default']}) return json.dumps({'Truncated': False, 'PolicyNames': ['default']})
@ -252,8 +233,14 @@ class KmsResponse(BaseResponse):
return json.dumps({"CiphertextBlob": base64.b64encode(value).decode("utf-8"), 'KeyId': 'key_id'}) return json.dumps({"CiphertextBlob": base64.b64encode(value).decode("utf-8"), 'KeyId': 'key_id'})
def decrypt(self): def decrypt(self):
# TODO refuse decode if EncryptionContext is not the same as when it was encrypted / generated
value = self.parameters.get("CiphertextBlob") value = self.parameters.get("CiphertextBlob")
return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8"), 'KeyId': 'key_id'}) try:
return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8")})
except UnicodeDecodeError:
# Generate data key will produce random bytes which when decrypted is still returned as base64
return json.dumps({"Plaintext": value})
def disable_key(self): def disable_key(self):
key_id = self.parameters.get('KeyId') key_id = self.parameters.get('KeyId')
@ -261,9 +248,8 @@ class KmsResponse(BaseResponse):
try: try:
self.kms_backend.disable_key(key_id) self.kms_backend.disable_key(key_id)
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
return json.dumps(None) return json.dumps(None)
def enable_key(self): def enable_key(self):
@ -272,9 +258,8 @@ class KmsResponse(BaseResponse):
try: try:
self.kms_backend.enable_key(key_id) self.kms_backend.enable_key(key_id)
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
return json.dumps(None) return json.dumps(None)
def cancel_key_deletion(self): def cancel_key_deletion(self):
@ -283,9 +268,8 @@ class KmsResponse(BaseResponse):
try: try:
self.kms_backend.cancel_key_deletion(key_id) self.kms_backend.cancel_key_deletion(key_id)
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
return json.dumps({'KeyId': key_id}) return json.dumps({'KeyId': key_id})
def schedule_key_deletion(self): def schedule_key_deletion(self):
@ -301,19 +285,62 @@ class KmsResponse(BaseResponse):
'DeletionDate': self.kms_backend.schedule_key_deletion(key_id, pending_window_in_days) 'DeletionDate': self.kms_backend.schedule_key_deletion(key_id, pending_window_in_days)
}) })
except KeyError: except KeyError:
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("Key 'arn:aws:kms:{region}:012345678912:key/"
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), "{key_id}' does not exist".format(region=self.region, key_id=key_id))
'__type': 'NotFoundException'})
def generate_data_key(self):
key_id = self.parameters.get('KeyId')
encryption_context = self.parameters.get('EncryptionContext')
number_of_bytes = self.parameters.get('NumberOfBytes')
key_spec = self.parameters.get('KeySpec')
grant_tokens = self.parameters.get('GrantTokens')
# Param validation
if key_id.startswith('alias'):
if self.kms_backend.get_key_id_from_alias(key_id) is None:
raise NotFoundException('Alias arn:aws:kms:{region}:012345678912:{alias_name} is not found.'.format(
region=self.region, alias_name=key_id))
else:
if self.kms_backend.get_key_id(key_id) not in self.kms_backend.keys:
raise NotFoundException('Invalid keyId')
if number_of_bytes and (number_of_bytes > 1024 or number_of_bytes < 0):
raise ValidationException("1 validation error detected: Value '2048' at 'numberOfBytes' failed "
"to satisfy constraint: Member must have value less than or "
"equal to 1024")
if key_spec and key_spec not in ('AES_256', 'AES_128'):
raise ValidationException("1 validation error detected: Value 'AES_257' at 'keySpec' failed "
"to satisfy constraint: Member must satisfy enum value set: "
"[AES_256, AES_128]")
if not key_spec and not number_of_bytes:
raise ValidationException("Please specify either number of bytes or key spec.")
if key_spec and number_of_bytes:
raise ValidationException("Please specify either number of bytes or key spec.")
plaintext, key_arn = self.kms_backend.generate_data_key(key_id, encryption_context,
number_of_bytes, key_spec, grant_tokens)
plaintext = base64.b64encode(plaintext).decode()
return json.dumps({
'CiphertextBlob': plaintext,
'Plaintext': plaintext,
'KeyId': key_arn # not alias
})
def generate_data_key_without_plaintext(self):
result = json.loads(self.generate_data_key())
del result['Plaintext']
return json.dumps(result)
def _assert_valid_key_id(key_id): def _assert_valid_key_id(key_id):
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE): if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException('Invalid keyId')
'message': ' Invalid keyId', '__type': 'NotFoundException'})
def _assert_default_policy(policy_name): def _assert_default_policy(policy_name):
if policy_name != 'default': if policy_name != 'default':
raise JSONResponseError(404, 'Not Found', body={ raise NotFoundException("No such policy exists")
'message': "No such policy exists",
'__type': 'NotFoundException'})

View File

@ -2,8 +2,11 @@ from __future__ import unicode_literals
import os, re import os, re
import boto3 import boto3
import boto.kms import boto.kms
import botocore.exceptions
from boto.exception import JSONResponseError from boto.exception import JSONResponseError
from boto.kms.exceptions import AlreadyExistsException, NotFoundException from boto.kms.exceptions import AlreadyExistsException, NotFoundException
from moto.kms.exceptions import NotFoundException as MotoNotFoundException
import sure # noqa import sure # noqa
from moto import mock_kms, mock_kms_deprecated from moto import mock_kms, mock_kms_deprecated
from nose.tools import assert_raises from nose.tools import assert_raises
@ -127,7 +130,7 @@ def test_enable_key_rotation_via_arn():
def test_enable_key_rotation_with_missing_key(): def test_enable_key_rotation_with_missing_key():
conn = boto.kms.connect_to_region("us-west-2") conn = boto.kms.connect_to_region("us-west-2")
conn.enable_key_rotation.when.called_with( conn.enable_key_rotation.when.called_with(
"not-a-key").should.throw(JSONResponseError) "not-a-key").should.throw(NotFoundException)
@mock_kms_deprecated @mock_kms_deprecated
@ -142,7 +145,7 @@ def test_enable_key_rotation_with_alias_name_should_fail():
alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn']) alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn'])
conn.enable_key_rotation.when.called_with( conn.enable_key_rotation.when.called_with(
'alias/my-alias').should.throw(JSONResponseError) 'alias/my-alias').should.throw(NotFoundException)
@mock_kms_deprecated @mock_kms_deprecated
@ -179,21 +182,20 @@ def test_decrypt():
conn = boto.kms.connect_to_region('us-west-2') conn = boto.kms.connect_to_region('us-west-2')
response = conn.decrypt('ZW5jcnlwdG1l'.encode('utf-8')) response = conn.decrypt('ZW5jcnlwdG1l'.encode('utf-8'))
response['Plaintext'].should.equal(b'encryptme') response['Plaintext'].should.equal(b'encryptme')
response['KeyId'].should.equal('key_id')
@mock_kms_deprecated @mock_kms_deprecated
def test_disable_key_rotation_with_missing_key(): def test_disable_key_rotation_with_missing_key():
conn = boto.kms.connect_to_region("us-west-2") conn = boto.kms.connect_to_region("us-west-2")
conn.disable_key_rotation.when.called_with( conn.disable_key_rotation.when.called_with(
"not-a-key").should.throw(JSONResponseError) "not-a-key").should.throw(NotFoundException)
@mock_kms_deprecated @mock_kms_deprecated
def test_get_key_rotation_status_with_missing_key(): def test_get_key_rotation_status_with_missing_key():
conn = boto.kms.connect_to_region("us-west-2") conn = boto.kms.connect_to_region("us-west-2")
conn.get_key_rotation_status.when.called_with( conn.get_key_rotation_status.when.called_with(
"not-a-key").should.throw(JSONResponseError) "not-a-key").should.throw(NotFoundException)
@mock_kms_deprecated @mock_kms_deprecated
@ -279,7 +281,7 @@ def test_put_key_policy_via_alias_should_not_update():
target_key_id=key['KeyMetadata']['KeyId']) target_key_id=key['KeyMetadata']['KeyId'])
conn.put_key_policy.when.called_with( conn.put_key_policy.when.called_with(
'alias/my-key-alias', 'default', 'new policy').should.throw(JSONResponseError) 'alias/my-key-alias', 'default', 'new policy').should.throw(NotFoundException)
policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default') policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default')
policy['Policy'].should.equal('my policy') policy['Policy'].should.equal('my policy')
@ -599,9 +601,9 @@ def test__assert_valid_key_id():
import uuid import uuid
_assert_valid_key_id.when.called_with( _assert_valid_key_id.when.called_with(
"not-a-key").should.throw(JSONResponseError) "not-a-key").should.throw(MotoNotFoundException)
_assert_valid_key_id.when.called_with( _assert_valid_key_id.when.called_with(
str(uuid.uuid4())).should_not.throw(JSONResponseError) str(uuid.uuid4())).should_not.throw(MotoNotFoundException)
@mock_kms_deprecated @mock_kms_deprecated
@ -609,9 +611,9 @@ def test__assert_default_policy():
from moto.kms.responses import _assert_default_policy from moto.kms.responses import _assert_default_policy
_assert_default_policy.when.called_with( _assert_default_policy.when.called_with(
"not-default").should.throw(JSONResponseError) "not-default").should.throw(MotoNotFoundException)
_assert_default_policy.when.called_with( _assert_default_policy.when.called_with(
"default").should_not.throw(JSONResponseError) "default").should_not.throw(MotoNotFoundException)
@mock_kms @mock_kms
@ -775,3 +777,208 @@ def test_list_resource_tags():
response = client.list_resource_tags(KeyId=keyid) response = client.list_resource_tags(KeyId=keyid)
assert response['Tags'][0]['TagKey'] == 'string' assert response['Tags'][0]['TagKey'] == 'string'
assert response['Tags'][0]['TagValue'] == 'string' assert response['Tags'][0]['TagValue'] == 'string'
@mock_kms
def test_generate_data_key_sizes():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='generate-data-key-size')
resp1 = client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId'],
KeySpec='AES_256'
)
resp2 = client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId'],
KeySpec='AES_128'
)
resp3 = client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId'],
NumberOfBytes=64
)
assert len(resp1['Plaintext']) == 32
assert len(resp2['Plaintext']) == 16
assert len(resp3['Plaintext']) == 64
@mock_kms
def test_generate_data_key_decrypt():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='generate-data-key-decrypt')
resp1 = client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId'],
KeySpec='AES_256'
)
resp2 = client.decrypt(
CiphertextBlob=resp1['CiphertextBlob']
)
assert resp1['Plaintext'] == resp2['Plaintext']
@mock_kms
def test_generate_data_key_invalid_size_params():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='generate-data-key-size')
with assert_raises(botocore.exceptions.ClientError) as err:
client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId'],
KeySpec='AES_257'
)
with assert_raises(botocore.exceptions.ClientError) as err:
client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId'],
KeySpec='AES_128',
NumberOfBytes=16
)
with assert_raises(botocore.exceptions.ClientError) as err:
client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId'],
NumberOfBytes=2048
)
with assert_raises(botocore.exceptions.ClientError) as err:
client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId']
)
@mock_kms
def test_generate_data_key_invalid_key():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='generate-data-key-size')
with assert_raises(client.exceptions.NotFoundException):
client.generate_data_key(
KeyId='alias/randomnonexistantkey',
KeySpec='AES_256'
)
with assert_raises(client.exceptions.NotFoundException):
client.generate_data_key(
KeyId=key['KeyMetadata']['KeyId'] + '4',
KeySpec='AES_256'
)
@mock_kms
def test_generate_data_key_without_plaintext_decrypt():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='generate-data-key-decrypt')
resp1 = client.generate_data_key_without_plaintext(
KeyId=key['KeyMetadata']['KeyId'],
KeySpec='AES_256'
)
assert 'Plaintext' not in resp1
@mock_kms
def test_enable_key_rotation_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.enable_key_rotation(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02'
)
@mock_kms
def test_disable_key_rotation_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.disable_key_rotation(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02'
)
@mock_kms
def test_enable_key_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.enable_key(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02'
)
@mock_kms
def test_disable_key_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.disable_key(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02'
)
@mock_kms
def test_cancel_key_deletion_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.cancel_key_deletion(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02'
)
@mock_kms
def test_schedule_key_deletion_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.schedule_key_deletion(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02'
)
@mock_kms
def test_get_key_rotation_status_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.get_key_rotation_status(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02'
)
@mock_kms
def test_get_key_policy_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.get_key_policy(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02',
PolicyName='default'
)
@mock_kms
def test_list_key_policies_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.list_key_policies(
KeyId='12366f9b-1230-123d-123e-123e6ae60c02'
)
@mock_kms
def test_put_key_policy_key_not_found():
client = boto3.client('kms', region_name='us-east-1')
with assert_raises(client.exceptions.NotFoundException):
client.put_key_policy(
KeyId='00000000-0000-0000-0000-000000000000',
PolicyName='default',
Policy='new policy'
)