KMS: Add signing by ECDSA Private key (#6737)
This commit is contained in:
parent
7cf432b412
commit
f0b8fedd84
@ -167,12 +167,12 @@ class Key(CloudFormationModel):
|
||||
return None # type: ignore[return-value]
|
||||
elif self.key_spec in KeySpec.ecc_key_specs():
|
||||
if self.key_spec == KeySpec.ECC_NIST_P384:
|
||||
return [SigningAlgorithm.ECDSA_SHA_384]
|
||||
elif self.key_spec == KeySpec.ECC_NIST_P512:
|
||||
return [SigningAlgorithm.ECDSA_SHA_512]
|
||||
return [SigningAlgorithm.ECDSA_SHA_384.value]
|
||||
elif self.key_spec == KeySpec.ECC_NIST_P521:
|
||||
return [SigningAlgorithm.ECDSA_SHA_512.value]
|
||||
else:
|
||||
# key_spec is 'ECC_NIST_P256' or 'ECC_SECG_P256K1'
|
||||
return [SigningAlgorithm.ECDSA_SHA_256]
|
||||
return [SigningAlgorithm.ECDSA_SHA_256.value]
|
||||
elif self.key_spec in KeySpec.rsa_key_specs():
|
||||
return SigningAlgorithm.rsa_signing_algorithms()
|
||||
elif self.key_spec == KeySpec.SM2:
|
||||
|
@ -12,7 +12,7 @@ from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding, ec
|
||||
|
||||
|
||||
from .exceptions import (
|
||||
@ -62,7 +62,7 @@ class KeySpec(str, Enum):
|
||||
ECC_NIST_P256 = "ECC_NIST_P256"
|
||||
ECC_SECG_P256K1 = "ECC_SECG_P256K1"
|
||||
ECC_NIST_P384 = "ECC_NIST_P384"
|
||||
ECC_NIST_P512 = "ECC_NIST_P521"
|
||||
ECC_NIST_P521 = "ECC_NIST_P521"
|
||||
SM2 = "SM2" # China Regions only
|
||||
# Symmetric key specs
|
||||
SYMMETRIC_DEFAULT = "SYMMETRIC_DEFAULT"
|
||||
@ -165,6 +165,16 @@ def validate_signing_algorithm(
|
||||
)
|
||||
|
||||
|
||||
def validate_key_spec(target_key_spec: str, valid_key_specs: List[str]) -> None:
|
||||
if target_key_spec not in valid_key_specs:
|
||||
raise ValidationException(
|
||||
(
|
||||
"1 validation error detected: Value at 'key_spec' failed "
|
||||
"to satisfy constraint: Member must satisfy enum value set: {valid_key_specs}"
|
||||
).format(valid_key_specs=valid_key_specs)
|
||||
)
|
||||
|
||||
|
||||
class RSAPrivateKey(AbstractPrivateKey):
|
||||
# See https://docs.aws.amazon.com/kms/latest/cryptographic-details/crypto-primitives.html
|
||||
__supported_key_sizes = [2048, 3072, 4096]
|
||||
@ -237,6 +247,57 @@ class RSAPrivateKey(AbstractPrivateKey):
|
||||
)
|
||||
|
||||
|
||||
class ECDSAPrivateKey(AbstractPrivateKey):
|
||||
def __init__(self, key_spec: str):
|
||||
validate_key_spec(key_spec, KeySpec.ecc_key_specs())
|
||||
|
||||
if key_spec == KeySpec.ECC_NIST_P256:
|
||||
curve = ec.SECP256R1() # type: ec.EllipticCurve
|
||||
valid_signing_algorithms = ["ECDSA_SHA_256"] # type: List[str]
|
||||
elif key_spec == KeySpec.ECC_SECG_P256K1:
|
||||
curve = ec.SECP256K1()
|
||||
valid_signing_algorithms = ["ECDSA_SHA_256"]
|
||||
elif key_spec == KeySpec.ECC_NIST_P384:
|
||||
curve = ec.SECP384R1()
|
||||
valid_signing_algorithms = ["ECDSA_SHA_384"]
|
||||
else:
|
||||
curve = ec.SECP521R1()
|
||||
valid_signing_algorithms = ["ECDSA_SHA_512"]
|
||||
|
||||
self.private_key = ec.generate_private_key(curve)
|
||||
self.valid_signing_algorithms = valid_signing_algorithms
|
||||
|
||||
def __hash_algorithm(self, signing_algorithm: str) -> hashes.HashAlgorithm:
|
||||
if signing_algorithm == SigningAlgorithm.ECDSA_SHA_256:
|
||||
algorithm = hashes.SHA256() # type: Any
|
||||
elif signing_algorithm == SigningAlgorithm.ECDSA_SHA_384:
|
||||
algorithm = hashes.SHA384()
|
||||
else:
|
||||
algorithm = hashes.SHA512()
|
||||
return algorithm
|
||||
|
||||
def sign(self, message: bytes, signing_algorithm: str) -> bytes:
|
||||
validate_signing_algorithm(signing_algorithm, self.valid_signing_algorithms)
|
||||
hash_algorithm = self.__hash_algorithm(signing_algorithm)
|
||||
return self.private_key.sign(message, ec.ECDSA(hash_algorithm))
|
||||
|
||||
def verify(self, message: bytes, signature: bytes, signing_algorithm: str) -> bool:
|
||||
validate_signing_algorithm(signing_algorithm, self.valid_signing_algorithms)
|
||||
hash_algorithm = self.__hash_algorithm(signing_algorithm)
|
||||
public_key = self.private_key.public_key()
|
||||
try:
|
||||
public_key.verify(signature, message, ec.ECDSA(hash_algorithm))
|
||||
return True
|
||||
except InvalidSignature:
|
||||
return False
|
||||
|
||||
def public_key(self) -> bytes:
|
||||
return self.private_key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.DER,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||
)
|
||||
|
||||
|
||||
def generate_private_key(key_spec: str) -> AbstractPrivateKey:
|
||||
"""Generate a private key to be used on asymmetric sign/verify."""
|
||||
if key_spec == KeySpec.RSA_2048:
|
||||
@ -245,6 +306,8 @@ def generate_private_key(key_spec: str) -> AbstractPrivateKey:
|
||||
return RSAPrivateKey(key_size=3072)
|
||||
elif key_spec == KeySpec.RSA_4096:
|
||||
return RSAPrivateKey(key_size=4096)
|
||||
elif key_spec in KeySpec.ecc_key_specs():
|
||||
return ECDSAPrivateKey(key_spec)
|
||||
else:
|
||||
return RSAPrivateKey(key_size=2048)
|
||||
|
||||
|
@ -1252,6 +1252,133 @@ def test_fail_verify_digest_message_type_RSA(
|
||||
assert verify_response["SignatureValid"] is False
|
||||
|
||||
|
||||
@mock_kms
|
||||
@pytest.mark.parametrize(
|
||||
"key_spec, signing_algorithm",
|
||||
[
|
||||
("ECC_NIST_P256", "ECDSA_SHA_256"),
|
||||
("ECC_SECG_P256K1", "ECDSA_SHA_256"),
|
||||
("ECC_NIST_P384", "ECDSA_SHA_384"),
|
||||
("ECC_NIST_P521", "ECDSA_SHA_512"),
|
||||
],
|
||||
)
|
||||
def test_sign_and_verify_digest_message_type_ECDSA(key_spec, signing_algorithm):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(
|
||||
Description="sign-key", KeyUsage="SIGN_VERIFY", KeySpec=key_spec
|
||||
)
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
digest.update(b"this works")
|
||||
digest.update(b"as well")
|
||||
message = digest.finalize()
|
||||
|
||||
sign_response = client.sign(
|
||||
KeyId=key_id,
|
||||
Message=message,
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
MessageType="DIGEST",
|
||||
)
|
||||
|
||||
verify_response = client.verify(
|
||||
KeyId=key_id,
|
||||
Message=message,
|
||||
Signature=sign_response["Signature"],
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
)
|
||||
|
||||
assert verify_response["SignatureValid"] is True
|
||||
|
||||
|
||||
@mock_kms
|
||||
@pytest.mark.parametrize(
|
||||
"key_spec, signing_algorithm, valid_signing_algorithms",
|
||||
[
|
||||
("ECC_NIST_P256", "ECDSA_SHA_384", ["ECDSA_SHA_256"]),
|
||||
("ECC_SECG_P256K1", "ECDSA_SHA_512", ["ECDSA_SHA_256"]),
|
||||
("ECC_NIST_P384", "ECDSA_SHA_256", ["ECDSA_SHA_384"]),
|
||||
("ECC_NIST_P521", "ECDSA_SHA_384", ["ECDSA_SHA_512"]),
|
||||
],
|
||||
)
|
||||
def test_invalid_signing_algorithm_for_key_spec_type_ECDSA(
|
||||
key_spec, signing_algorithm, valid_signing_algorithms
|
||||
):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(
|
||||
Description="sign-key", KeyUsage="SIGN_VERIFY", KeySpec=key_spec
|
||||
)
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
digest.update(b"this works")
|
||||
digest.update(b"as well")
|
||||
message = digest.finalize()
|
||||
|
||||
with pytest.raises(ClientError) as ex:
|
||||
_ = client.sign(
|
||||
KeyId=key_id,
|
||||
Message=message,
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
MessageType="DIGEST",
|
||||
)
|
||||
err = ex.value.response["Error"]
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert err["Message"] == (
|
||||
"1 validation error detected: Value '{signing_algorithm}' at 'SigningAlgorithm' failed "
|
||||
"to satisfy constraint: Member must satisfy enum value set: "
|
||||
"{valid_sign_algorithms}"
|
||||
).format(
|
||||
signing_algorithm=signing_algorithm,
|
||||
valid_sign_algorithms=valid_signing_algorithms,
|
||||
)
|
||||
|
||||
|
||||
@mock_kms
|
||||
@pytest.mark.parametrize(
|
||||
"key_spec, signing_algorithm",
|
||||
[
|
||||
("ECC_NIST_P256", "ECDSA_SHA_256"),
|
||||
("ECC_SECG_P256K1", "ECDSA_SHA_256"),
|
||||
("ECC_NIST_P384", "ECDSA_SHA_384"),
|
||||
("ECC_NIST_P521", "ECDSA_SHA_512"),
|
||||
],
|
||||
)
|
||||
def test_fail_verify_digest_message_type_ECDSA(key_spec, signing_algorithm):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(
|
||||
Description="sign-key", KeyUsage="SIGN_VERIFY", KeySpec=key_spec
|
||||
)
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
digest.update(b"this works")
|
||||
digest.update(b"as well")
|
||||
falsified_digest = digest.copy()
|
||||
message = digest.finalize()
|
||||
falsified_digest.update(b"This sentence has been falsified")
|
||||
falsified_message = falsified_digest.finalize()
|
||||
|
||||
sign_response = client.sign(
|
||||
KeyId=key_id,
|
||||
Message=message,
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
MessageType="DIGEST",
|
||||
)
|
||||
|
||||
verify_response = client.verify(
|
||||
KeyId=key_id,
|
||||
Message=falsified_message,
|
||||
Signature=sign_response["Signature"],
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
)
|
||||
|
||||
assert verify_response["SignatureValid"] is False
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_sign_invalid_key_usage():
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
@ -20,6 +20,7 @@ from moto.kms.utils import (
|
||||
KeySpec,
|
||||
SigningAlgorithm,
|
||||
RSAPrivateKey,
|
||||
ECDSAPrivateKey,
|
||||
)
|
||||
|
||||
ENCRYPTION_CONTEXT_VECTORS = [
|
||||
@ -69,7 +70,7 @@ def test_KeySpec_Enum():
|
||||
KeySpec.ECC_NIST_P256,
|
||||
KeySpec.ECC_SECG_P256K1,
|
||||
KeySpec.ECC_NIST_P384,
|
||||
KeySpec.ECC_NIST_P512,
|
||||
KeySpec.ECC_NIST_P521,
|
||||
]
|
||||
)
|
||||
assert KeySpec.hmac_key_specs() == sorted(
|
||||
@ -106,6 +107,15 @@ def test_RSAPrivateKey_invalid_key_size():
|
||||
)
|
||||
|
||||
|
||||
def test_ECDSAPrivateKey_invalid_key_spec():
|
||||
with pytest.raises(ValidationException) as ex:
|
||||
_ = ECDSAPrivateKey(key_spec="InvalidKeySpec")
|
||||
assert (
|
||||
ex.value.message
|
||||
== "1 validation error detected: Value at 'key_spec' failed to satisfy constraint: Member must satisfy enum value set: ['ECC_NIST_P256', 'ECC_NIST_P384', 'ECC_NIST_P521', 'ECC_SECG_P256K1']"
|
||||
)
|
||||
|
||||
|
||||
def test_generate_data_key():
|
||||
test = generate_data_key(123)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user