diff --git a/moto/kms/models.py b/moto/kms/models.py index d1b61d86c..9fb28bb5f 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -4,13 +4,12 @@ import os import boto.kms from moto.core import BaseBackend, BaseModel from moto.core.utils import iso_8601_datetime_without_milliseconds -from .utils import generate_key_id, generate_master_key +from .utils import decrypt, encrypt, generate_key_id, generate_master_key from collections import defaultdict from datetime import datetime, timedelta class Key(BaseModel): - def __init__(self, policy, key_usage, description, tags, region): self.id = generate_key_id() self.policy = policy @@ -46,8 +45,8 @@ class Key(BaseModel): "KeyState": self.key_state, } } - if self.key_state == 'PendingDeletion': - key_dict['KeyMetadata']['DeletionDate'] = iso_8601_datetime_without_milliseconds(self.deletion_date) + if self.key_state == "PendingDeletion": + key_dict["KeyMetadata"]["DeletionDate"] = iso_8601_datetime_without_milliseconds(self.deletion_date) return key_dict def delete(self, region_name): @@ -56,28 +55,28 @@ class Key(BaseModel): @classmethod def create_from_cloudformation_json(self, resource_name, cloudformation_json, region_name): kms_backend = kms_backends[region_name] - properties = cloudformation_json['Properties'] + properties = cloudformation_json["Properties"] key = kms_backend.create_key( - policy=properties['KeyPolicy'], - key_usage='ENCRYPT_DECRYPT', - description=properties['Description'], - tags=properties.get('Tags'), + policy=properties["KeyPolicy"], + key_usage="ENCRYPT_DECRYPT", + description=properties["Description"], + tags=properties.get("Tags"), region=region_name, ) - key.key_rotation_status = properties['EnableKeyRotation'] - key.enabled = properties['Enabled'] + key.key_rotation_status = properties["EnableKeyRotation"] + key.enabled = properties["Enabled"] return key def get_cfn_attribute(self, attribute_name): from moto.cloudformation.exceptions import UnformattedGetAttTemplateException - if attribute_name == 'Arn': + + if attribute_name == "Arn": return self.arn raise UnformattedGetAttTemplateException() class KmsBackend(BaseBackend): - def __init__(self): self.keys = {} self.key_to_aliases = defaultdict(set) @@ -110,8 +109,8 @@ class KmsBackend(BaseBackend): # allow the different methods (alias, ARN :key/, keyId, ARN alias) to # describe key not just KeyId key_id = self.get_key_id(key_id) - if r'alias/' in str(key_id).lower(): - key_id = self.get_key_id_from_alias(key_id.split('alias/')[1]) + if r"alias/" in str(key_id).lower(): + key_id = self.get_key_id_from_alias(key_id.split("alias/")[1]) return self.keys[self.get_key_id(key_id)] def list_keys(self): @@ -119,7 +118,26 @@ class KmsBackend(BaseBackend): def get_key_id(self, key_id): # Allow use of ARN as well as pure KeyId - return str(key_id).split(r':key/')[1] if r':key/' in str(key_id).lower() else key_id + return str(key_id).split(r":key/")[1] if r":key/" in str(key_id).lower() else key_id + + def get_alias_name(self, alias_name): + # Allow use of ARN as well as alias name + return str(alias_name).split(r":alias/")[1] if r":alias/" in str(alias_name).lower() else alias_name + + def any_id_to_key_id(self, key_id): + """Go from any valid key ID to the raw key ID. + + Acceptable inputs: + - raw key ID + - key ARN + - alias name + - alias ARN + """ + key_id = self.get_alias_name(key_id) + key_id = self.get_key_id(key_id) + if key_id.startswith("alias/"): + key_id = self.get_key_id_from_alias(key_id) + return key_id def alias_exists(self, alias_name): for aliases in self.key_to_aliases.values(): @@ -163,37 +181,56 @@ class KmsBackend(BaseBackend): def disable_key(self, key_id): self.keys[key_id].enabled = False - self.keys[key_id].key_state = 'Disabled' + self.keys[key_id].key_state = "Disabled" def enable_key(self, key_id): self.keys[key_id].enabled = True - self.keys[key_id].key_state = 'Enabled' + self.keys[key_id].key_state = "Enabled" def cancel_key_deletion(self, key_id): - self.keys[key_id].key_state = 'Disabled' + self.keys[key_id].key_state = "Disabled" self.keys[key_id].deletion_date = None def schedule_key_deletion(self, key_id, pending_window_in_days): if 7 <= pending_window_in_days <= 30: self.keys[key_id].enabled = False - self.keys[key_id].key_state = 'PendingDeletion' + self.keys[key_id].key_state = "PendingDeletion" self.keys[key_id].deletion_date = datetime.now() + timedelta(days=pending_window_in_days) return iso_8601_datetime_without_milliseconds(self.keys[key_id].deletion_date) + def encrypt(self, key_id, plaintext, encryption_context): + key_id = self.any_id_to_key_id(key_id) + + ciphertext_blob = encrypt( + master_keys=self.keys, key_id=key_id, plaintext=plaintext, encryption_context=encryption_context + ) + arn = self.keys[key_id].arn + return ciphertext_blob, arn + + def decrypt(self, ciphertext_blob, encryption_context): + plaintext, key_id = decrypt( + master_keys=self.keys, ciphertext_blob=ciphertext_blob, encryption_context=encryption_context + ) + arn = self.keys[key_id].arn + return plaintext, arn + def generate_data_key(self, key_id, encryption_context, number_of_bytes, key_spec, grant_tokens): - key = self.keys[self.get_key_id(key_id)] + key_id = self.any_id_to_key_id(key_id) if key_spec: - if key_spec == 'AES_128': - bytes = 16 + # Note: Actual validation of key_spec is done in kms.responses + if key_spec == "AES_128": + plaintext_len = 16 else: - bytes = 32 + plaintext_len = 32 else: - bytes = number_of_bytes + plaintext_len = number_of_bytes - plaintext = os.urandom(bytes) + plaintext = os.urandom(plaintext_len) - return plaintext, key.arn + ciphertext_blob, arn = self.encrypt(key_id=key_id, plaintext=plaintext, encryption_context=encryption_context) + + return plaintext, ciphertext_blob, arn kms_backends = {} diff --git a/moto/kms/responses.py b/moto/kms/responses.py index 53012b7f8..0b8684019 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -8,6 +8,7 @@ import six from moto.core.responses import BaseResponse from .models import kms_backends from .exceptions import NotFoundException, ValidationException, AlreadyExistsException, NotAuthorizedException +from .utils import decrypt, encrypt reserved_aliases = [ 'alias/aws/ebs', @@ -21,7 +22,13 @@ class KmsResponse(BaseResponse): @property def parameters(self): - return json.loads(self.body) + params = json.loads(self.body) + + for key in ("Plaintext", "CiphertextBlob"): + if key in params: + params[key] = base64.b64decode(params[key].encode("utf-8")) + + return params @property def kms_backend(self): @@ -224,24 +231,34 @@ class KmsResponse(BaseResponse): return json.dumps({'Truncated': False, 'PolicyNames': ['default']}) def encrypt(self): - """ - We perform no encryption, we just encode the value as base64 and then - decode it in decrypt(). - """ - value = self.parameters.get("Plaintext") - if isinstance(value, six.text_type): - value = value.encode('utf-8') - return json.dumps({"CiphertextBlob": base64.b64encode(value).decode("utf-8"), 'KeyId': 'key_id'}) + key_id = self.parameters.get("KeyId") + encryption_context = self.parameters.get('EncryptionContext', {}) + plaintext = self.parameters.get("Plaintext") + + if isinstance(plaintext, six.text_type): + plaintext = plaintext.encode('utf-8') + + ciphertext_blob, arn = self.kms_backend.encrypt( + key_id=key_id, + plaintext=plaintext, + encryption_context=encryption_context, + ) + ciphertext_blob_response = base64.b64encode(ciphertext_blob).decode("utf-8") + + return json.dumps({"CiphertextBlob": ciphertext_blob_response, "KeyId": arn}) def decrypt(self): - # TODO refuse decode if EncryptionContext is not the same as when it was encrypted / generated + ciphertext_blob = self.parameters.get("CiphertextBlob") + encryption_context = self.parameters.get('EncryptionContext', {}) - value = self.parameters.get("CiphertextBlob") - try: - return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8"), 'KeyId': 'key_id'}) - except UnicodeDecodeError: - # Generate data key will produce random bytes which when decrypted is still returned as base64 - return json.dumps({"Plaintext": value}) + plaintext, arn = self.kms_backend.decrypt( + ciphertext_blob=ciphertext_blob, + encryption_context=encryption_context, + ) + + plaintext_response = base64.b64encode(plaintext).decode("utf-8") + + return json.dumps({"Plaintext": plaintext_response, 'KeyId': arn}) def disable_key(self): key_id = self.parameters.get('KeyId') @@ -291,7 +308,7 @@ class KmsResponse(BaseResponse): def generate_data_key(self): key_id = self.parameters.get('KeyId') - encryption_context = self.parameters.get('EncryptionContext') + encryption_context = self.parameters.get('EncryptionContext', {}) number_of_bytes = self.parameters.get('NumberOfBytes') key_spec = self.parameters.get('KeySpec') grant_tokens = self.parameters.get('GrantTokens') @@ -306,27 +323,39 @@ class KmsResponse(BaseResponse): raise NotFoundException('Invalid keyId') if number_of_bytes and (number_of_bytes > 1024 or number_of_bytes < 0): - raise ValidationException("1 validation error detected: Value '2048' at 'numberOfBytes' failed " - "to satisfy constraint: Member must have value less than or " - "equal to 1024") + raise ValidationException(( + "1 validation error detected: Value '{number_of_bytes:d}' at 'numberOfBytes' failed " + "to satisfy constraint: Member must have value less than or " + "equal to 1024" + ).format(number_of_bytes=number_of_bytes) + ) if key_spec and key_spec not in ('AES_256', 'AES_128'): - raise ValidationException("1 validation error detected: Value 'AES_257' at 'keySpec' failed " - "to satisfy constraint: Member must satisfy enum value set: " - "[AES_256, AES_128]") + raise ValidationException(( + "1 validation error detected: Value '{key_spec}' at 'keySpec' failed " + "to satisfy constraint: Member must satisfy enum value set: " + "[AES_256, AES_128]" + ).format(key_spec=key_spec) + ) if not key_spec and not number_of_bytes: raise ValidationException("Please specify either number of bytes or key spec.") if key_spec and number_of_bytes: raise ValidationException("Please specify either number of bytes or key spec.") - plaintext, key_arn = self.kms_backend.generate_data_key(key_id, encryption_context, - number_of_bytes, key_spec, grant_tokens) + plaintext, ciphertext_blob, key_arn = self.kms_backend.generate_data_key( + key_id=key_id, + encryption_context=encryption_context, + number_of_bytes=number_of_bytes, + key_spec=key_spec, + grant_tokens=grant_tokens + ) - plaintext = base64.b64encode(plaintext).decode() + plaintext_response = base64.b64encode(plaintext).decode("utf-8") + ciphertext_blob_response = base64.b64encode(ciphertext_blob).decode("utf-8") return json.dumps({ - 'CiphertextBlob': plaintext, - 'Plaintext': plaintext, + 'CiphertextBlob': ciphertext_blob_response, + 'Plaintext': plaintext_response, 'KeyId': key_arn # not alias }) diff --git a/tests/test_kms/test_kms.py b/tests/test_kms/test_kms.py index f189fbe41..4e1f39540 100644 --- a/tests/test_kms/test_kms.py +++ b/tests/test_kms/test_kms.py @@ -1,333 +1,368 @@ from __future__ import unicode_literals -import os, re -import boto3 -import boto.kms -import botocore.exceptions -from boto.exception import JSONResponseError -from boto.kms.exceptions import AlreadyExistsException, NotFoundException - -from moto.kms.exceptions import NotFoundException as MotoNotFoundException -import sure # noqa -from moto import mock_kms, mock_kms_deprecated -from nose.tools import assert_raises -from freezegun import freeze_time from datetime import date from datetime import datetime from dateutil.tz import tzutc +import base64 +import binascii +import os +import re + +import boto3 +import boto.kms +import botocore.exceptions +import sure # noqa +from boto.exception import JSONResponseError +from boto.kms.exceptions import AlreadyExistsException, NotFoundException +from freezegun import freeze_time +from nose.tools import assert_raises +from parameterized import parameterized + +from moto.kms.exceptions import NotFoundException as MotoNotFoundException +from moto import mock_kms, mock_kms_deprecated + +PLAINTEXT_VECTORS = ( + (b"some encodeable plaintext",), + (b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16",), +) @mock_kms def test_create_key(): - conn = boto3.client('kms', region_name='us-east-1') + conn = boto3.client("kms", region_name="us-east-1") with freeze_time("2015-01-01 00:00:00"): - key = conn.create_key(Policy="my policy", - Description="my key", - KeyUsage='ENCRYPT_DECRYPT', - Tags=[ - { - 'TagKey': 'project', - 'TagValue': 'moto', - }, - ]) + key = conn.create_key( + Policy="my policy", + Description="my key", + KeyUsage="ENCRYPT_DECRYPT", + Tags=[{"TagKey": "project", "TagValue": "moto"}], + ) - key['KeyMetadata']['Description'].should.equal("my key") - key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT") - key['KeyMetadata']['Enabled'].should.equal(True) - key['KeyMetadata']['CreationDate'].should.be.a(date) + key["KeyMetadata"]["Description"].should.equal("my key") + key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT") + key["KeyMetadata"]["Enabled"].should.equal(True) + key["KeyMetadata"]["CreationDate"].should.be.a(date) @mock_kms_deprecated def test_describe_key(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['KeyId'] + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] key = conn.describe_key(key_id) - key['KeyMetadata']['Description'].should.equal("my key") - key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT") + key["KeyMetadata"]["Description"].should.equal("my key") + key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT") @mock_kms_deprecated def test_describe_key_via_alias(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - conn.create_alias(alias_name='alias/my-key-alias', - target_key_id=key['KeyMetadata']['KeyId']) + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + conn.create_alias(alias_name="alias/my-key-alias", target_key_id=key["KeyMetadata"]["KeyId"]) - alias_key = conn.describe_key('alias/my-key-alias') - alias_key['KeyMetadata']['Description'].should.equal("my key") - alias_key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT") - alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn']) + alias_key = conn.describe_key("alias/my-key-alias") + alias_key["KeyMetadata"]["Description"].should.equal("my key") + alias_key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT") + alias_key["KeyMetadata"]["Arn"].should.equal(key["KeyMetadata"]["Arn"]) @mock_kms_deprecated def test_describe_key_via_alias_not_found(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - conn.create_alias(alias_name='alias/my-key-alias', - target_key_id=key['KeyMetadata']['KeyId']) + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + conn.create_alias(alias_name="alias/my-key-alias", target_key_id=key["KeyMetadata"]["KeyId"]) - conn.describe_key.when.called_with( - 'alias/not-found-alias').should.throw(JSONResponseError) + conn.describe_key.when.called_with("alias/not-found-alias").should.throw(JSONResponseError) @mock_kms_deprecated def test_describe_key_via_arn(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - arn = key['KeyMetadata']['Arn'] + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + arn = key["KeyMetadata"]["Arn"] the_key = conn.describe_key(arn) - the_key['KeyMetadata']['Description'].should.equal("my key") - the_key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT") - the_key['KeyMetadata']['KeyId'].should.equal(key['KeyMetadata']['KeyId']) + the_key["KeyMetadata"]["Description"].should.equal("my key") + the_key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT") + the_key["KeyMetadata"]["KeyId"].should.equal(key["KeyMetadata"]["KeyId"]) @mock_kms_deprecated def test_describe_missing_key(): conn = boto.kms.connect_to_region("us-west-2") - conn.describe_key.when.called_with( - "not-a-key").should.throw(JSONResponseError) + conn.describe_key.when.called_with("not-a-key").should.throw(JSONResponseError) @mock_kms_deprecated def test_list_keys(): conn = boto.kms.connect_to_region("us-west-2") - conn.create_key(policy="my policy", description="my key1", - key_usage='ENCRYPT_DECRYPT') - conn.create_key(policy="my policy", description="my key2", - key_usage='ENCRYPT_DECRYPT') + conn.create_key(policy="my policy", description="my key1", key_usage="ENCRYPT_DECRYPT") + conn.create_key(policy="my policy", description="my key2", key_usage="ENCRYPT_DECRYPT") keys = conn.list_keys() - keys['Keys'].should.have.length_of(2) + keys["Keys"].should.have.length_of(2) @mock_kms_deprecated def test_enable_key_rotation(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['KeyId'] + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] conn.enable_key_rotation(key_id) - conn.get_key_rotation_status( - key_id)['KeyRotationEnabled'].should.equal(True) + conn.get_key_rotation_status(key_id)["KeyRotationEnabled"].should.equal(True) @mock_kms_deprecated def test_enable_key_rotation_via_arn(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['Arn'] + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["Arn"] conn.enable_key_rotation(key_id) - conn.get_key_rotation_status( - key_id)['KeyRotationEnabled'].should.equal(True) + conn.get_key_rotation_status(key_id)["KeyRotationEnabled"].should.equal(True) @mock_kms_deprecated def test_enable_key_rotation_with_missing_key(): conn = boto.kms.connect_to_region("us-west-2") - conn.enable_key_rotation.when.called_with( - "not-a-key").should.throw(NotFoundException) + conn.enable_key_rotation.when.called_with("not-a-key").should.throw(NotFoundException) @mock_kms_deprecated def test_enable_key_rotation_with_alias_name_should_fail(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - conn.create_alias(alias_name='alias/my-key-alias', - target_key_id=key['KeyMetadata']['KeyId']) + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + conn.create_alias(alias_name="alias/my-key-alias", target_key_id=key["KeyMetadata"]["KeyId"]) - alias_key = conn.describe_key('alias/my-key-alias') - alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn']) + alias_key = conn.describe_key("alias/my-key-alias") + alias_key["KeyMetadata"]["Arn"].should.equal(key["KeyMetadata"]["Arn"]) - conn.enable_key_rotation.when.called_with( - 'alias/my-alias').should.throw(NotFoundException) + conn.enable_key_rotation.when.called_with("alias/my-alias").should.throw(NotFoundException) @mock_kms_deprecated def test_disable_key_rotation(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['KeyId'] + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] conn.enable_key_rotation(key_id) - conn.get_key_rotation_status( - key_id)['KeyRotationEnabled'].should.equal(True) + conn.get_key_rotation_status(key_id)["KeyRotationEnabled"].should.equal(True) conn.disable_key_rotation(key_id) - conn.get_key_rotation_status( - key_id)['KeyRotationEnabled'].should.equal(False) + conn.get_key_rotation_status(key_id)["KeyRotationEnabled"].should.equal(False) @mock_kms_deprecated -def test_encrypt(): - """ - test_encrypt - Using base64 encoding to merely test that the endpoint was called - """ +def test_generate_data_key(): conn = boto.kms.connect_to_region("us-west-2") - response = conn.encrypt('key_id', 'encryptme'.encode('utf-8')) - response['CiphertextBlob'].should.equal(b'ZW5jcnlwdG1l') - response['KeyId'].should.equal('key_id') + + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] + key_arn = key["KeyMetadata"]["Arn"] + + response = conn.generate_data_key(key_id=key_id, number_of_bytes=32) + + # CiphertextBlob must NOT be base64-encoded + with assert_raises(Exception): + base64.b64decode(response["CiphertextBlob"], validate=True) + # Plaintext must NOT be base64-encoded + with assert_raises(Exception): + base64.b64decode(response["Plaintext"], validate=True) + + response["KeyId"].should.equal(key_arn) +@mock_kms +def test_boto3_generate_data_key(): + kms = boto3.client("kms", region_name="us-west-2") + + key = kms.create_key() + key_id = key["KeyMetadata"]["KeyId"] + key_arn = key["KeyMetadata"]["Arn"] + + response = kms.generate_data_key(KeyId=key_id, NumberOfBytes=32) + + # CiphertextBlob must NOT be base64-encoded + with assert_raises(Exception): + base64.b64decode(response["CiphertextBlob"], validate=True) + # Plaintext must NOT be base64-encoded + with assert_raises(Exception): + base64.b64decode(response["Plaintext"], validate=True) + + response["KeyId"].should.equal(key_arn) + + +@parameterized(PLAINTEXT_VECTORS) @mock_kms_deprecated -def test_decrypt(): - conn = boto.kms.connect_to_region('us-west-2') - response = conn.decrypt('ZW5jcnlwdG1l'.encode('utf-8')) - response['Plaintext'].should.equal(b'encryptme') - response['KeyId'].should.equal('key_id') +def test_encrypt(plaintext): + conn = boto.kms.connect_to_region("us-west-2") + + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] + key_arn = key["KeyMetadata"]["Arn"] + + response = conn.encrypt(key_id, plaintext) + response["CiphertextBlob"].should_not.equal(plaintext) + + # CiphertextBlob must NOT be base64-encoded + with assert_raises(Exception): + base64.b64decode(response["CiphertextBlob"], validate=True) + + response["KeyId"].should.equal(key_arn) + + +@parameterized(PLAINTEXT_VECTORS) +@mock_kms_deprecated +def test_decrypt(plaintext): + conn = boto.kms.connect_to_region("us-west-2") + + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] + key_arn = key["KeyMetadata"]["Arn"] + + encrypt_response = conn.encrypt(key_id, plaintext) + + # CiphertextBlob must NOT be base64-encoded + with assert_raises(Exception): + base64.b64decode(encrypt_response["CiphertextBlob"], validate=True) + + decrypt_response = conn.decrypt(encrypt_response["CiphertextBlob"]) + + # Plaintext must NOT be base64-encoded + with assert_raises(Exception): + base64.b64decode(decrypt_response["Plaintext"], validate=True) + + decrypt_response["Plaintext"].should.equal(plaintext) + decrypt_response["KeyId"].should.equal(key_arn) @mock_kms_deprecated def test_disable_key_rotation_with_missing_key(): conn = boto.kms.connect_to_region("us-west-2") - conn.disable_key_rotation.when.called_with( - "not-a-key").should.throw(NotFoundException) + conn.disable_key_rotation.when.called_with("not-a-key").should.throw(NotFoundException) @mock_kms_deprecated def test_get_key_rotation_status_with_missing_key(): conn = boto.kms.connect_to_region("us-west-2") - conn.get_key_rotation_status.when.called_with( - "not-a-key").should.throw(NotFoundException) + conn.get_key_rotation_status.when.called_with("not-a-key").should.throw(NotFoundException) @mock_kms_deprecated def test_get_key_rotation_status(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['KeyId'] + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] - conn.get_key_rotation_status( - key_id)['KeyRotationEnabled'].should.equal(False) + conn.get_key_rotation_status(key_id)["KeyRotationEnabled"].should.equal(False) @mock_kms_deprecated def test_create_key_defaults_key_rotation(): conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy="my policy", - description="my key", key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['KeyId'] + key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] - conn.get_key_rotation_status( - key_id)['KeyRotationEnabled'].should.equal(False) + conn.get_key_rotation_status(key_id)["KeyRotationEnabled"].should.equal(False) @mock_kms_deprecated def test_get_key_policy(): - conn = boto.kms.connect_to_region('us-west-2') + conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy='my policy', - description='my key1', key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['KeyId'] + key = conn.create_key(policy="my policy", description="my key1", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] - policy = conn.get_key_policy(key_id, 'default') - policy['Policy'].should.equal('my policy') + policy = conn.get_key_policy(key_id, "default") + policy["Policy"].should.equal("my policy") @mock_kms_deprecated def test_get_key_policy_via_arn(): - conn = boto.kms.connect_to_region('us-west-2') + conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy='my policy', - description='my key1', key_usage='ENCRYPT_DECRYPT') - policy = conn.get_key_policy(key['KeyMetadata']['Arn'], 'default') + key = conn.create_key(policy="my policy", description="my key1", key_usage="ENCRYPT_DECRYPT") + policy = conn.get_key_policy(key["KeyMetadata"]["Arn"], "default") - policy['Policy'].should.equal('my policy') + policy["Policy"].should.equal("my policy") @mock_kms_deprecated def test_put_key_policy(): - conn = boto.kms.connect_to_region('us-west-2') + conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy='my policy', - description='my key1', key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['KeyId'] + key = conn.create_key(policy="my policy", description="my key1", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] - conn.put_key_policy(key_id, 'default', 'new policy') - policy = conn.get_key_policy(key_id, 'default') - policy['Policy'].should.equal('new policy') + conn.put_key_policy(key_id, "default", "new policy") + policy = conn.get_key_policy(key_id, "default") + policy["Policy"].should.equal("new policy") @mock_kms_deprecated def test_put_key_policy_via_arn(): - conn = boto.kms.connect_to_region('us-west-2') + conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy='my policy', - description='my key1', key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['Arn'] + key = conn.create_key(policy="my policy", description="my key1", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["Arn"] - conn.put_key_policy(key_id, 'default', 'new policy') - policy = conn.get_key_policy(key_id, 'default') - policy['Policy'].should.equal('new policy') + conn.put_key_policy(key_id, "default", "new policy") + policy = conn.get_key_policy(key_id, "default") + policy["Policy"].should.equal("new policy") @mock_kms_deprecated def test_put_key_policy_via_alias_should_not_update(): - conn = boto.kms.connect_to_region('us-west-2') + conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy='my policy', - description='my key1', key_usage='ENCRYPT_DECRYPT') - conn.create_alias(alias_name='alias/my-key-alias', - target_key_id=key['KeyMetadata']['KeyId']) + key = conn.create_key(policy="my policy", description="my key1", key_usage="ENCRYPT_DECRYPT") + conn.create_alias(alias_name="alias/my-key-alias", target_key_id=key["KeyMetadata"]["KeyId"]) - conn.put_key_policy.when.called_with( - 'alias/my-key-alias', 'default', 'new policy').should.throw(NotFoundException) + conn.put_key_policy.when.called_with("alias/my-key-alias", "default", "new policy").should.throw(NotFoundException) - policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default') - policy['Policy'].should.equal('my policy') + policy = conn.get_key_policy(key["KeyMetadata"]["KeyId"], "default") + policy["Policy"].should.equal("my policy") @mock_kms_deprecated def test_put_key_policy(): - conn = boto.kms.connect_to_region('us-west-2') + conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy='my policy', - description='my key1', key_usage='ENCRYPT_DECRYPT') - conn.put_key_policy(key['KeyMetadata']['Arn'], 'default', 'new policy') + key = conn.create_key(policy="my policy", description="my key1", key_usage="ENCRYPT_DECRYPT") + conn.put_key_policy(key["KeyMetadata"]["Arn"], "default", "new policy") - policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default') - policy['Policy'].should.equal('new policy') + policy = conn.get_key_policy(key["KeyMetadata"]["KeyId"], "default") + policy["Policy"].should.equal("new policy") @mock_kms_deprecated def test_list_key_policies(): - conn = boto.kms.connect_to_region('us-west-2') + conn = boto.kms.connect_to_region("us-west-2") - key = conn.create_key(policy='my policy', - description='my key1', key_usage='ENCRYPT_DECRYPT') - key_id = key['KeyMetadata']['KeyId'] + key = conn.create_key(policy="my policy", description="my key1", key_usage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] policies = conn.list_key_policies(key_id) - policies['PolicyNames'].should.equal(['default']) + policies["PolicyNames"].should.equal(["default"]) @mock_kms_deprecated def test__create_alias__returns_none_if_correct(): kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] + key_id = create_resp["KeyMetadata"]["KeyId"] - resp = kms.create_alias('alias/my-alias', key_id) + resp = kms.create_alias("alias/my-alias", key_id) resp.should.be.none @@ -336,14 +371,9 @@ def test__create_alias__returns_none_if_correct(): def test__create_alias__raises_if_reserved_alias(): kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] + key_id = create_resp["KeyMetadata"]["KeyId"] - reserved_aliases = [ - 'alias/aws/ebs', - 'alias/aws/s3', - 'alias/aws/redshift', - 'alias/aws/rds', - ] + reserved_aliases = ["alias/aws/ebs", "alias/aws/s3", "alias/aws/redshift", "alias/aws/rds"] for alias_name in reserved_aliases: with assert_raises(JSONResponseError) as err: @@ -351,9 +381,9 @@ def test__create_alias__raises_if_reserved_alias(): ex = err.exception ex.error_message.should.be.none - ex.error_code.should.equal('NotAuthorizedException') - ex.body.should.equal({'__type': 'NotAuthorizedException'}) - ex.reason.should.equal('Bad Request') + ex.error_code.should.equal("NotAuthorizedException") + ex.body.should.equal({"__type": "NotAuthorizedException"}) + ex.reason.should.equal("Bad Request") ex.status.should.equal(400) @@ -361,38 +391,37 @@ def test__create_alias__raises_if_reserved_alias(): def test__create_alias__can_create_multiple_aliases_for_same_key_id(): kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] + key_id = create_resp["KeyMetadata"]["KeyId"] - kms.create_alias('alias/my-alias3', key_id).should.be.none - kms.create_alias('alias/my-alias4', key_id).should.be.none - kms.create_alias('alias/my-alias5', key_id).should.be.none + kms.create_alias("alias/my-alias3", key_id).should.be.none + kms.create_alias("alias/my-alias4", key_id).should.be.none + kms.create_alias("alias/my-alias5", key_id).should.be.none @mock_kms_deprecated def test__create_alias__raises_if_wrong_prefix(): kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] + key_id = create_resp["KeyMetadata"]["KeyId"] with assert_raises(JSONResponseError) as err: - kms.create_alias('wrongprefix/my-alias', key_id) + kms.create_alias("wrongprefix/my-alias", key_id) ex = err.exception - ex.error_message.should.equal('Invalid identifier') - ex.error_code.should.equal('ValidationException') - ex.body.should.equal({'message': 'Invalid identifier', - '__type': 'ValidationException'}) - ex.reason.should.equal('Bad Request') + ex.error_message.should.equal("Invalid identifier") + ex.error_code.should.equal("ValidationException") + ex.body.should.equal({"message": "Invalid identifier", "__type": "ValidationException"}) + ex.reason.should.equal("Bad Request") ex.status.should.equal(400) @mock_kms_deprecated def test__create_alias__raises_if_duplicate(): - region = 'us-west-2' + region = "us-west-2" kms = boto.kms.connect_to_region(region) create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] - alias = 'alias/my-alias' + key_id = create_resp["KeyMetadata"]["KeyId"] + alias = "alias/my-alias" kms.create_alias(alias, key_id) @@ -400,15 +429,17 @@ def test__create_alias__raises_if_duplicate(): kms.create_alias(alias, key_id) ex = err.exception - ex.error_message.should.match(r'An alias with the name arn:aws:kms:{region}:\d{{12}}:{alias} already exists' - .format(**locals())) + ex.error_message.should.match( + r"An alias with the name arn:aws:kms:{region}:\d{{12}}:{alias} already exists".format(**locals()) + ) ex.error_code.should.be.none ex.box_usage.should.be.none ex.request_id.should.be.none - ex.body['message'].should.match(r'An alias with the name arn:aws:kms:{region}:\d{{12}}:{alias} already exists' - .format(**locals())) - ex.body['__type'].should.equal('AlreadyExistsException') - ex.reason.should.equal('Bad Request') + ex.body["message"].should.match( + r"An alias with the name arn:aws:kms:{region}:\d{{12}}:{alias} already exists".format(**locals()) + ) + ex.body["__type"].should.equal("AlreadyExistsException") + ex.reason.should.equal("Bad Request") ex.status.should.equal(400) @@ -416,25 +447,27 @@ def test__create_alias__raises_if_duplicate(): def test__create_alias__raises_if_alias_has_restricted_characters(): kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] + key_id = create_resp["KeyMetadata"]["KeyId"] - alias_names_with_restricted_characters = [ - 'alias/my-alias!', - 'alias/my-alias$', - 'alias/my-alias@', - ] + alias_names_with_restricted_characters = ["alias/my-alias!", "alias/my-alias$", "alias/my-alias@"] for alias_name in alias_names_with_restricted_characters: with assert_raises(JSONResponseError) as err: kms.create_alias(alias_name, key_id) ex = err.exception - ex.body['__type'].should.equal('ValidationException') - ex.body['message'].should.equal( - "1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$".format(**locals())) - ex.error_code.should.equal('ValidationException') + ex.body["__type"].should.equal("ValidationException") + ex.body["message"].should.equal( + "1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$".format( + **locals() + ) + ) + ex.error_code.should.equal("ValidationException") ex.message.should.equal( - "1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$".format(**locals())) - ex.reason.should.equal('Bad Request') + "1 validation error detected: Value '{alias_name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$".format( + **locals() + ) + ) + ex.reason.should.equal("Bad Request") ex.status.should.equal(400) @@ -444,23 +477,19 @@ def test__create_alias__raises_if_alias_has_colon_character(): # are accepted by regex ^[a-zA-Z0-9:/_-]+$ kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] + key_id = create_resp["KeyMetadata"]["KeyId"] - alias_names_with_restricted_characters = [ - 'alias/my:alias', - ] + alias_names_with_restricted_characters = ["alias/my:alias"] for alias_name in alias_names_with_restricted_characters: with assert_raises(JSONResponseError) as err: kms.create_alias(alias_name, key_id) ex = err.exception - ex.body['__type'].should.equal('ValidationException') - ex.body['message'].should.equal( - "{alias_name} contains invalid characters for an alias".format(**locals())) - ex.error_code.should.equal('ValidationException') - ex.message.should.equal( - "{alias_name} contains invalid characters for an alias".format(**locals())) - ex.reason.should.equal('Bad Request') + ex.body["__type"].should.equal("ValidationException") + ex.body["message"].should.equal("{alias_name} contains invalid characters for an alias".format(**locals())) + ex.error_code.should.equal("ValidationException") + ex.message.should.equal("{alias_name} contains invalid characters for an alias".format(**locals())) + ex.reason.should.equal("Bad Request") ex.status.should.equal(400) @@ -468,12 +497,9 @@ def test__create_alias__raises_if_alias_has_colon_character(): def test__create_alias__accepted_characters(): kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] + key_id = create_resp["KeyMetadata"]["KeyId"] - alias_names_with_accepted_characters = [ - 'alias/my-alias_/', - 'alias/my_alias-/', - ] + alias_names_with_accepted_characters = ["alias/my-alias_/", "alias/my_alias-/"] for alias_name in alias_names_with_accepted_characters: kms.create_alias(alias_name, key_id) @@ -483,8 +509,8 @@ def test__create_alias__accepted_characters(): def test__create_alias__raises_if_target_key_id_is_existing_alias(): kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] - alias = 'alias/my-alias' + key_id = create_resp["KeyMetadata"]["KeyId"] + alias = "alias/my-alias" kms.create_alias(alias, key_id) @@ -492,11 +518,11 @@ def test__create_alias__raises_if_target_key_id_is_existing_alias(): kms.create_alias(alias, alias) ex = err.exception - ex.body['__type'].should.equal('ValidationException') - ex.body['message'].should.equal('Aliases must refer to keys. Not aliases') - ex.error_code.should.equal('ValidationException') - ex.message.should.equal('Aliases must refer to keys. Not aliases') - ex.reason.should.equal('Bad Request') + ex.body["__type"].should.equal("ValidationException") + ex.body["message"].should.equal("Aliases must refer to keys. Not aliases") + ex.error_code.should.equal("ValidationException") + ex.message.should.equal("Aliases must refer to keys. Not aliases") + ex.reason.should.equal("Bad Request") ex.status.should.equal(400) @@ -504,14 +530,14 @@ def test__create_alias__raises_if_target_key_id_is_existing_alias(): def test__delete_alias(): kms = boto.connect_kms() create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] - alias = 'alias/my-alias' + key_id = create_resp["KeyMetadata"]["KeyId"] + alias = "alias/my-alias" # added another alias here to make sure that the deletion of the alias can # be done when there are multiple existing aliases. another_create_resp = kms.create_key() - another_key_id = create_resp['KeyMetadata']['KeyId'] - another_alias = 'alias/another-alias' + another_key_id = create_resp["KeyMetadata"]["KeyId"] + another_alias = "alias/another-alias" kms.create_alias(alias, key_id) kms.create_alias(another_alias, another_key_id) @@ -529,35 +555,35 @@ def test__delete_alias__raises_if_wrong_prefix(): kms = boto.connect_kms() with assert_raises(JSONResponseError) as err: - kms.delete_alias('wrongprefix/my-alias') + kms.delete_alias("wrongprefix/my-alias") ex = err.exception - ex.body['__type'].should.equal('ValidationException') - ex.body['message'].should.equal('Invalid identifier') - ex.error_code.should.equal('ValidationException') - ex.message.should.equal('Invalid identifier') - ex.reason.should.equal('Bad Request') + ex.body["__type"].should.equal("ValidationException") + ex.body["message"].should.equal("Invalid identifier") + ex.error_code.should.equal("ValidationException") + ex.message.should.equal("Invalid identifier") + ex.reason.should.equal("Bad Request") ex.status.should.equal(400) @mock_kms_deprecated def test__delete_alias__raises_if_alias_is_not_found(): - region = 'us-west-2' + region = "us-west-2" kms = boto.kms.connect_to_region(region) - alias_name = 'alias/unexisting-alias' + alias_name = "alias/unexisting-alias" with assert_raises(NotFoundException) as err: kms.delete_alias(alias_name) ex = err.exception - ex.body['__type'].should.equal('NotFoundException') - ex.body['message'].should.match( - r'Alias arn:aws:kms:{region}:\d{{12}}:{alias_name} is not found.'.format(**locals())) + ex.body["__type"].should.equal("NotFoundException") + ex.body["message"].should.match( + r"Alias arn:aws:kms:{region}:\d{{12}}:{alias_name} is not found.".format(**locals()) + ) ex.box_usage.should.be.none ex.error_code.should.be.none - ex.message.should.match( - r'Alias arn:aws:kms:{region}:\d{{12}}:{alias_name} is not found.'.format(**locals())) - ex.reason.should.equal('Bad Request') + ex.message.should.match(r"Alias arn:aws:kms:{region}:\d{{12}}:{alias_name} is not found.".format(**locals())) + ex.reason.should.equal("Bad Request") ex.request_id.should.be.none ex.status.should.equal(400) @@ -568,39 +594,43 @@ def test__list_aliases(): kms = boto.kms.connect_to_region(region) create_resp = kms.create_key() - key_id = create_resp['KeyMetadata']['KeyId'] - kms.create_alias('alias/my-alias1', key_id) - kms.create_alias('alias/my-alias2', key_id) - kms.create_alias('alias/my-alias3', key_id) + key_id = create_resp["KeyMetadata"]["KeyId"] + kms.create_alias("alias/my-alias1", key_id) + kms.create_alias("alias/my-alias2", key_id) + kms.create_alias("alias/my-alias3", key_id) resp = kms.list_aliases() - resp['Truncated'].should.be.false + resp["Truncated"].should.be.false - aliases = resp['Aliases'] + aliases = resp["Aliases"] def has_correct_arn(alias_obj): - alias_name = alias_obj['AliasName'] - alias_arn = alias_obj['AliasArn'] - return re.match(r'arn:aws:kms:{region}:\d{{12}}:{alias_name}'.format(region=region, alias_name=alias_name), - alias_arn) + alias_name = alias_obj["AliasName"] + alias_arn = alias_obj["AliasArn"] + return re.match( + r"arn:aws:kms:{region}:\d{{12}}:{alias_name}".format(region=region, alias_name=alias_name), alias_arn + ) - len([alias for alias in aliases if - has_correct_arn(alias) and 'alias/aws/ebs' == alias['AliasName']]).should.equal(1) - len([alias for alias in aliases if - has_correct_arn(alias) and 'alias/aws/rds' == alias['AliasName']]).should.equal(1) - len([alias for alias in aliases if - has_correct_arn(alias) and 'alias/aws/redshift' == alias['AliasName']]).should.equal(1) - len([alias for alias in aliases if - has_correct_arn(alias) and 'alias/aws/s3' == alias['AliasName']]).should.equal(1) + len([alias for alias in aliases if has_correct_arn(alias) and "alias/aws/ebs" == alias["AliasName"]]).should.equal( + 1 + ) + len([alias for alias in aliases if has_correct_arn(alias) and "alias/aws/rds" == alias["AliasName"]]).should.equal( + 1 + ) + len( + [alias for alias in aliases if has_correct_arn(alias) and "alias/aws/redshift" == alias["AliasName"]] + ).should.equal(1) + len([alias for alias in aliases if has_correct_arn(alias) and "alias/aws/s3" == alias["AliasName"]]).should.equal(1) - len([alias for alias in aliases if - has_correct_arn(alias) and 'alias/my-alias1' == alias['AliasName']]).should.equal(1) - len([alias for alias in aliases if - has_correct_arn(alias) and 'alias/my-alias2' == alias['AliasName']]).should.equal(1) + len( + [alias for alias in aliases if has_correct_arn(alias) and "alias/my-alias1" == alias["AliasName"]] + ).should.equal(1) + len( + [alias for alias in aliases if has_correct_arn(alias) and "alias/my-alias2" == alias["AliasName"]] + ).should.equal(1) - len([alias for alias in aliases if 'TargetKeyId' in alias and key_id == - alias['TargetKeyId']]).should.equal(3) + len([alias for alias in aliases if "TargetKeyId" in alias and key_id == alias["TargetKeyId"]]).should.equal(3) len(aliases).should.equal(7) @@ -610,156 +640,124 @@ def test__assert_valid_key_id(): from moto.kms.responses import _assert_valid_key_id import uuid - _assert_valid_key_id.when.called_with( - "not-a-key").should.throw(MotoNotFoundException) - _assert_valid_key_id.when.called_with( - str(uuid.uuid4())).should_not.throw(MotoNotFoundException) + _assert_valid_key_id.when.called_with("not-a-key").should.throw(MotoNotFoundException) + _assert_valid_key_id.when.called_with(str(uuid.uuid4())).should_not.throw(MotoNotFoundException) @mock_kms_deprecated def test__assert_default_policy(): from moto.kms.responses import _assert_default_policy - _assert_default_policy.when.called_with( - "not-default").should.throw(MotoNotFoundException) - _assert_default_policy.when.called_with( - "default").should_not.throw(MotoNotFoundException) + _assert_default_policy.when.called_with("not-default").should.throw(MotoNotFoundException) + _assert_default_policy.when.called_with("default").should_not.throw(MotoNotFoundException) +@parameterized(PLAINTEXT_VECTORS) @mock_kms -def test_kms_encrypt_boto3(): - client = boto3.client('kms', region_name='us-east-1') - response = client.encrypt(KeyId='foo', Plaintext=b'bar') +def test_kms_encrypt_boto3(plaintext): + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="key") + response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext) - response = client.decrypt(CiphertextBlob=response['CiphertextBlob']) - response['Plaintext'].should.equal(b'bar') + response = client.decrypt(CiphertextBlob=response["CiphertextBlob"]) + response["Plaintext"].should.equal(plaintext) @mock_kms def test_disable_key(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='disable-key') - client.disable_key( - KeyId=key['KeyMetadata']['KeyId'] - ) + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="disable-key") + client.disable_key(KeyId=key["KeyMetadata"]["KeyId"]) - result = client.describe_key(KeyId=key['KeyMetadata']['KeyId']) + result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) assert result["KeyMetadata"]["Enabled"] == False - assert result["KeyMetadata"]["KeyState"] == 'Disabled' + assert result["KeyMetadata"]["KeyState"] == "Disabled" @mock_kms def test_enable_key(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='enable-key') - client.disable_key( - KeyId=key['KeyMetadata']['KeyId'] - ) - client.enable_key( - KeyId=key['KeyMetadata']['KeyId'] - ) + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="enable-key") + client.disable_key(KeyId=key["KeyMetadata"]["KeyId"]) + client.enable_key(KeyId=key["KeyMetadata"]["KeyId"]) - result = client.describe_key(KeyId=key['KeyMetadata']['KeyId']) + result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) assert result["KeyMetadata"]["Enabled"] == True - assert result["KeyMetadata"]["KeyState"] == 'Enabled' + assert result["KeyMetadata"]["KeyState"] == "Enabled" @mock_kms def test_schedule_key_deletion(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='schedule-key-deletion') - if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'false': + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="schedule-key-deletion") + if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false": with freeze_time("2015-01-01 12:00:00"): - response = client.schedule_key_deletion( - KeyId=key['KeyMetadata']['KeyId'] - ) - assert response['KeyId'] == key['KeyMetadata']['KeyId'] - assert response['DeletionDate'] == datetime(2015, 1, 31, 12, 0, tzinfo=tzutc()) + response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) + assert response["KeyId"] == key["KeyMetadata"]["KeyId"] + assert response["DeletionDate"] == datetime(2015, 1, 31, 12, 0, tzinfo=tzutc()) else: # Can't manipulate time in server mode - response = client.schedule_key_deletion( - KeyId=key['KeyMetadata']['KeyId'] - ) - assert response['KeyId'] == key['KeyMetadata']['KeyId'] + response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) + assert response["KeyId"] == key["KeyMetadata"]["KeyId"] - result = client.describe_key(KeyId=key['KeyMetadata']['KeyId']) + result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) assert result["KeyMetadata"]["Enabled"] == False - assert result["KeyMetadata"]["KeyState"] == 'PendingDeletion' - assert 'DeletionDate' in result["KeyMetadata"] + assert result["KeyMetadata"]["KeyState"] == "PendingDeletion" + assert "DeletionDate" in result["KeyMetadata"] @mock_kms def test_schedule_key_deletion_custom(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='schedule-key-deletion') - if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'false': + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="schedule-key-deletion") + if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false": with freeze_time("2015-01-01 12:00:00"): - response = client.schedule_key_deletion( - KeyId=key['KeyMetadata']['KeyId'], - PendingWindowInDays=7 - ) - assert response['KeyId'] == key['KeyMetadata']['KeyId'] - assert response['DeletionDate'] == datetime(2015, 1, 8, 12, 0, tzinfo=tzutc()) + response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7) + assert response["KeyId"] == key["KeyMetadata"]["KeyId"] + assert response["DeletionDate"] == datetime(2015, 1, 8, 12, 0, tzinfo=tzutc()) else: # Can't manipulate time in server mode - response = client.schedule_key_deletion( - KeyId=key['KeyMetadata']['KeyId'], - PendingWindowInDays=7 - ) - assert response['KeyId'] == key['KeyMetadata']['KeyId'] + response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7) + assert response["KeyId"] == key["KeyMetadata"]["KeyId"] - result = client.describe_key(KeyId=key['KeyMetadata']['KeyId']) + result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) assert result["KeyMetadata"]["Enabled"] == False - assert result["KeyMetadata"]["KeyState"] == 'PendingDeletion' - assert 'DeletionDate' in result["KeyMetadata"] + assert result["KeyMetadata"]["KeyState"] == "PendingDeletion" + assert "DeletionDate" in result["KeyMetadata"] @mock_kms def test_cancel_key_deletion(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='cancel-key-deletion') - client.schedule_key_deletion( - KeyId=key['KeyMetadata']['KeyId'] - ) - response = client.cancel_key_deletion( - KeyId=key['KeyMetadata']['KeyId'] - ) - assert response['KeyId'] == key['KeyMetadata']['KeyId'] + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="cancel-key-deletion") + client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) + response = client.cancel_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) + assert response["KeyId"] == key["KeyMetadata"]["KeyId"] - result = client.describe_key(KeyId=key['KeyMetadata']['KeyId']) + result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) assert result["KeyMetadata"]["Enabled"] == False - assert result["KeyMetadata"]["KeyState"] == 'Disabled' - assert 'DeletionDate' not in result["KeyMetadata"] + assert result["KeyMetadata"]["KeyState"] == "Disabled" + assert "DeletionDate" not in result["KeyMetadata"] @mock_kms def test_update_key_description(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='old_description') - key_id = key['KeyMetadata']['KeyId'] + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="old_description") + key_id = key["KeyMetadata"]["KeyId"] - result = client.update_key_description(KeyId=key_id, Description='new_description') - assert 'ResponseMetadata' in result + result = client.update_key_description(KeyId=key_id, Description="new_description") + assert "ResponseMetadata" in result @mock_kms def test_tag_resource(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='cancel-key-deletion') - response = client.schedule_key_deletion( - KeyId=key['KeyMetadata']['KeyId'] - ) + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="cancel-key-deletion") + response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) - keyid = response['KeyId'] - response = client.tag_resource( - KeyId=keyid, - Tags=[ - { - 'TagKey': 'string', - 'TagValue': 'string' - }, - ] - ) + keyid = response["KeyId"] + response = client.tag_resource(KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}]) # Shouldn't have any data, just header assert len(response.keys()) == 1 @@ -767,226 +765,158 @@ def test_tag_resource(): @mock_kms def test_list_resource_tags(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='cancel-key-deletion') - response = client.schedule_key_deletion( - KeyId=key['KeyMetadata']['KeyId'] - ) + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="cancel-key-deletion") + response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) - keyid = response['KeyId'] - response = client.tag_resource( - KeyId=keyid, - Tags=[ - { - 'TagKey': 'string', - 'TagValue': 'string' - }, - ] - ) + keyid = response["KeyId"] + response = client.tag_resource(KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}]) response = client.list_resource_tags(KeyId=keyid) - assert response['Tags'][0]['TagKey'] == 'string' - assert response['Tags'][0]['TagValue'] == 'string' + assert response["Tags"][0]["TagKey"] == "string" + assert response["Tags"][0]["TagValue"] == "string" @mock_kms def test_generate_data_key_sizes(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='generate-data-key-size') + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="generate-data-key-size") - resp1 = client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'], - KeySpec='AES_256' - ) - resp2 = client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'], - KeySpec='AES_128' - ) - resp3 = client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'], - NumberOfBytes=64 - ) + resp1 = client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256") + resp2 = client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_128") + resp3 = client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], NumberOfBytes=64) - assert len(resp1['Plaintext']) == 32 - assert len(resp2['Plaintext']) == 16 - assert len(resp3['Plaintext']) == 64 + assert len(resp1["Plaintext"]) == 32 + assert len(resp2["Plaintext"]) == 16 + assert len(resp3["Plaintext"]) == 64 @mock_kms def test_generate_data_key_decrypt(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='generate-data-key-decrypt') + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="generate-data-key-decrypt") - resp1 = client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'], - KeySpec='AES_256' - ) - resp2 = client.decrypt( - CiphertextBlob=resp1['CiphertextBlob'] - ) + resp1 = client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256") + resp2 = client.decrypt(CiphertextBlob=resp1["CiphertextBlob"]) - assert resp1['Plaintext'] == resp2['Plaintext'] + assert resp1["Plaintext"] == resp2["Plaintext"] @mock_kms def test_generate_data_key_invalid_size_params(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='generate-data-key-size') + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="generate-data-key-size") with assert_raises(botocore.exceptions.ClientError) as err: - client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'], - KeySpec='AES_257' - ) + client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_257") with assert_raises(botocore.exceptions.ClientError) as err: - client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'], - KeySpec='AES_128', - NumberOfBytes=16 - ) + client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_128", NumberOfBytes=16) with assert_raises(botocore.exceptions.ClientError) as err: - client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'], - NumberOfBytes=2048 - ) + client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], NumberOfBytes=2048) with assert_raises(botocore.exceptions.ClientError) as err: - client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'] - ) + client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"]) @mock_kms def test_generate_data_key_invalid_key(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='generate-data-key-size') + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="generate-data-key-size") with assert_raises(client.exceptions.NotFoundException): - client.generate_data_key( - KeyId='alias/randomnonexistantkey', - KeySpec='AES_256' - ) + client.generate_data_key(KeyId="alias/randomnonexistantkey", KeySpec="AES_256") with assert_raises(client.exceptions.NotFoundException): - client.generate_data_key( - KeyId=key['KeyMetadata']['KeyId'] + '4', - KeySpec='AES_256' - ) + client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"] + "4", KeySpec="AES_256") @mock_kms def test_generate_data_key_without_plaintext_decrypt(): - client = boto3.client('kms', region_name='us-east-1') - key = client.create_key(Description='generate-data-key-decrypt') + client = boto3.client("kms", region_name="us-east-1") + key = client.create_key(Description="generate-data-key-decrypt") - resp1 = client.generate_data_key_without_plaintext( - KeyId=key['KeyMetadata']['KeyId'], - KeySpec='AES_256' - ) + resp1 = client.generate_data_key_without_plaintext(KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256") - assert 'Plaintext' not in resp1 + assert "Plaintext" not in resp1 @mock_kms def test_enable_key_rotation_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.enable_key_rotation( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02' - ) + client.enable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") @mock_kms def test_disable_key_rotation_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.disable_key_rotation( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02' - ) + client.disable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") @mock_kms def test_enable_key_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.enable_key( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02' - ) + client.enable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") @mock_kms def test_disable_key_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.disable_key( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02' - ) + client.disable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") @mock_kms def test_cancel_key_deletion_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.cancel_key_deletion( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02' - ) + client.cancel_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") @mock_kms def test_schedule_key_deletion_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.schedule_key_deletion( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02' - ) + client.schedule_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") @mock_kms def test_get_key_rotation_status_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.get_key_rotation_status( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02' - ) + client.get_key_rotation_status(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") @mock_kms def test_get_key_policy_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.get_key_policy( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02', - PolicyName='default' - ) + client.get_key_policy(KeyId="12366f9b-1230-123d-123e-123e6ae60c02", PolicyName="default") @mock_kms def test_list_key_policies_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.list_key_policies( - KeyId='12366f9b-1230-123d-123e-123e6ae60c02' - ) + client.list_key_policies(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") @mock_kms def test_put_key_policy_key_not_found(): - client = boto3.client('kms', region_name='us-east-1') + client = boto3.client("kms", region_name="us-east-1") with assert_raises(client.exceptions.NotFoundException): - client.put_key_policy( - KeyId='00000000-0000-0000-0000-000000000000', - PolicyName='default', - Policy='new policy' - ) + client.put_key_policy(KeyId="00000000-0000-0000-0000-000000000000", PolicyName="default", Policy="new policy") diff --git a/tests/test_kms/test_utils.py b/tests/test_kms/test_utils.py index 466c72ea9..73d7d3580 100644 --- a/tests/test_kms/test_utils.py +++ b/tests/test_kms/test_utils.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +import sure # noqa from nose.tools import assert_raises from parameterized import parameterized @@ -104,26 +105,23 @@ def test_encrypt_decrypt_cycle(encryption_context): def test_encrypt_unknown_key_id(): - assert_raises( - NotFoundException, encrypt, master_keys={}, key_id="anything", plaintext=b"secrets", encryption_context={} - ) + with assert_raises(NotFoundException): + encrypt(master_keys={}, key_id="anything", plaintext=b"secrets", encryption_context={}) def test_decrypt_invalid_ciphertext_format(): master_key = Key("nop", "nop", "nop", [], "nop") master_key_map = {master_key.id: master_key} - assert_raises( - InvalidCiphertextException, decrypt, master_keys=master_key_map, ciphertext_blob=b"", encryption_context={} - ) + with assert_raises(InvalidCiphertextException): + decrypt(master_keys=master_key_map, ciphertext_blob=b"", encryption_context={}) def test_decrypt_unknwown_key_id(): ciphertext_blob = b"d25652e4-d2d2-49f7-929a-671ccda580c6" b"123456789012" b"1234567890123456" b"some ciphertext" - assert_raises( - AccessDeniedException, decrypt, master_keys={}, ciphertext_blob=ciphertext_blob, encryption_context={} - ) + with assert_raises(AccessDeniedException): + decrypt(master_keys={}, ciphertext_blob=ciphertext_blob, encryption_context={}) def test_decrypt_invalid_ciphertext(): @@ -131,13 +129,12 @@ def test_decrypt_invalid_ciphertext(): master_key_map = {master_key.id: master_key} ciphertext_blob = master_key.id.encode("utf-8") + b"123456789012" b"1234567890123456" b"some ciphertext" - assert_raises( - InvalidCiphertextException, - decrypt, - master_keys=master_key_map, - ciphertext_blob=ciphertext_blob, - encryption_context={}, - ) + with assert_raises(InvalidCiphertextException): + decrypt( + master_keys=master_key_map, + ciphertext_blob=ciphertext_blob, + encryption_context={}, + ) def test_decrypt_invalid_encryption_context(): @@ -152,10 +149,9 @@ def test_decrypt_invalid_encryption_context(): encryption_context={"some": "encryption", "context": "here"}, ) - assert_raises( - InvalidCiphertextException, - decrypt, - master_keys=master_key_map, - ciphertext_blob=ciphertext_blob, - encryption_context={}, - ) + with assert_raises(InvalidCiphertextException): + decrypt( + master_keys=master_key_map, + ciphertext_blob=ciphertext_blob, + encryption_context={}, + )