Merge pull request #2734 from gruebel/fix-kms-create-key
Fix kms create key
This commit is contained in:
		
						commit
						32dc8f9fe7
					
				| @ -7,25 +7,33 @@ from datetime import datetime, timedelta | ||||
| from boto3 import Session | ||||
| 
 | ||||
| from moto.core import BaseBackend, BaseModel | ||||
| from moto.core.utils import iso_8601_datetime_without_milliseconds | ||||
| from moto.core.utils import unix_time | ||||
| 
 | ||||
| from moto.iam.models import ACCOUNT_ID | ||||
| 
 | ||||
| from .utils import decrypt, encrypt, generate_key_id, generate_master_key | ||||
| 
 | ||||
| 
 | ||||
| class Key(BaseModel): | ||||
|     def __init__(self, policy, key_usage, description, tags, region): | ||||
|     def __init__( | ||||
|         self, policy, key_usage, customer_master_key_spec, description, tags, region | ||||
|     ): | ||||
|         self.id = generate_key_id() | ||||
|         self.creation_date = unix_time() | ||||
|         self.policy = policy | ||||
|         self.key_usage = key_usage | ||||
|         self.key_state = "Enabled" | ||||
|         self.description = description | ||||
|         self.enabled = True | ||||
|         self.region = region | ||||
|         self.account_id = "012345678912" | ||||
|         self.account_id = ACCOUNT_ID | ||||
|         self.key_rotation_status = False | ||||
|         self.deletion_date = None | ||||
|         self.tags = tags or {} | ||||
|         self.key_material = generate_master_key() | ||||
|         self.origin = "AWS_KMS" | ||||
|         self.key_manager = "CUSTOMER" | ||||
|         self.customer_master_key_spec = customer_master_key_spec or "SYMMETRIC_DEFAULT" | ||||
| 
 | ||||
|     @property | ||||
|     def physical_resource_id(self): | ||||
| @ -37,23 +45,55 @@ class Key(BaseModel): | ||||
|             self.region, self.account_id, self.id | ||||
|         ) | ||||
| 
 | ||||
|     @property | ||||
|     def encryption_algorithms(self): | ||||
|         if self.key_usage == "SIGN_VERIFY": | ||||
|             return None | ||||
|         elif self.customer_master_key_spec == "SYMMETRIC_DEFAULT": | ||||
|             return ["SYMMETRIC_DEFAULT"] | ||||
|         else: | ||||
|             return ["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"] | ||||
| 
 | ||||
|     @property | ||||
|     def signing_algorithms(self): | ||||
|         if self.key_usage == "ENCRYPT_DECRYPT": | ||||
|             return None | ||||
|         elif self.customer_master_key_spec in ["ECC_NIST_P256", "ECC_SECG_P256K1"]: | ||||
|             return ["ECDSA_SHA_256"] | ||||
|         elif self.customer_master_key_spec == "ECC_NIST_P384": | ||||
|             return ["ECDSA_SHA_384"] | ||||
|         elif self.customer_master_key_spec == "ECC_NIST_P521": | ||||
|             return ["ECDSA_SHA_512"] | ||||
|         else: | ||||
|             return [ | ||||
|                 "RSASSA_PKCS1_V1_5_SHA_256", | ||||
|                 "RSASSA_PKCS1_V1_5_SHA_384", | ||||
|                 "RSASSA_PKCS1_V1_5_SHA_512", | ||||
|                 "RSASSA_PSS_SHA_256", | ||||
|                 "RSASSA_PSS_SHA_384", | ||||
|                 "RSASSA_PSS_SHA_512", | ||||
|             ] | ||||
| 
 | ||||
|     def to_dict(self): | ||||
|         key_dict = { | ||||
|             "KeyMetadata": { | ||||
|                 "AWSAccountId": self.account_id, | ||||
|                 "Arn": self.arn, | ||||
|                 "CreationDate": iso_8601_datetime_without_milliseconds(datetime.now()), | ||||
|                 "CreationDate": self.creation_date, | ||||
|                 "CustomerMasterKeySpec": self.customer_master_key_spec, | ||||
|                 "Description": self.description, | ||||
|                 "Enabled": self.enabled, | ||||
|                 "EncryptionAlgorithms": self.encryption_algorithms, | ||||
|                 "KeyId": self.id, | ||||
|                 "KeyManager": self.key_manager, | ||||
|                 "KeyUsage": self.key_usage, | ||||
|                 "KeyState": self.key_state, | ||||
|                 "Origin": self.origin, | ||||
|                 "SigningAlgorithms": self.signing_algorithms, | ||||
|             } | ||||
|         } | ||||
|         if self.key_state == "PendingDeletion": | ||||
|             key_dict["KeyMetadata"][ | ||||
|                 "DeletionDate" | ||||
|             ] = iso_8601_datetime_without_milliseconds(self.deletion_date) | ||||
|             key_dict["KeyMetadata"]["DeletionDate"] = unix_time(self.deletion_date) | ||||
|         return key_dict | ||||
| 
 | ||||
