diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 29ab81ad2..7333b46a0 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -3572,14 +3572,14 @@ - [X] retire_grant - [X] revoke_grant - [X] schedule_key_deletion -- [ ] sign +- [X] sign - [X] tag_resource - [X] untag_resource - [ ] update_alias - [ ] update_custom_key_store - [X] update_key_description - [ ] update_primary_region -- [ ] verify +- [X] verify - [ ] verify_mac diff --git a/moto/dynamodb/models/__init__.py b/moto/dynamodb/models/__init__.py index 69c6bd36e..de9c51cea 100644 --- a/moto/dynamodb/models/__init__.py +++ b/moto/dynamodb/models/__init__.py @@ -482,7 +482,7 @@ class Table(CloudFormationModel): key = kms.create_key( policy="", key_usage="ENCRYPT_DECRYPT", - customer_master_key_spec="SYMMETRIC_DEFAULT", + key_spec="SYMMETRIC_DEFAULT", description="Default master key that protects my DynamoDB table storage", tags=None, region=region, diff --git a/moto/ec2/models/elastic_block_store.py b/moto/ec2/models/elastic_block_store.py index 48498ef23..cc471fc21 100644 --- a/moto/ec2/models/elastic_block_store.py +++ b/moto/ec2/models/elastic_block_store.py @@ -410,7 +410,7 @@ class EBSBackend: key = kms.create_key( policy="", key_usage="ENCRYPT_DECRYPT", - customer_master_key_spec="SYMMETRIC_DEFAULT", + key_spec="SYMMETRIC_DEFAULT", description="Default master key that protects my EBS volumes when no other key is defined", tags=None, region=self.region_name, diff --git a/moto/kms/models.py b/moto/kms/models.py index 470b25f7a..22059ad7f 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -2,6 +2,11 @@ import json import os from collections import defaultdict from datetime import datetime, timedelta +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives import hashes + +from cryptography.hazmat.primitives.asymmetric import padding +from moto.apigateway.exceptions import ValidationException from moto.core import get_account_id, BaseBackend, BaseModel, CloudFormationModel from moto.core.utils import get_random_hex, unix_time, BackendDict @@ -14,6 +19,7 @@ from .utils import ( encrypt, generate_key_id, generate_master_key, + generate_private_key, ) @@ -49,9 +55,7 @@ class Grant(BaseModel): class Key(CloudFormationModel): - def __init__( - self, policy, key_usage, customer_master_key_spec, description, region - ): + def __init__(self, policy, key_usage, key_spec, description, region): self.id = generate_key_id() self.creation_date = unix_time() self.policy = policy or self.generate_default_policy() @@ -64,9 +68,10 @@ class Key(CloudFormationModel): self.key_rotation_status = False self.deletion_date = None self.key_material = generate_master_key() + self.private_key = generate_private_key() self.origin = "AWS_KMS" self.key_manager = "CUSTOMER" - self.customer_master_key_spec = customer_master_key_spec or "SYMMETRIC_DEFAULT" + self.key_spec = key_spec or "SYMMETRIC_DEFAULT" self.grants = dict() @@ -139,7 +144,7 @@ class Key(CloudFormationModel): def encryption_algorithms(self): if self.key_usage == "SIGN_VERIFY": return None - elif self.customer_master_key_spec == "SYMMETRIC_DEFAULT": + elif self.key_spec == "SYMMETRIC_DEFAULT": return ["SYMMETRIC_DEFAULT"] else: return ["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"] @@ -148,11 +153,11 @@ class Key(CloudFormationModel): def signing_algorithms(self): if self.key_usage == "ENCRYPT_DECRYPT": return None - elif self.customer_master_key_spec in ["ECC_NIST_P256", "ECC_SECG_P256K1"]: + elif self.key_spec in ["ECC_NIST_P256", "ECC_SECG_P256K1"]: return ["ECDSA_SHA_256"] - elif self.customer_master_key_spec == "ECC_NIST_P384": + elif self.key_spec == "ECC_NIST_P384": return ["ECDSA_SHA_384"] - elif self.customer_master_key_spec == "ECC_NIST_P521": + elif self.key_spec == "ECC_NIST_P521": return ["ECDSA_SHA_512"] else: return [ @@ -170,7 +175,8 @@ class Key(CloudFormationModel): "AWSAccountId": self.account_id, "Arn": self.arn, "CreationDate": self.creation_date, - "CustomerMasterKeySpec": self.customer_master_key_spec, + "CustomerMasterKeySpec": self.key_spec, + "KeySpec": self.key_spec, "Description": self.description, "Enabled": self.enabled, "EncryptionAlgorithms": self.encryption_algorithms, @@ -208,7 +214,7 @@ class Key(CloudFormationModel): key = kms_backend.create_key( policy=properties["KeyPolicy"], key_usage="ENCRYPT_DECRYPT", - customer_master_key_spec="SYMMETRIC_DEFAULT", + key_spec="SYMMETRIC_DEFAULT", description=properties["Description"], tags=properties.get("Tags", []), region=region_name, @@ -258,10 +264,8 @@ class KmsBackend(BaseBackend): self.add_alias(key.id, alias_name) return key.id - def create_key( - self, policy, key_usage, customer_master_key_spec, description, tags, region - ): - key = Key(policy, key_usage, customer_master_key_spec, description, region) + def create_key(self, policy, key_usage, key_spec, description, tags, region): + key = Key(policy, key_usage, key_spec, description, region) self.keys[key.id] = key if tags is not None and len(tags) > 0: self.tag_resource(key.id, tags) @@ -516,5 +520,93 @@ class KmsBackend(BaseBackend): key = self.describe_key(key_id) key.retire_grant(grant_id) + def __ensure_valid_sign_and_verify_key(self, key: Key): + if key.key_usage != "SIGN_VERIFY": + raise ValidationException( + ( + "1 validation error detected: Value '{key_id}' at 'KeyId' failed " + "to satisfy constraint: Member must point to a key with usage: 'SIGN_VERIFY'" + ).format(key_id=key.id) + ) + + def __ensure_valid_signing_augorithm(self, key: Key, signing_algorithm): + if signing_algorithm not in key.signing_algorithms: + raise ValidationException( + ( + "1 validation error detected: Value '{signing_algorithm}' at 'SigningAlgorithm' failed " + "to satisfy constraint: Member must satisfy enum value set: " + "{valid_sign_algorithms}" + ).format( + signing_algorithm=signing_algorithm, + valid_sign_algorithms=key.signing_algorithms, + ) + ) + + def sign(self, key_id, message, signing_algorithm): + """Sign message using generated private key. + + NOTES: + - signing_algorithm is ignored and hardcoded to RSASSA_PSS_SHA_256 + - message_type DIGEST is not implemented + - grant_tokens are not implemented + """ + key = self.describe_key(key_id) + + self.__ensure_valid_sign_and_verify_key(key) + self.__ensure_valid_signing_augorithm(key, signing_algorithm) + + # TODO: support more than one hardcoded algorithm based on KeySpec + signature = key.private_key.sign( + message, + padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH + ), + hashes.SHA256(), + ) + + return key.arn, signature, signing_algorithm + + def verify(self, key_id, message, signature, signing_algorithm): + """Verify message using public key from generated private key. + + NOTES: + - signing_algorithm is ignored and hardcoded to RSASSA_PSS_SHA_256 + - message_type DIGEST is not implemented + - grant_tokens are not implemented + """ + key = self.describe_key(key_id) + + self.__ensure_valid_sign_and_verify_key(key) + self.__ensure_valid_signing_augorithm(key, signing_algorithm) + + if signing_algorithm not in key.signing_algorithms: + raise ValidationException( + ( + "1 validation error detected: Value '{signing_algorithm}' at 'SigningAlgorithm' failed " + "to satisfy constraint: Member must satisfy enum value set: " + "{valid_sign_algorithms}" + ).format( + signing_algorithm=signing_algorithm, + valid_sign_algorithms=key.signing_algorithms, + ) + ) + + public_key = key.private_key.public_key() + + try: + # TODO: support more than one hardcoded algorithm based on KeySpec + public_key.verify( + signature, + message, + padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), + salt_length=padding.PSS.MAX_LENGTH, + ), + hashes.SHA256(), + ) + return key.arn, True, signing_algorithm + except InvalidSignature: + return key.arn, False, signing_algorithm + kms_backends = BackendDict(KmsBackend, "kms") diff --git a/moto/kms/responses.py b/moto/kms/responses.py index 97bd26e6c..ff0a43f0a 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -2,6 +2,7 @@ import base64 import json import os import re +import warnings from moto.core import get_account_id from moto.core.responses import BaseResponse @@ -108,12 +109,14 @@ class KmsResponse(BaseResponse): """https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html""" policy = self.parameters.get("Policy") key_usage = self.parameters.get("KeyUsage") - customer_master_key_spec = self.parameters.get("CustomerMasterKeySpec") + key_spec = self.parameters.get("KeySpec") or self.parameters.get( + "CustomerMasterKeySpec" + ) description = self.parameters.get("Description") tags = self.parameters.get("Tags") key = self.kms_backend.create_key( - policy, key_usage, customer_master_key_spec, description, tags, self.region + policy, key_usage, key_spec, description, tags, self.region ) return json.dumps(key.to_dict()) @@ -599,6 +602,119 @@ class KmsResponse(BaseResponse): return json.dumps({"Plaintext": response_entropy}) + def sign(self): + """https://docs.aws.amazon.com/kms/latest/APIReference/API_Sign.html""" + key_id = self.parameters.get("KeyId") + message = self.parameters.get("Message") + message_type = self.parameters.get("MessageType") + grant_tokens = self.parameters.get("GrantTokens") + signing_algorithm = self.parameters.get("SigningAlgorithm") + + self._validate_key_id(key_id) + + if grant_tokens: + warnings.warn( + "The GrantTokens-parameter is not yet implemented for client.sign()" + ) + + if message_type == "DIGEST": + warnings.warn( + "The MessageType-parameter DIGEST is not yet implemented for client.sign()" + ) + + if signing_algorithm != "RSASSA_PSS_SHA_256": + warnings.warn( + "The SigningAlgorithm-parameter is ignored hardcoded to RSASSA_PSS_SHA_256 for client.sign()" + ) + + if isinstance(message, str): + message = message.encode("utf-8") + + if message == b"": + raise ValidationException( + "1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1" + ) + + if not message_type: + message_type = "RAW" + + key_id, signature, signing_algorithm = self.kms_backend.sign( + key_id=key_id, + message=message, + signing_algorithm=signing_algorithm, + ) + + signature_blob_response = base64.b64encode(signature).decode("utf-8") + + return json.dumps( + { + "KeyId": key_id, + "Signature": signature_blob_response, + "SigningAlgorithm": signing_algorithm, + } + ) + + def verify(self): + """https://docs.aws.amazon.com/kms/latest/APIReference/API_Verify.html""" + key_id = self.parameters.get("KeyId") + message = self.parameters.get("Message") + message_type = self.parameters.get("MessageType") + signature = self.parameters.get("Signature") + signing_algorithm = self.parameters.get("SigningAlgorithm") + grant_tokens = self.parameters.get("GrantTokens") + + self._validate_key_id(key_id) + + if grant_tokens: + warnings.warn( + "The GrantTokens-parameter is not yet implemented for client.verify()" + ) + + if message_type == "DIGEST": + warnings.warn( + "The MessageType-parameter DIGEST is not yet implemented for client.verify()" + ) + + if signing_algorithm != "RSASSA_PSS_SHA_256": + warnings.warn( + "The SigningAlgorithm-parameter is ignored hardcoded to RSASSA_PSS_SHA_256 for client.verify()" + ) + + if not message_type: + message_type = "RAW" + + if isinstance(message, str): + message = message.encode("utf-8") + + if message == b"": + raise ValidationException( + "1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1" + ) + + if isinstance(signature, str): + # we return base64 signatures, when signing + signature = base64.b64decode(signature.encode("utf-8")) + + if signature == b"": + raise ValidationException( + "1 validation error detected: Value at 'Signature' failed to satisfy constraint: Member must have length greater than or equal to 1" + ) + + key_arn, signature_valid, signing_algorithm = self.kms_backend.verify( + key_id=key_id, + message=message, + signature=signature, + signing_algorithm=signing_algorithm, + ) + + return json.dumps( + { + "KeyId": key_arn, + "SignatureValid": signature_valid, + "SigningAlgorithm": signing_algorithm, + } + ) + def _assert_default_policy(policy_name): if policy_name != "default": diff --git a/moto/kms/utils.py b/moto/kms/utils.py index 61b568388..651e0abd7 100644 --- a/moto/kms/utils.py +++ b/moto/kms/utils.py @@ -6,6 +6,7 @@ import uuid from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes +from cryptography.hazmat.primitives.asymmetric import rsa from .exceptions import ( InvalidCiphertextException, @@ -58,6 +59,18 @@ def generate_master_key(): return generate_data_key(MASTER_KEY_LEN) +def generate_private_key(): + """Generate a private key to be used on asymmetric sign/verify. + + NOTE: KeySpec is not taken into consideration and the key is always RSA_2048 + this could be improved to support multiple key types + """ + return rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + + def _serialize_ciphertext_blob(ciphertext): """Serialize Ciphertext object into a ciphertext blob. diff --git a/tests/test_kms/test_kms_boto3.py b/tests/test_kms/test_kms_boto3.py index 57fd9d4ed..f9e45aaf6 100644 --- a/tests/test_kms/test_kms_boto3.py +++ b/tests/test_kms/test_kms_boto3.py @@ -71,6 +71,7 @@ def test_create_key(): key["KeyMetadata"]["AWSAccountId"].should.equal(ACCOUNT_ID) key["KeyMetadata"]["CreationDate"].should.be.a(datetime) key["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT") + key["KeyMetadata"]["KeySpec"].should.equal("SYMMETRIC_DEFAULT") key["KeyMetadata"]["Description"].should.equal("my key") key["KeyMetadata"]["Enabled"].should.equal(True) key["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"]) @@ -81,14 +82,14 @@ def test_create_key(): key["KeyMetadata"]["Origin"].should.equal("AWS_KMS") key["KeyMetadata"].should_not.have.key("SigningAlgorithms") - key = conn.create_key(KeyUsage="ENCRYPT_DECRYPT", CustomerMasterKeySpec="RSA_2048") + key = conn.create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="RSA_2048") sorted(key["KeyMetadata"]["EncryptionAlgorithms"]).should.equal( ["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"] ) key["KeyMetadata"].should_not.have.key("SigningAlgorithms") - key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="RSA_2048") + key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="RSA_2048") key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") sorted(key["KeyMetadata"]["SigningAlgorithms"]).should.equal( @@ -102,23 +103,33 @@ def test_create_key(): ] ) - key = conn.create_key( - KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_SECG_P256K1" - ) + key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_SECG_P256K1") key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_256"]) - key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P384") + key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_NIST_P384") key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_384"]) + key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_NIST_P521") + + key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") + key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_512"]) + + +@mock_kms +def test_create_key_deprecated_master_custom_key_spec(): + conn = boto3.client("kms", region_name="us-east-1") key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P521") key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms") key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_512"]) + key["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("ECC_NIST_P521") + key["KeyMetadata"]["KeySpec"].should.equal("ECC_NIST_P521") + @pytest.mark.parametrize("id_or_arn", ["KeyId", "Arn"]) @mock_kms @@ -132,6 +143,7 @@ def test_describe_key(id_or_arn): response["KeyMetadata"]["AWSAccountId"].should.equal("123456789012") response["KeyMetadata"]["CreationDate"].should.be.a(datetime) response["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT") + response["KeyMetadata"]["KeySpec"].should.equal("SYMMETRIC_DEFAULT") response["KeyMetadata"]["Description"].should.equal("my key") response["KeyMetadata"]["Enabled"].should.equal(True) response["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"]) @@ -1109,3 +1121,107 @@ def test_key_tag_added_arn_based_happy(): ] client.tag_resource(KeyId=key_id, Tags=tags) _check_tags(key_id, tags, client) + + +@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS) +@mock_kms +def test_sign_happy(plaintext): + client = boto3.client("kms", region_name="us-west-2") + + key = client.create_key(Description="sign-key", KeyUsage="SIGN_VERIFY") + key_id = key["KeyMetadata"]["KeyId"] + key_arn = key["KeyMetadata"]["Arn"] + signing_algorithm = "RSASSA_PSS_SHA_256" + + sign_response = client.sign( + KeyId=key_id, Message=plaintext, SigningAlgorithm=signing_algorithm + ) + + sign_response["Signature"].should_not.equal(plaintext) + sign_response["SigningAlgorithm"].should.equal(signing_algorithm) + sign_response["KeyId"].should.equal(key_arn) + + +@mock_kms +def test_sign_invalid_signing_algorithm(): + client = boto3.client("kms", region_name="us-west-2") + + key = client.create_key(Description="sign-key", KeyUsage="SIGN_VERIFY") + key_id = key["KeyMetadata"]["KeyId"] + + message = "My message" + signing_algorithm = "INVALID" + + with pytest.raises(ClientError) as ex: + client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm) + err = ex.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.equal( + "1 validation error detected: Value 'INVALID' at 'SigningAlgorithm' failed to satisfy constraint: Member must satisfy enum value set: ['RSASSA_PKCS1_V1_5_SHA_256', 'RSASSA_PKCS1_V1_5_SHA_384', 'RSASSA_PKCS1_V1_5_SHA_512', 'RSASSA_PSS_SHA_256', 'RSASSA_PSS_SHA_384', 'RSASSA_PSS_SHA_512']" + ) + + +@mock_kms +def test_sign_invalid_key_usage(): + client = boto3.client("kms", region_name="us-west-2") + + key = client.create_key(Description="sign-key", KeyUsage="ENCRYPT_DECRYPT") + key_id = key["KeyMetadata"]["KeyId"] + + message = "My message" + signing_algorithm = "RSASSA_PSS_SHA_256" + + with pytest.raises(ClientError) as ex: + client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm) + err = ex.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.equal( + f"1 validation error detected: Value '{key_id}' at 'KeyId' failed to satisfy constraint: Member must point to a key with usage: 'SIGN_VERIFY'" + ) + + +@mock_kms +def test_sign_invalid_message(): + client = boto3.client("kms", region_name="us-west-2") + + key = client.create_key(Description="sign-key", KeyUsage="SIGN_VERIFY") + key_id = key["KeyMetadata"]["KeyId"] + + message = "" + signing_algorithm = "RSASSA_PSS_SHA_256" + + with pytest.raises(ClientError) as ex: + client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm) + err = ex.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.equal( + "1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1" + ) + + +@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS) +@mock_kms +def test_verify_happy(plaintext): + client = boto3.client("kms", region_name="us-west-2") + + key = client.create_key(Description="sign-key", KeyUsage="SIGN_VERIFY") + key_id = key["KeyMetadata"]["KeyId"] + key_arn = key["KeyMetadata"]["Arn"] + signing_algorithm = "RSASSA_PSS_SHA_256" + + sign_response = client.sign( + KeyId=key_id, Message=plaintext, SigningAlgorithm=signing_algorithm + ) + + signature = sign_response["Signature"] + + verify_response = client.verify( + KeyId=key_id, + Message=plaintext, + Signature=signature, + SigningAlgorithm=signing_algorithm, + ) + + verify_response["SigningAlgorithm"].should.equal(signing_algorithm) + verify_response["KeyId"].should.equal(key_arn) + verify_response["SignatureValid"].should.equal(True)