From f0b8fedd846a3913f36c47c7b4f5c9b24817c9c4 Mon Sep 17 00:00:00 2001 From: Akira Noda <61897166+tsugumi-sys@users.noreply.github.com> Date: Mon, 28 Aug 2023 22:12:01 +0900 Subject: [PATCH] KMS: Add signing by ECDSA Private key (#6737) --- moto/kms/models.py | 8 +- moto/kms/utils.py | 67 +++++++++++++++- tests/test_kms/test_kms_boto3.py | 127 +++++++++++++++++++++++++++++++ tests/test_kms/test_utils.py | 12 ++- 4 files changed, 207 insertions(+), 7 deletions(-) diff --git a/moto/kms/models.py b/moto/kms/models.py index 461ce5c15..db6c20ae1 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -167,12 +167,12 @@ class Key(CloudFormationModel): return None # type: ignore[return-value] elif self.key_spec in KeySpec.ecc_key_specs(): if self.key_spec == KeySpec.ECC_NIST_P384: - return [SigningAlgorithm.ECDSA_SHA_384] - elif self.key_spec == KeySpec.ECC_NIST_P512: - return [SigningAlgorithm.ECDSA_SHA_512] + return [SigningAlgorithm.ECDSA_SHA_384.value] + elif self.key_spec == KeySpec.ECC_NIST_P521: + return [SigningAlgorithm.ECDSA_SHA_512.value] else: # key_spec is 'ECC_NIST_P256' or 'ECC_SECG_P256K1' - return [SigningAlgorithm.ECDSA_SHA_256] + return [SigningAlgorithm.ECDSA_SHA_256.value] elif self.key_spec in KeySpec.rsa_key_specs(): return SigningAlgorithm.rsa_signing_algorithms() elif self.key_spec == KeySpec.SM2: diff --git a/moto/kms/utils.py b/moto/kms/utils.py index 46fff7faf..0e6b322cd 100644 --- a/moto/kms/utils.py +++ b/moto/kms/utils.py @@ -12,7 +12,7 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding -from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives.asymmetric import rsa, padding, ec from .exceptions import ( @@ -62,7 +62,7 @@ class KeySpec(str, Enum): ECC_NIST_P256 = "ECC_NIST_P256" ECC_SECG_P256K1 = "ECC_SECG_P256K1" ECC_NIST_P384 = "ECC_NIST_P384" - ECC_NIST_P512 = "ECC_NIST_P521" + ECC_NIST_P521 = "ECC_NIST_P521" SM2 = "SM2" # China Regions only # Symmetric key specs SYMMETRIC_DEFAULT = "SYMMETRIC_DEFAULT" @@ -165,6 +165,16 @@ def validate_signing_algorithm( ) +def validate_key_spec(target_key_spec: str, valid_key_specs: List[str]) -> None: + if target_key_spec not in valid_key_specs: + raise ValidationException( + ( + "1 validation error detected: Value at 'key_spec' failed " + "to satisfy constraint: Member must satisfy enum value set: {valid_key_specs}" + ).format(valid_key_specs=valid_key_specs) + ) + + class RSAPrivateKey(AbstractPrivateKey): # See https://docs.aws.amazon.com/kms/latest/cryptographic-details/crypto-primitives.html __supported_key_sizes = [2048, 3072, 4096] @@ -237,6 +247,57 @@ class RSAPrivateKey(AbstractPrivateKey): ) +class ECDSAPrivateKey(AbstractPrivateKey): + def __init__(self, key_spec: str): + validate_key_spec(key_spec, KeySpec.ecc_key_specs()) + + if key_spec == KeySpec.ECC_NIST_P256: + curve = ec.SECP256R1() # type: ec.EllipticCurve + valid_signing_algorithms = ["ECDSA_SHA_256"] # type: List[str] + elif key_spec == KeySpec.ECC_SECG_P256K1: + curve = ec.SECP256K1() + valid_signing_algorithms = ["ECDSA_SHA_256"] + elif key_spec == KeySpec.ECC_NIST_P384: + curve = ec.SECP384R1() + valid_signing_algorithms = ["ECDSA_SHA_384"] + else: + curve = ec.SECP521R1() + valid_signing_algorithms = ["ECDSA_SHA_512"] + + self.private_key = ec.generate_private_key(curve) + self.valid_signing_algorithms = valid_signing_algorithms + + def __hash_algorithm(self, signing_algorithm: str) -> hashes.HashAlgorithm: + if signing_algorithm == SigningAlgorithm.ECDSA_SHA_256: + algorithm = hashes.SHA256() # type: Any + elif signing_algorithm == SigningAlgorithm.ECDSA_SHA_384: + algorithm = hashes.SHA384() + else: + algorithm = hashes.SHA512() + return algorithm + + def sign(self, message: bytes, signing_algorithm: str) -> bytes: + validate_signing_algorithm(signing_algorithm, self.valid_signing_algorithms) + hash_algorithm = self.__hash_algorithm(signing_algorithm) + return self.private_key.sign(message, ec.ECDSA(hash_algorithm)) + + def verify(self, message: bytes, signature: bytes, signing_algorithm: str) -> bool: + validate_signing_algorithm(signing_algorithm, self.valid_signing_algorithms) + hash_algorithm = self.__hash_algorithm(signing_algorithm) + public_key = self.private_key.public_key() + try: + public_key.verify(signature, message, ec.ECDSA(hash_algorithm)) + return True + except InvalidSignature: + return False + + def public_key(self) -> bytes: + return self.private_key.public_key().public_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + def generate_private_key(key_spec: str) -> AbstractPrivateKey: """Generate a private key to be used on asymmetric sign/verify.""" if key_spec == KeySpec.RSA_2048: @@ -245,6 +306,8 @@ def generate_private_key(key_spec: str) -> AbstractPrivateKey: return RSAPrivateKey(key_size=3072) elif key_spec == KeySpec.RSA_4096: return RSAPrivateKey(key_size=4096) + elif key_spec in KeySpec.ecc_key_specs(): + return ECDSAPrivateKey(key_spec) else: return RSAPrivateKey(key_size=2048) diff --git a/tests/test_kms/test_kms_boto3.py b/tests/test_kms/test_kms_boto3.py index c0f9682a3..8ec794cbe 100644 --- a/tests/test_kms/test_kms_boto3.py +++ b/tests/test_kms/test_kms_boto3.py @@ -1252,6 +1252,133 @@ def test_fail_verify_digest_message_type_RSA( assert verify_response["SignatureValid"] is False +@mock_kms +@pytest.mark.parametrize( + "key_spec, signing_algorithm", + [ + ("ECC_NIST_P256", "ECDSA_SHA_256"), + ("ECC_SECG_P256K1", "ECDSA_SHA_256"), + ("ECC_NIST_P384", "ECDSA_SHA_384"), + ("ECC_NIST_P521", "ECDSA_SHA_512"), + ], +) +def test_sign_and_verify_digest_message_type_ECDSA(key_spec, signing_algorithm): + client = boto3.client("kms", region_name="us-west-2") + + key = client.create_key( + Description="sign-key", KeyUsage="SIGN_VERIFY", KeySpec=key_spec + ) + key_id = key["KeyMetadata"]["KeyId"] + + digest = hashes.Hash(hashes.SHA256()) + digest.update(b"this works") + digest.update(b"as well") + message = digest.finalize() + + sign_response = client.sign( + KeyId=key_id, + Message=message, + SigningAlgorithm=signing_algorithm, + MessageType="DIGEST", + ) + + verify_response = client.verify( + KeyId=key_id, + Message=message, + Signature=sign_response["Signature"], + SigningAlgorithm=signing_algorithm, + ) + + assert verify_response["SignatureValid"] is True + + +@mock_kms +@pytest.mark.parametrize( + "key_spec, signing_algorithm, valid_signing_algorithms", + [ + ("ECC_NIST_P256", "ECDSA_SHA_384", ["ECDSA_SHA_256"]), + ("ECC_SECG_P256K1", "ECDSA_SHA_512", ["ECDSA_SHA_256"]), + ("ECC_NIST_P384", "ECDSA_SHA_256", ["ECDSA_SHA_384"]), + ("ECC_NIST_P521", "ECDSA_SHA_384", ["ECDSA_SHA_512"]), + ], +) +def test_invalid_signing_algorithm_for_key_spec_type_ECDSA( + key_spec, signing_algorithm, valid_signing_algorithms +): + client = boto3.client("kms", region_name="us-west-2") + + key = client.create_key( + Description="sign-key", KeyUsage="SIGN_VERIFY", KeySpec=key_spec + ) + key_id = key["KeyMetadata"]["KeyId"] + + digest = hashes.Hash(hashes.SHA256()) + digest.update(b"this works") + digest.update(b"as well") + message = digest.finalize() + + with pytest.raises(ClientError) as ex: + _ = client.sign( + KeyId=key_id, + Message=message, + SigningAlgorithm=signing_algorithm, + MessageType="DIGEST", + ) + err = ex.value.response["Error"] + assert err["Code"] == "ValidationException" + assert err["Message"] == ( + "1 validation error detected: Value '{signing_algorithm}' at 'SigningAlgorithm' failed " + "to satisfy constraint: Member must satisfy enum value set: " + "{valid_sign_algorithms}" + ).format( + signing_algorithm=signing_algorithm, + valid_sign_algorithms=valid_signing_algorithms, + ) + + +@mock_kms +@pytest.mark.parametrize( + "key_spec, signing_algorithm", + [ + ("ECC_NIST_P256", "ECDSA_SHA_256"), + ("ECC_SECG_P256K1", "ECDSA_SHA_256"), + ("ECC_NIST_P384", "ECDSA_SHA_384"), + ("ECC_NIST_P521", "ECDSA_SHA_512"), + ], +) +def test_fail_verify_digest_message_type_ECDSA(key_spec, signing_algorithm): + client = boto3.client("kms", region_name="us-west-2") + + key = client.create_key( + Description="sign-key", KeyUsage="SIGN_VERIFY", KeySpec=key_spec + ) + key_id = key["KeyMetadata"]["KeyId"] + + digest = hashes.Hash(hashes.SHA256()) + digest.update(b"this works") + digest.update(b"as well") + falsified_digest = digest.copy() + message = digest.finalize() + falsified_digest.update(b"This sentence has been falsified") + falsified_message = falsified_digest.finalize() + + sign_response = client.sign( + KeyId=key_id, + Message=message, + SigningAlgorithm=signing_algorithm, + MessageType="DIGEST", + ) + + verify_response = client.verify( + KeyId=key_id, + Message=falsified_message, + Signature=sign_response["Signature"], + SigningAlgorithm=signing_algorithm, + ) + + assert verify_response["SignatureValid"] is False + + @mock_kms def test_sign_invalid_key_usage(): client = boto3.client("kms", region_name="us-west-2") diff --git a/tests/test_kms/test_utils.py b/tests/test_kms/test_utils.py index 039b7ab36..03e8b8d4e 100644 --- a/tests/test_kms/test_utils.py +++ b/tests/test_kms/test_utils.py @@ -20,6 +20,7 @@ from moto.kms.utils import ( KeySpec, SigningAlgorithm, RSAPrivateKey, + ECDSAPrivateKey, ) ENCRYPTION_CONTEXT_VECTORS = [ @@ -69,7 +70,7 @@ def test_KeySpec_Enum(): KeySpec.ECC_NIST_P256, KeySpec.ECC_SECG_P256K1, KeySpec.ECC_NIST_P384, - KeySpec.ECC_NIST_P512, + KeySpec.ECC_NIST_P521, ] ) assert KeySpec.hmac_key_specs() == sorted( @@ -106,6 +107,15 @@ def test_RSAPrivateKey_invalid_key_size(): ) +def test_ECDSAPrivateKey_invalid_key_spec(): + with pytest.raises(ValidationException) as ex: + _ = ECDSAPrivateKey(key_spec="InvalidKeySpec") + assert ( + ex.value.message + == "1 validation error detected: Value at 'key_spec' failed to satisfy constraint: Member must satisfy enum value set: ['ECC_NIST_P256', 'ECC_NIST_P384', 'ECC_NIST_P521', 'ECC_SECG_P256K1']" + ) + + def test_generate_data_key(): test = generate_data_key(123)