|     def delete(self, region_name): | ||||
| @ -69,6 +109,7 @@ class Key(BaseModel): | ||||
|         key = kms_backend.create_key( | ||||
|             policy=properties["KeyPolicy"], | ||||
|             key_usage="ENCRYPT_DECRYPT", | ||||
|             customer_master_key_spec="SYMMETRIC_DEFAULT", | ||||
|             description=properties["Description"], | ||||
|             tags=properties.get("Tags"), | ||||
|             region=region_name, | ||||
| @ -90,8 +131,12 @@ class KmsBackend(BaseBackend): | ||||
|         self.keys = {} | ||||
|         self.key_to_aliases = defaultdict(set) | ||||
| 
 | ||||
|     def create_key(self, policy, key_usage, description, tags, region): | ||||
|         key = Key(policy, key_usage, description, tags, region) | ||||
|     def create_key( | ||||
|         self, policy, key_usage, customer_master_key_spec, description, tags, region | ||||
|     ): | ||||
|         key = Key( | ||||
|             policy, key_usage, customer_master_key_spec, description, tags, region | ||||
|         ) | ||||
|         self.keys[key.id] = key | ||||
|         return key | ||||
| 
 | ||||
| @ -215,9 +260,7 @@ class KmsBackend(BaseBackend): | ||||
|             self.keys[key_id].deletion_date = datetime.now() + timedelta( | ||||
|                 days=pending_window_in_days | ||||
|             ) | ||||
|             return iso_8601_datetime_without_milliseconds( | ||||
|                 self.keys[key_id].deletion_date | ||||
|             ) | ||||
|             return unix_time(self.keys[key_id].deletion_date) | ||||
| 
 | ||||
|     def encrypt(self, key_id, plaintext, encryption_context): | ||||
|         key_id = self.any_id_to_key_id(key_id) | ||||
|  | ||||
| @ -118,11 +118,12 @@ class KmsResponse(BaseResponse): | ||||
|         """https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html""" | ||||
|         policy = self.parameters.get("Policy") | ||||
|         key_usage = self.parameters.get("KeyUsage") | ||||
|         customer_master_key_spec = self.parameters.get("CustomerMasterKeySpec") | ||||
|         description = self.parameters.get("Description") | ||||
|         tags = self.parameters.get("Tags") | ||||
| 
 | ||||
|         key = self.kms_backend.create_key( | ||||
|             policy, key_usage, description, tags, self.region | ||||
|             policy, key_usage, customer_master_key_spec, description, tags, self.region | ||||
|         ) | ||||
|         return json.dumps(key.to_dict()) | ||||
| 
 | ||||
|  | ||||
| @ -1,25 +1,18 @@ | ||||
| # -*- coding: utf-8 -*- | ||||
| from __future__ import unicode_literals | ||||
| from datetime import date | ||||
| from datetime import datetime | ||||
| from dateutil.tz import tzutc | ||||
| import base64 | ||||
| import os | ||||
| import re | ||||
| 
 | ||||
| import boto3 | ||||
| import boto.kms | ||||
| import botocore.exceptions | ||||
| import six | ||||
| import sure  # noqa | ||||
| from boto.exception import JSONResponseError | ||||
| from boto.kms.exceptions import AlreadyExistsException, NotFoundException | ||||
| from freezegun import freeze_time | ||||
| from nose.tools import assert_raises | ||||
| from parameterized import parameterized | ||||
| 
 | ||||
| from moto.kms.exceptions import NotFoundException as MotoNotFoundException | ||||
| from moto import mock_kms, mock_kms_deprecated | ||||
| from moto import mock_kms_deprecated | ||||
| 
 | ||||
| PLAINTEXT_VECTORS = ( | ||||
|     (b"some encodeable plaintext",), | ||||
| @ -35,23 +28,6 @@ def _get_encoded_value(plaintext): | ||||
|     return plaintext.encode("utf-8") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_create_key(): | ||||
|     conn = boto3.client("kms", region_name="us-east-1") | ||||
|     with freeze_time("2015-01-01 00:00:00"): | ||||
|         key = conn.create_key( | ||||
|             Policy="my policy", | ||||
|             Description="my key", | ||||
|             KeyUsage="ENCRYPT_DECRYPT", | ||||
|             Tags=[{"TagKey": "project", "TagValue": "moto"}], | ||||
|         ) | ||||
| 
 | ||||
|         key["KeyMetadata"]["Description"].should.equal("my key") | ||||
|         key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT") | ||||
|         key["KeyMetadata"]["Enabled"].should.equal(True) | ||||
|         key["KeyMetadata"]["CreationDate"].should.be.a(date) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms_deprecated | ||||
| def test_describe_key(): | ||||
|     conn = boto.kms.connect_to_region("us-west-2") | ||||
| @ -96,22 +72,6 @@ def test_describe_key_via_alias_not_found(): | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         ("alias/does-not-exist",), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:alias/does-not-exist",), | ||||
|         ("invalid",), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_describe_key_via_alias_invalid_alias(key_id): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     client.create_key(Description="key") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.describe_key(KeyId=key_id) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms_deprecated | ||||
| def test_describe_key_via_arn(): | ||||
|     conn = boto.kms.connect_to_region("us-west-2") | ||||
| @ -239,71 +199,6 @@ def test_generate_data_key(): | ||||
|     response["KeyId"].should.equal(key_arn) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_boto3_generate_data_key(): | ||||
|     kms = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key = kms.create_key() | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
|     key_arn = key["KeyMetadata"]["Arn"] | ||||
| 
 | ||||
|     response = kms.generate_data_key(KeyId=key_id, NumberOfBytes=32) | ||||
| 
 | ||||
|     # CiphertextBlob must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(response["CiphertextBlob"], validate=True) | ||||
|     # Plaintext must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(response["Plaintext"], validate=True) | ||||
| 
 | ||||
|     response["KeyId"].should.equal(key_arn) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(PLAINTEXT_VECTORS) | ||||
| @mock_kms | ||||
| def test_encrypt(plaintext): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key = client.create_key(Description="key") | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
|     key_arn = key["KeyMetadata"]["Arn"] | ||||
| 
 | ||||
|     response = client.encrypt(KeyId=key_id, Plaintext=plaintext) | ||||
|     response["CiphertextBlob"].should_not.equal(plaintext) | ||||
| 
 | ||||
|     # CiphertextBlob must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(response["CiphertextBlob"], validate=True) | ||||
| 
 | ||||
|     response["KeyId"].should.equal(key_arn) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(PLAINTEXT_VECTORS) | ||||
| @mock_kms | ||||
| def test_decrypt(plaintext): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key = client.create_key(Description="key") | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
|     key_arn = key["KeyMetadata"]["Arn"] | ||||
| 
 | ||||
|     encrypt_response = client.encrypt(KeyId=key_id, Plaintext=plaintext) | ||||
| 
 | ||||
|     client.create_key(Description="key") | ||||
|     # CiphertextBlob must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(encrypt_response["CiphertextBlob"], validate=True) | ||||
| 
 | ||||
|     decrypt_response = client.decrypt(CiphertextBlob=encrypt_response["CiphertextBlob"]) | ||||
| 
 | ||||
|     # Plaintext must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(decrypt_response["Plaintext"], validate=True) | ||||
| 
 | ||||
|     decrypt_response["Plaintext"].should.equal(_get_encoded_value(plaintext)) | ||||
|     decrypt_response["KeyId"].should.equal(key_arn) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms_deprecated | ||||
| def test_disable_key_rotation_with_missing_key(): | ||||
|     conn = boto.kms.connect_to_region("us-west-2") | ||||
| @ -774,25 +669,6 @@ def test__list_aliases(): | ||||
|     len(aliases).should.equal(7) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         ("not-a-uuid",), | ||||
|         ("alias/DoesNotExist",), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",), | ||||
|         ("d25652e4-d2d2-49f7-929a-671ccda580c6",), | ||||
|         ( | ||||
|             "arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6", | ||||
|         ), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_invalid_key_ids(key_id): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.generate_data_key(KeyId=key_id, NumberOfBytes=5) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms_deprecated | ||||
| def test__assert_default_policy(): | ||||
|     from moto.kms.responses import _assert_default_policy | ||||
| @ -803,421 +679,3 @@ def test__assert_default_policy(): | ||||
|     _assert_default_policy.when.called_with("default").should_not.throw( | ||||
|         MotoNotFoundException | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(PLAINTEXT_VECTORS) | ||||
| @mock_kms | ||||
| def test_kms_encrypt_boto3(plaintext): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="key") | ||||
|     response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext) | ||||
| 
 | ||||
|     response = client.decrypt(CiphertextBlob=response["CiphertextBlob"]) | ||||
|     response["Plaintext"].should.equal(_get_encoded_value(plaintext)) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_disable_key(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="disable-key") | ||||
|     client.disable_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == False | ||||
|     assert result["KeyMetadata"]["KeyState"] == "Disabled" | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_enable_key(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="enable-key") | ||||
|     client.disable_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     client.enable_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == True | ||||
|     assert result["KeyMetadata"]["KeyState"] == "Enabled" | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_schedule_key_deletion(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="schedule-key-deletion") | ||||
|     if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false": | ||||
|         with freeze_time("2015-01-01 12:00:00"): | ||||
|             response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|             assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
|             assert response["DeletionDate"] == datetime( | ||||
|                 2015, 1, 31, 12, 0, tzinfo=tzutc() | ||||
|             ) | ||||
|     else: | ||||
|         # Can't manipulate time in server mode | ||||
|         response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|         assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == False | ||||
|     assert result["KeyMetadata"]["KeyState"] == "PendingDeletion" | ||||
|     assert "DeletionDate" in result["KeyMetadata"] | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_schedule_key_deletion_custom(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="schedule-key-deletion") | ||||
|     if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false": | ||||
|         with freeze_time("2015-01-01 12:00:00"): | ||||
|             response = client.schedule_key_deletion( | ||||
|                 KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7 | ||||
|             ) | ||||
|             assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
|             assert response["DeletionDate"] == datetime( | ||||
|                 2015, 1, 8, 12, 0, tzinfo=tzutc() | ||||
|             ) | ||||
|     else: | ||||
|         # Can't manipulate time in server mode | ||||
|         response = client.schedule_key_deletion( | ||||
|             KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7 | ||||
|         ) | ||||
|         assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == False | ||||
|     assert result["KeyMetadata"]["KeyState"] == "PendingDeletion" | ||||
|     assert "DeletionDate" in result["KeyMetadata"] | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_cancel_key_deletion(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="cancel-key-deletion") | ||||
|     client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     response = client.cancel_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == False | ||||
|     assert result["KeyMetadata"]["KeyState"] == "Disabled" | ||||
|     assert "DeletionDate" not in result["KeyMetadata"] | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_update_key_description(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="old_description") | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     result = client.update_key_description(KeyId=key_id, Description="new_description") | ||||
|     assert "ResponseMetadata" in result | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_tag_resource(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="cancel-key-deletion") | ||||
|     response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
| 
 | ||||
|     keyid = response["KeyId"] | ||||
|     response = client.tag_resource( | ||||
|         KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}] | ||||
|     ) | ||||
| 
 | ||||
|     # Shouldn't have any data, just header | ||||
|     assert len(response.keys()) == 1 | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_list_resource_tags(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="cancel-key-deletion") | ||||
|     response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
| 
 | ||||
|     keyid = response["KeyId"] | ||||
|     response = client.tag_resource( | ||||
|         KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}] | ||||
|     ) | ||||
| 
 | ||||
|     response = client.list_resource_tags(KeyId=keyid) | ||||
|     assert response["Tags"][0]["TagKey"] == "string" | ||||
|     assert response["Tags"][0]["TagValue"] == "string" | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         (dict(KeySpec="AES_256"), 32), | ||||
|         (dict(KeySpec="AES_128"), 16), | ||||
|         (dict(NumberOfBytes=64), 64), | ||||
|         (dict(NumberOfBytes=1), 1), | ||||
|         (dict(NumberOfBytes=1024), 1024), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_data_key_sizes(kwargs, expected_key_length): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="generate-data-key-size") | ||||
| 
 | ||||
|     response = client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs) | ||||
| 
 | ||||
|     assert len(response["Plaintext"]) == expected_key_length | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_generate_data_key_decrypt(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="generate-data-key-decrypt") | ||||
| 
 | ||||
|     resp1 = client.generate_data_key( | ||||
|         KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256" | ||||
|     ) | ||||
|     resp2 = client.decrypt(CiphertextBlob=resp1["CiphertextBlob"]) | ||||
| 
 | ||||
|     assert resp1["Plaintext"] == resp2["Plaintext"] | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         (dict(KeySpec="AES_257"),), | ||||
|         (dict(KeySpec="AES_128", NumberOfBytes=16),), | ||||
|         (dict(NumberOfBytes=2048),), | ||||
|         (dict(NumberOfBytes=0),), | ||||
|         (dict(),), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_data_key_invalid_size_params(kwargs): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="generate-data-key-size") | ||||
| 
 | ||||
|     with assert_raises( | ||||
|         (botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError) | ||||
|     ) as err: | ||||
|         client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         ("alias/DoesNotExist",), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",), | ||||
|         ("d25652e4-d2d2-49f7-929a-671ccda580c6",), | ||||
|         ( | ||||
|             "arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6", | ||||
|         ), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_data_key_invalid_key(key_id): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.generate_data_key(KeyId=key_id, KeySpec="AES_256") | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         ("alias/DoesExist", False), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:alias/DoesExist", False), | ||||
|         ("", True), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:key/", True), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_data_key_all_valid_key_ids(prefix, append_key_id): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key() | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
|     client.create_alias(AliasName="alias/DoesExist", TargetKeyId=key_id) | ||||
| 
 | ||||
|     target_id = prefix | ||||
|     if append_key_id: | ||||
|         target_id += key_id | ||||
| 
 | ||||
|     client.generate_data_key(KeyId=key_id, NumberOfBytes=32) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_generate_data_key_without_plaintext_decrypt(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="generate-data-key-decrypt") | ||||
| 
 | ||||
|     resp1 = client.generate_data_key_without_plaintext( | ||||
|         KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256" | ||||
|     ) | ||||
| 
 | ||||
|     assert "Plaintext" not in resp1 | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(PLAINTEXT_VECTORS) | ||||
| @mock_kms | ||||
| def test_re_encrypt_decrypt(plaintext): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key_1 = client.create_key(Description="key 1") | ||||
|     key_1_id = key_1["KeyMetadata"]["KeyId"] | ||||
|     key_1_arn = key_1["KeyMetadata"]["Arn"] | ||||
|     key_2 = client.create_key(Description="key 2") | ||||
|     key_2_id = key_2["KeyMetadata"]["KeyId"] | ||||
|     key_2_arn = key_2["KeyMetadata"]["Arn"] | ||||
| 
 | ||||
|     encrypt_response = client.encrypt( | ||||
|         KeyId=key_1_id, Plaintext=plaintext, EncryptionContext={"encryption": "context"} | ||||
|     ) | ||||
| 
 | ||||
|     re_encrypt_response = client.re_encrypt( | ||||
|         CiphertextBlob=encrypt_response["CiphertextBlob"], | ||||
|         SourceEncryptionContext={"encryption": "context"}, | ||||
|         DestinationKeyId=key_2_id, | ||||
|         DestinationEncryptionContext={"another": "context"}, | ||||
|     ) | ||||
| 
 | ||||
|     # CiphertextBlob must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(re_encrypt_response["CiphertextBlob"], validate=True) | ||||
| 
 | ||||
|     re_encrypt_response["SourceKeyId"].should.equal(key_1_arn) | ||||
|     re_encrypt_response["KeyId"].should.equal(key_2_arn) | ||||
| 
 | ||||
|     decrypt_response_1 = client.decrypt( | ||||
|         CiphertextBlob=encrypt_response["CiphertextBlob"], | ||||
|         EncryptionContext={"encryption": "context"}, | ||||
|     ) | ||||
|     decrypt_response_1["Plaintext"].should.equal(_get_encoded_value(plaintext)) | ||||
|     decrypt_response_1["KeyId"].should.equal(key_1_arn) | ||||
| 
 | ||||
|     decrypt_response_2 = client.decrypt( | ||||
|         CiphertextBlob=re_encrypt_response["CiphertextBlob"], | ||||
|         EncryptionContext={"another": "context"}, | ||||
|     ) | ||||
|     decrypt_response_2["Plaintext"].should.equal(_get_encoded_value(plaintext)) | ||||
|     decrypt_response_2["KeyId"].should.equal(key_2_arn) | ||||
| 
 | ||||
|     decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"]) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_re_encrypt_to_invalid_destination(): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key = client.create_key(Description="key 1") | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     encrypt_response = client.encrypt(KeyId=key_id, Plaintext=b"some plaintext") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.re_encrypt( | ||||
|             CiphertextBlob=encrypt_response["CiphertextBlob"], | ||||
|             DestinationKeyId="alias/DoesNotExist", | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(((12,), (44,), (91,), (1,), (1024,))) | ||||
| @mock_kms | ||||
| def test_generate_random(number_of_bytes): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     response = client.generate_random(NumberOfBytes=number_of_bytes) | ||||
| 
 | ||||
|     response["Plaintext"].should.be.a(bytes) | ||||
|     len(response["Plaintext"]).should.equal(number_of_bytes) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         (2048, botocore.exceptions.ClientError), | ||||
|         (1025, botocore.exceptions.ClientError), | ||||
|         (0, botocore.exceptions.ParamValidationError), | ||||
|         (-1, botocore.exceptions.ParamValidationError), | ||||
|         (-1024, botocore.exceptions.ParamValidationError), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_random_invalid_number_of_bytes(number_of_bytes, error_type): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     with assert_raises(error_type): | ||||
|         client.generate_random(NumberOfBytes=number_of_bytes) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_enable_key_rotation_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.enable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_disable_key_rotation_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.disable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_enable_key_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.enable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_disable_key_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.disable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_cancel_key_deletion_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.cancel_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_schedule_key_deletion_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.schedule_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_get_key_rotation_status_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.get_key_rotation_status(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_get_key_policy_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.get_key_policy( | ||||
|             KeyId="12366f9b-1230-123d-123e-123e6ae60c02", PolicyName="default" | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_list_key_policies_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.list_key_policies(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_put_key_policy_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.put_key_policy( | ||||
|             KeyId="00000000-0000-0000-0000-000000000000", | ||||
|             PolicyName="default", | ||||
|             Policy="new policy", | ||||
|         ) | ||||
|  | ||||
							
								
								
									
										638
									
								
								tests/test_kms/test_kms_boto3.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										638
									
								
								tests/test_kms/test_kms_boto3.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,638 @@ | ||||
| # -*- coding: utf-8 -*- | ||||
| from __future__ import unicode_literals | ||||
| from datetime import datetime | ||||
| from dateutil.tz import tzutc | ||||
| import base64 | ||||
| import os | ||||
| 
 | ||||
| import boto3 | ||||
| import botocore.exceptions | ||||
| import six | ||||
| import sure  # noqa | ||||
| from freezegun import freeze_time | ||||
| from nose.tools import assert_raises | ||||
| from parameterized import parameterized | ||||
| 
 | ||||
| from moto import mock_kms | ||||
| 
 | ||||
| PLAINTEXT_VECTORS = ( | ||||
|     (b"some encodeable plaintext",), | ||||
|     (b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16",), | ||||
|     ("some unicode characters ø˚∆øˆˆ∆ßçøˆˆçßøˆ¨¥",), | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| def _get_encoded_value(plaintext): | ||||
|     if isinstance(plaintext, six.binary_type): | ||||
|         return plaintext | ||||
| 
 | ||||
|     return plaintext.encode("utf-8") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_create_key(): | ||||
|     conn = boto3.client("kms", region_name="us-east-1") | ||||
|     key = conn.create_key( | ||||
|         Policy="my policy", | ||||
|         Description="my key", | ||||
|         KeyUsage="ENCRYPT_DECRYPT", | ||||
|         Tags=[{"TagKey": "project", "TagValue": "moto"}], | ||||
|     ) | ||||
| 
 | ||||
|     key["KeyMetadata"]["Arn"].should.equal( | ||||
|         "arn:aws:kms:us-east-1:123456789012:key/{}".format(key["KeyMetadata"]["KeyId"]) | ||||
|     ) | ||||
|     key["KeyMetadata"]["AWSAccountId"].should.equal("123456789012") | ||||
|     key["KeyMetadata"]["CreationDate"].should.be.a(datetime) | ||||
|     key["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT") | ||||
|     key["KeyMetadata"]["Description"].should.equal("my key") | ||||
|     key["KeyMetadata"]["Enabled"].should.be.ok | ||||
|     key["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"]) | ||||
|     key["KeyMetadata"]["KeyId"].should_not.be.empty | ||||
|     key["KeyMetadata"]["KeyManager"].should.equal("CUSTOMER") | ||||
|     key["KeyMetadata"]["KeyState"].should.equal("Enabled") | ||||
|     key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT") | ||||
|     key["KeyMetadata"]["Origin"].should.equal("AWS_KMS") | ||||
|     key["KeyMetadata"].should_not.have.key("SigningAlgorithms") | ||||
| 
 | ||||
|     key = conn.create_key(KeyUsage="ENCRYPT_DECRYPT", CustomerMasterKeySpec="RSA_2048",) | ||||
| 
 | ||||
|     sorted(key["KeyMetadata"]["EncryptionAlgorithms"]).should.equal( | ||||
|         ["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"] | ||||
|     ) | ||||
|     key["KeyMetadata"].should_not.have.key("SigningAlgorithms") | ||||
| 
 | ||||
|     key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="RSA_2048",) | ||||
| 
 | ||||
|     key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") | ||||
|     sorted(key["KeyMetadata"]["SigningAlgorithms"]).should.equal( | ||||
|         [ | ||||
|             "RSASSA_PKCS1_V1_5_SHA_256", | ||||
|             "RSASSA_PKCS1_V1_5_SHA_384", | ||||
|             "RSASSA_PKCS1_V1_5_SHA_512", | ||||
|             "RSASSA_PSS_SHA_256", | ||||
|             "RSASSA_PSS_SHA_384", | ||||
|             "RSASSA_PSS_SHA_512", | ||||
|         ] | ||||
|     ) | ||||
| 
 | ||||
|     key = conn.create_key( | ||||
|         KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_SECG_P256K1", | ||||
|     ) | ||||
| 
 | ||||
|     key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") | ||||
|     key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_256"]) | ||||
| 
 | ||||
|     key = conn.create_key( | ||||
|         KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P384", | ||||
|     ) | ||||
| 
 | ||||
|     key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") | ||||
|     key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_384"]) | ||||
| 
 | ||||
|     key = conn.create_key( | ||||
|         KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P521", | ||||
|     ) | ||||
| 
 | ||||
|     key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") | ||||
|     key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_512"]) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_describe_key(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     response = client.create_key(Description="my key", KeyUsage="ENCRYPT_DECRYPT",) | ||||
|     key_id = response["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     response = client.describe_key(KeyId=key_id) | ||||
| 
 | ||||
|     response["KeyMetadata"]["AWSAccountId"].should.equal("123456789012") | ||||
|     response["KeyMetadata"]["CreationDate"].should.be.a(datetime) | ||||
|     response["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT") | ||||
|     response["KeyMetadata"]["Description"].should.equal("my key") | ||||
|     response["KeyMetadata"]["Enabled"].should.be.ok | ||||
|     response["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"]) | ||||
|     response["KeyMetadata"]["KeyId"].should_not.be.empty | ||||
|     response["KeyMetadata"]["KeyManager"].should.equal("CUSTOMER") | ||||
|     response["KeyMetadata"]["KeyState"].should.equal("Enabled") | ||||
|     response["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT") | ||||
|     response["KeyMetadata"]["Origin"].should.equal("AWS_KMS") | ||||
|     response["KeyMetadata"].should_not.have.key("SigningAlgorithms") | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         ("alias/does-not-exist",), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:alias/does-not-exist",), | ||||
|         ("invalid",), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_describe_key_via_alias_invalid_alias(key_id): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     client.create_key(Description="key") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.describe_key(KeyId=key_id) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_generate_data_key(): | ||||
|     kms = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key = kms.create_key() | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
|     key_arn = key["KeyMetadata"]["Arn"] | ||||
| 
 | ||||
|     response = kms.generate_data_key(KeyId=key_id, NumberOfBytes=32) | ||||
| 
 | ||||
|     # CiphertextBlob must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(response["CiphertextBlob"], validate=True) | ||||
|     # Plaintext must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(response["Plaintext"], validate=True) | ||||
| 
 | ||||
|     response["KeyId"].should.equal(key_arn) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(PLAINTEXT_VECTORS) | ||||
| @mock_kms | ||||
| def test_encrypt(plaintext): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key = client.create_key(Description="key") | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
|     key_arn = key["KeyMetadata"]["Arn"] | ||||
| 
 | ||||
|     response = client.encrypt(KeyId=key_id, Plaintext=plaintext) | ||||
|     response["CiphertextBlob"].should_not.equal(plaintext) | ||||
| 
 | ||||
|     # CiphertextBlob must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(response["CiphertextBlob"], validate=True) | ||||
| 
 | ||||
|     response["KeyId"].should.equal(key_arn) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(PLAINTEXT_VECTORS) | ||||
| @mock_kms | ||||
| def test_decrypt(plaintext): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key = client.create_key(Description="key") | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
|     key_arn = key["KeyMetadata"]["Arn"] | ||||
| 
 | ||||
|     encrypt_response = client.encrypt(KeyId=key_id, Plaintext=plaintext) | ||||
| 
 | ||||
|     client.create_key(Description="key") | ||||
|     # CiphertextBlob must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(encrypt_response["CiphertextBlob"], validate=True) | ||||
| 
 | ||||
|     decrypt_response = client.decrypt(CiphertextBlob=encrypt_response["CiphertextBlob"]) | ||||
| 
 | ||||
|     # Plaintext must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(decrypt_response["Plaintext"], validate=True) | ||||
| 
 | ||||
|     decrypt_response["Plaintext"].should.equal(_get_encoded_value(plaintext)) | ||||
|     decrypt_response["KeyId"].should.equal(key_arn) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         ("not-a-uuid",), | ||||
|         ("alias/DoesNotExist",), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",), | ||||
|         ("d25652e4-d2d2-49f7-929a-671ccda580c6",), | ||||
|         ( | ||||
|             "arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6", | ||||
|         ), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_invalid_key_ids(key_id): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.generate_data_key(KeyId=key_id, NumberOfBytes=5) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(PLAINTEXT_VECTORS) | ||||
| @mock_kms | ||||
| def test_kms_encrypt(plaintext): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="key") | ||||
|     response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext) | ||||
| 
 | ||||
|     response = client.decrypt(CiphertextBlob=response["CiphertextBlob"]) | ||||
|     response["Plaintext"].should.equal(_get_encoded_value(plaintext)) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_disable_key(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="disable-key") | ||||
|     client.disable_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == False | ||||
|     assert result["KeyMetadata"]["KeyState"] == "Disabled" | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_enable_key(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="enable-key") | ||||
|     client.disable_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     client.enable_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == True | ||||
|     assert result["KeyMetadata"]["KeyState"] == "Enabled" | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_schedule_key_deletion(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="schedule-key-deletion") | ||||
|     if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false": | ||||
|         with freeze_time("2015-01-01 12:00:00"): | ||||
|             response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|             assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
|             assert response["DeletionDate"] == datetime( | ||||
|                 2015, 1, 31, 12, 0, tzinfo=tzutc() | ||||
|             ) | ||||
|     else: | ||||
|         # Can't manipulate time in server mode | ||||
|         response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|         assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == False | ||||
|     assert result["KeyMetadata"]["KeyState"] == "PendingDeletion" | ||||
|     assert "DeletionDate" in result["KeyMetadata"] | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_schedule_key_deletion_custom(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="schedule-key-deletion") | ||||
|     if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false": | ||||
|         with freeze_time("2015-01-01 12:00:00"): | ||||
|             response = client.schedule_key_deletion( | ||||
|                 KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7 | ||||
|             ) | ||||
|             assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
|             assert response["DeletionDate"] == datetime( | ||||
|                 2015, 1, 8, 12, 0, tzinfo=tzutc() | ||||
|             ) | ||||
|     else: | ||||
|         # Can't manipulate time in server mode | ||||
|         response = client.schedule_key_deletion( | ||||
|             KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7 | ||||
|         ) | ||||
|         assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == False | ||||
|     assert result["KeyMetadata"]["KeyState"] == "PendingDeletion" | ||||
|     assert "DeletionDate" in result["KeyMetadata"] | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_cancel_key_deletion(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="cancel-key-deletion") | ||||
|     client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     response = client.cancel_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert response["KeyId"] == key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
|     assert result["KeyMetadata"]["Enabled"] == False | ||||
|     assert result["KeyMetadata"]["KeyState"] == "Disabled" | ||||
|     assert "DeletionDate" not in result["KeyMetadata"] | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_update_key_description(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="old_description") | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     result = client.update_key_description(KeyId=key_id, Description="new_description") | ||||
|     assert "ResponseMetadata" in result | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_tag_resource(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="cancel-key-deletion") | ||||
|     response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
| 
 | ||||
|     keyid = response["KeyId"] | ||||
|     response = client.tag_resource( | ||||
|         KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}] | ||||
|     ) | ||||
| 
 | ||||
|     # Shouldn't have any data, just header | ||||
|     assert len(response.keys()) == 1 | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_list_resource_tags(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="cancel-key-deletion") | ||||
|     response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"]) | ||||
| 
 | ||||
|     keyid = response["KeyId"] | ||||
|     response = client.tag_resource( | ||||
|         KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}] | ||||
|     ) | ||||
| 
 | ||||
|     response = client.list_resource_tags(KeyId=keyid) | ||||
|     assert response["Tags"][0]["TagKey"] == "string" | ||||
|     assert response["Tags"][0]["TagValue"] == "string" | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         (dict(KeySpec="AES_256"), 32), | ||||
|         (dict(KeySpec="AES_128"), 16), | ||||
|         (dict(NumberOfBytes=64), 64), | ||||
|         (dict(NumberOfBytes=1), 1), | ||||
|         (dict(NumberOfBytes=1024), 1024), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_data_key_sizes(kwargs, expected_key_length): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="generate-data-key-size") | ||||
| 
 | ||||
|     response = client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs) | ||||
| 
 | ||||
|     assert len(response["Plaintext"]) == expected_key_length | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_generate_data_key_decrypt(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="generate-data-key-decrypt") | ||||
| 
 | ||||
|     resp1 = client.generate_data_key( | ||||
|         KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256" | ||||
|     ) | ||||
|     resp2 = client.decrypt(CiphertextBlob=resp1["CiphertextBlob"]) | ||||
| 
 | ||||
|     assert resp1["Plaintext"] == resp2["Plaintext"] | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         (dict(KeySpec="AES_257"),), | ||||
|         (dict(KeySpec="AES_128", NumberOfBytes=16),), | ||||
|         (dict(NumberOfBytes=2048),), | ||||
|         (dict(NumberOfBytes=0),), | ||||
|         (dict(),), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_data_key_invalid_size_params(kwargs): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="generate-data-key-size") | ||||
| 
 | ||||
|     with assert_raises( | ||||
|         (botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError) | ||||
|     ) as err: | ||||
|         client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         ("alias/DoesNotExist",), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",), | ||||
|         ("d25652e4-d2d2-49f7-929a-671ccda580c6",), | ||||
|         ( | ||||
|             "arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6", | ||||
|         ), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_data_key_invalid_key(key_id): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.generate_data_key(KeyId=key_id, KeySpec="AES_256") | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         ("alias/DoesExist", False), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:alias/DoesExist", False), | ||||
|         ("", True), | ||||
|         ("arn:aws:kms:us-east-1:012345678912:key/", True), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_data_key_all_valid_key_ids(prefix, append_key_id): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key() | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
|     client.create_alias(AliasName="alias/DoesExist", TargetKeyId=key_id) | ||||
| 
 | ||||
|     target_id = prefix | ||||
|     if append_key_id: | ||||
|         target_id += key_id | ||||
| 
 | ||||
|     client.generate_data_key(KeyId=key_id, NumberOfBytes=32) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_generate_data_key_without_plaintext_decrypt(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
|     key = client.create_key(Description="generate-data-key-decrypt") | ||||
| 
 | ||||
|     resp1 = client.generate_data_key_without_plaintext( | ||||
|         KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256" | ||||
|     ) | ||||
| 
 | ||||
|     assert "Plaintext" not in resp1 | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(PLAINTEXT_VECTORS) | ||||
| @mock_kms | ||||
| def test_re_encrypt_decrypt(plaintext): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key_1 = client.create_key(Description="key 1") | ||||
|     key_1_id = key_1["KeyMetadata"]["KeyId"] | ||||
|     key_1_arn = key_1["KeyMetadata"]["Arn"] | ||||
|     key_2 = client.create_key(Description="key 2") | ||||
|     key_2_id = key_2["KeyMetadata"]["KeyId"] | ||||
|     key_2_arn = key_2["KeyMetadata"]["Arn"] | ||||
| 
 | ||||
|     encrypt_response = client.encrypt( | ||||
|         KeyId=key_1_id, Plaintext=plaintext, EncryptionContext={"encryption": "context"} | ||||
|     ) | ||||
| 
 | ||||
|     re_encrypt_response = client.re_encrypt( | ||||
|         CiphertextBlob=encrypt_response["CiphertextBlob"], | ||||
|         SourceEncryptionContext={"encryption": "context"}, | ||||
|         DestinationKeyId=key_2_id, | ||||
|         DestinationEncryptionContext={"another": "context"}, | ||||
|     ) | ||||
| 
 | ||||
|     # CiphertextBlob must NOT be base64-encoded | ||||
|     with assert_raises(Exception): | ||||
|         base64.b64decode(re_encrypt_response["CiphertextBlob"], validate=True) | ||||
| 
 | ||||
|     re_encrypt_response["SourceKeyId"].should.equal(key_1_arn) | ||||
|     re_encrypt_response["KeyId"].should.equal(key_2_arn) | ||||
| 
 | ||||
|     decrypt_response_1 = client.decrypt( | ||||
|         CiphertextBlob=encrypt_response["CiphertextBlob"], | ||||
|         EncryptionContext={"encryption": "context"}, | ||||
|     ) | ||||
|     decrypt_response_1["Plaintext"].should.equal(_get_encoded_value(plaintext)) | ||||
|     decrypt_response_1["KeyId"].should.equal(key_1_arn) | ||||
| 
 | ||||
|     decrypt_response_2 = client.decrypt( | ||||
|         CiphertextBlob=re_encrypt_response["CiphertextBlob"], | ||||
|         EncryptionContext={"another": "context"}, | ||||
|     ) | ||||
|     decrypt_response_2["Plaintext"].should.equal(_get_encoded_value(plaintext)) | ||||
|     decrypt_response_2["KeyId"].should.equal(key_2_arn) | ||||
| 
 | ||||
|     decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"]) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_re_encrypt_to_invalid_destination(): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     key = client.create_key(Description="key 1") | ||||
|     key_id = key["KeyMetadata"]["KeyId"] | ||||
| 
 | ||||
|     encrypt_response = client.encrypt(KeyId=key_id, Plaintext=b"some plaintext") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.re_encrypt( | ||||
|             CiphertextBlob=encrypt_response["CiphertextBlob"], | ||||
|             DestinationKeyId="alias/DoesNotExist", | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized(((12,), (44,), (91,), (1,), (1024,))) | ||||
| @mock_kms | ||||
| def test_generate_random(number_of_bytes): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     response = client.generate_random(NumberOfBytes=number_of_bytes) | ||||
| 
 | ||||
|     response["Plaintext"].should.be.a(bytes) | ||||
|     len(response["Plaintext"]).should.equal(number_of_bytes) | ||||
| 
 | ||||
| 
 | ||||
| @parameterized( | ||||
|     ( | ||||
|         (2048, botocore.exceptions.ClientError), | ||||
|         (1025, botocore.exceptions.ClientError), | ||||
|         (0, botocore.exceptions.ParamValidationError), | ||||
|         (-1, botocore.exceptions.ParamValidationError), | ||||
|         (-1024, botocore.exceptions.ParamValidationError), | ||||
|     ) | ||||
| ) | ||||
| @mock_kms | ||||
| def test_generate_random_invalid_number_of_bytes(number_of_bytes, error_type): | ||||
|     client = boto3.client("kms", region_name="us-west-2") | ||||
| 
 | ||||
|     with assert_raises(error_type): | ||||
|         client.generate_random(NumberOfBytes=number_of_bytes) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_enable_key_rotation_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.enable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_disable_key_rotation_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.disable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_enable_key_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.enable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_disable_key_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.disable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_cancel_key_deletion_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.cancel_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_schedule_key_deletion_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.schedule_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_get_key_rotation_status_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.get_key_rotation_status(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_get_key_policy_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.get_key_policy( | ||||
|             KeyId="12366f9b-1230-123d-123e-123e6ae60c02", PolicyName="default" | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_list_key_policies_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.list_key_policies(KeyId="12366f9b-1230-123d-123e-123e6ae60c02") | ||||
| 
 | ||||
| 
 | ||||
| @mock_kms | ||||
| def test_put_key_policy_key_not_found(): | ||||
|     client = boto3.client("kms", region_name="us-east-1") | ||||
| 
 | ||||
|     with assert_raises(client.exceptions.NotFoundException): | ||||
|         client.put_key_policy( | ||||
|             KeyId="00000000-0000-0000-0000-000000000000", | ||||
|             PolicyName="default", | ||||
|             Policy="new policy", | ||||
|         ) | ||||
| @ -102,7 +102,7 @@ def test_deserialize_ciphertext_blob(raw, serialized): | ||||
| @parameterized(((ec[0],) for ec in ENCRYPTION_CONTEXT_VECTORS)) | ||||
| def test_encrypt_decrypt_cycle(encryption_context): | ||||
|     plaintext = b"some secret plaintext" | ||||
|     master_key = Key("nop", "nop", "nop", [], "nop") | ||||
|     master_key = Key("nop", "nop", "nop", "nop", [], "nop") | ||||
|     master_key_map = {master_key.id: master_key} | ||||
| 
 | ||||
|     ciphertext_blob = encrypt( | ||||
| @ -133,7 +133,7 @@ def test_encrypt_unknown_key_id(): | ||||
| 
 | ||||
| 
 | ||||
| def test_decrypt_invalid_ciphertext_format(): | ||||
|     master_key = Key("nop", "nop", "nop", [], "nop") | ||||
|     master_key = Key("nop", "nop", "nop", "nop", [], "nop") | ||||
|     master_key_map = {master_key.id: master_key} | ||||
| 
 | ||||
|     with assert_raises(InvalidCiphertextException): | ||||
| @ -153,7 +153,7 @@ def test_decrypt_unknwown_key_id(): | ||||
| 
 | ||||
| 
 | ||||
| def test_decrypt_invalid_ciphertext(): | ||||
|     master_key = Key("nop", "nop", "nop", [], "nop") | ||||
|     master_key = Key("nop", "nop", "nop", "nop", [], "nop") | ||||
|     master_key_map = {master_key.id: master_key} | ||||
|     ciphertext_blob = ( | ||||
|         master_key.id.encode("utf-8") + b"123456789012" | ||||
| @ -171,7 +171,7 @@ def test_decrypt_invalid_ciphertext(): | ||||
| 
 | ||||
| def test_decrypt_invalid_encryption_context(): | ||||
|     plaintext = b"some secret plaintext" | ||||
|     master_key = Key("nop", "nop", "nop", [], "nop") | ||||
|     master_key = Key("nop", "nop", "nop", "nop", [], "nop") | ||||
|     master_key_map = {master_key.id: master_key} | ||||
| 
 | ||||
|     ciphertext_blob = encrypt( | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user