Add KMS Sign/Verify (#5243)

This commit is contained in:
José Coelho 2022-06-22 05:18:51 +12:00 committed by GitHub
parent e99e8a883c
commit 25aad70481
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 363 additions and 26 deletions

View File

@ -3572,14 +3572,14 @@
- [X] retire_grant
- [X] revoke_grant
- [X] schedule_key_deletion
- [ ] sign
- [X] sign
- [X] tag_resource
- [X] untag_resource
- [ ] update_alias
- [ ] update_custom_key_store
- [X] update_key_description
- [ ] update_primary_region
- [ ] verify
- [X] verify
- [ ] verify_mac
</details>

View File

@ -482,7 +482,7 @@ class Table(CloudFormationModel):
key = kms.create_key(
policy="",
key_usage="ENCRYPT_DECRYPT",
customer_master_key_spec="SYMMETRIC_DEFAULT",
key_spec="SYMMETRIC_DEFAULT",
description="Default master key that protects my DynamoDB table storage",
tags=None,
region=region,

View File

@ -410,7 +410,7 @@ class EBSBackend:
key = kms.create_key(
policy="",
key_usage="ENCRYPT_DECRYPT",
customer_master_key_spec="SYMMETRIC_DEFAULT",
key_spec="SYMMETRIC_DEFAULT",
description="Default master key that protects my EBS volumes when no other key is defined",
tags=None,
region=self.region_name,

View File

@ -2,6 +2,11 @@ import json
import os
from collections import defaultdict
from datetime import datetime, timedelta
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from moto.apigateway.exceptions import ValidationException
from moto.core import get_account_id, BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import get_random_hex, unix_time, BackendDict
@ -14,6 +19,7 @@ from .utils import (
encrypt,
generate_key_id,
generate_master_key,
generate_private_key,
)
@ -49,9 +55,7 @@ class Grant(BaseModel):
class Key(CloudFormationModel):
def __init__(
self, policy, key_usage, customer_master_key_spec, description, region
):
def __init__(self, policy, key_usage, key_spec, description, region):
self.id = generate_key_id()
self.creation_date = unix_time()
self.policy = policy or self.generate_default_policy()
@ -64,9 +68,10 @@ class Key(CloudFormationModel):
self.key_rotation_status = False
self.deletion_date = None
self.key_material = generate_master_key()
self.private_key = generate_private_key()
self.origin = "AWS_KMS"
self.key_manager = "CUSTOMER"
self.customer_master_key_spec = customer_master_key_spec or "SYMMETRIC_DEFAULT"
self.key_spec = key_spec or "SYMMETRIC_DEFAULT"
self.grants = dict()
@ -139,7 +144,7 @@ class Key(CloudFormationModel):
def encryption_algorithms(self):
if self.key_usage == "SIGN_VERIFY":
return None
elif self.customer_master_key_spec == "SYMMETRIC_DEFAULT":
elif self.key_spec == "SYMMETRIC_DEFAULT":
return ["SYMMETRIC_DEFAULT"]
else:
return ["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"]
@ -148,11 +153,11 @@ class Key(CloudFormationModel):
def signing_algorithms(self):
if self.key_usage == "ENCRYPT_DECRYPT":
return None
elif self.customer_master_key_spec in ["ECC_NIST_P256", "ECC_SECG_P256K1"]:
elif self.key_spec in ["ECC_NIST_P256", "ECC_SECG_P256K1"]:
return ["ECDSA_SHA_256"]
elif self.customer_master_key_spec == "ECC_NIST_P384":
elif self.key_spec == "ECC_NIST_P384":
return ["ECDSA_SHA_384"]
elif self.customer_master_key_spec == "ECC_NIST_P521":
elif self.key_spec == "ECC_NIST_P521":
return ["ECDSA_SHA_512"]
else:
return [
@ -170,7 +175,8 @@ class Key(CloudFormationModel):
"AWSAccountId": self.account_id,
"Arn": self.arn,
"CreationDate": self.creation_date,
"CustomerMasterKeySpec": self.customer_master_key_spec,
"CustomerMasterKeySpec": self.key_spec,
"KeySpec": self.key_spec,
"Description": self.description,
"Enabled": self.enabled,
"EncryptionAlgorithms": self.encryption_algorithms,
@ -208,7 +214,7 @@ class Key(CloudFormationModel):
key = kms_backend.create_key(
policy=properties["KeyPolicy"],
key_usage="ENCRYPT_DECRYPT",
customer_master_key_spec="SYMMETRIC_DEFAULT",
key_spec="SYMMETRIC_DEFAULT",
description=properties["Description"],
tags=properties.get("Tags", []),
region=region_name,
@ -258,10 +264,8 @@ class KmsBackend(BaseBackend):
self.add_alias(key.id, alias_name)
return key.id
def create_key(
self, policy, key_usage, customer_master_key_spec, description, tags, region
):
key = Key(policy, key_usage, customer_master_key_spec, description, region)
def create_key(self, policy, key_usage, key_spec, description, tags, region):
key = Key(policy, key_usage, key_spec, description, region)
self.keys[key.id] = key
if tags is not None and len(tags) > 0:
self.tag_resource(key.id, tags)
@ -516,5 +520,93 @@ class KmsBackend(BaseBackend):
key = self.describe_key(key_id)
key.retire_grant(grant_id)
def __ensure_valid_sign_and_verify_key(self, key: Key):
if key.key_usage != "SIGN_VERIFY":
raise ValidationException(
(
"1 validation error detected: Value '{key_id}' at 'KeyId' failed "
"to satisfy constraint: Member must point to a key with usage: 'SIGN_VERIFY'"
).format(key_id=key.id)
)
def __ensure_valid_signing_augorithm(self, key: Key, signing_algorithm):
if signing_algorithm not in key.signing_algorithms:
raise ValidationException(
(
"1 validation error detected: Value '{signing_algorithm}' at 'SigningAlgorithm' failed "
"to satisfy constraint: Member must satisfy enum value set: "
"{valid_sign_algorithms}"
).format(
signing_algorithm=signing_algorithm,
valid_sign_algorithms=key.signing_algorithms,
)
)
def sign(self, key_id, message, signing_algorithm):
"""Sign message using generated private key.
NOTES:
- signing_algorithm is ignored and hardcoded to RSASSA_PSS_SHA_256
- message_type DIGEST is not implemented
- grant_tokens are not implemented
"""
key = self.describe_key(key_id)
self.__ensure_valid_sign_and_verify_key(key)
self.__ensure_valid_signing_augorithm(key, signing_algorithm)
# TODO: support more than one hardcoded algorithm based on KeySpec
signature = key.private_key.sign(
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256(),
)
return key.arn, signature, signing_algorithm
def verify(self, key_id, message, signature, signing_algorithm):
"""Verify message using public key from generated private key.
NOTES:
- signing_algorithm is ignored and hardcoded to RSASSA_PSS_SHA_256
- message_type DIGEST is not implemented
- grant_tokens are not implemented
"""
key = self.describe_key(key_id)
self.__ensure_valid_sign_and_verify_key(key)
self.__ensure_valid_signing_augorithm(key, signing_algorithm)
if signing_algorithm not in key.signing_algorithms:
raise ValidationException(
(
"1 validation error detected: Value '{signing_algorithm}' at 'SigningAlgorithm' failed "
"to satisfy constraint: Member must satisfy enum value set: "
"{valid_sign_algorithms}"
).format(
signing_algorithm=signing_algorithm,
valid_sign_algorithms=key.signing_algorithms,
)
)
public_key = key.private_key.public_key()
try:
# TODO: support more than one hardcoded algorithm based on KeySpec
public_key.verify(
signature,
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH,
),
hashes.SHA256(),
)
return key.arn, True, signing_algorithm
except InvalidSignature:
return key.arn, False, signing_algorithm
kms_backends = BackendDict(KmsBackend, "kms")

View File

@ -2,6 +2,7 @@ import base64
import json
import os
import re
import warnings
from moto.core import get_account_id
from moto.core.responses import BaseResponse
@ -108,12 +109,14 @@ class KmsResponse(BaseResponse):
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html"""
policy = self.parameters.get("Policy")
key_usage = self.parameters.get("KeyUsage")
customer_master_key_spec = self.parameters.get("CustomerMasterKeySpec")
key_spec = self.parameters.get("KeySpec") or self.parameters.get(
"CustomerMasterKeySpec"
)
description = self.parameters.get("Description")
tags = self.parameters.get("Tags")
key = self.kms_backend.create_key(
policy, key_usage, customer_master_key_spec, description, tags, self.region
policy, key_usage, key_spec, description, tags, self.region
)
return json.dumps(key.to_dict())
@ -599,6 +602,119 @@ class KmsResponse(BaseResponse):
return json.dumps({"Plaintext": response_entropy})
def sign(self):
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_Sign.html"""
key_id = self.parameters.get("KeyId")
message = self.parameters.get("Message")
message_type = self.parameters.get("MessageType")
grant_tokens = self.parameters.get("GrantTokens")
signing_algorithm = self.parameters.get("SigningAlgorithm")
self._validate_key_id(key_id)
if grant_tokens:
warnings.warn(
"The GrantTokens-parameter is not yet implemented for client.sign()"
)
if message_type == "DIGEST":
warnings.warn(
"The MessageType-parameter DIGEST is not yet implemented for client.sign()"
)
if signing_algorithm != "RSASSA_PSS_SHA_256":
warnings.warn(
"The SigningAlgorithm-parameter is ignored hardcoded to RSASSA_PSS_SHA_256 for client.sign()"
)
if isinstance(message, str):
message = message.encode("utf-8")
if message == b"":
raise ValidationException(
"1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1"
)
if not message_type:
message_type = "RAW"
key_id, signature, signing_algorithm = self.kms_backend.sign(
key_id=key_id,
message=message,
signing_algorithm=signing_algorithm,
)
signature_blob_response = base64.b64encode(signature).decode("utf-8")
return json.dumps(
{
"KeyId": key_id,
"Signature": signature_blob_response,
"SigningAlgorithm": signing_algorithm,
}
)
def verify(self):
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_Verify.html"""
key_id = self.parameters.get("KeyId")
message = self.parameters.get("Message")
message_type = self.parameters.get("MessageType")
signature = self.parameters.get("Signature")
signing_algorithm = self.parameters.get("SigningAlgorithm")
grant_tokens = self.parameters.get("GrantTokens")
self._validate_key_id(key_id)
if grant_tokens:
warnings.warn(
"The GrantTokens-parameter is not yet implemented for client.verify()"
)
if message_type == "DIGEST":
warnings.warn(
"The MessageType-parameter DIGEST is not yet implemented for client.verify()"
)
if signing_algorithm != "RSASSA_PSS_SHA_256":
warnings.warn(
"The SigningAlgorithm-parameter is ignored hardcoded to RSASSA_PSS_SHA_256 for client.verify()"
)
if not message_type:
message_type = "RAW"
if isinstance(message, str):
message = message.encode("utf-8")
if message == b"":
raise ValidationException(
"1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1"
)
if isinstance(signature, str):
# we return base64 signatures, when signing
signature = base64.b64decode(signature.encode("utf-8"))
if signature == b"":
raise ValidationException(
"1 validation error detected: Value at 'Signature' failed to satisfy constraint: Member must have length greater than or equal to 1"
)
key_arn, signature_valid, signing_algorithm = self.kms_backend.verify(
key_id=key_id,
message=message,
signature=signature,
signing_algorithm=signing_algorithm,
)
return json.dumps(
{
"KeyId": key_arn,
"SignatureValid": signature_valid,
"SigningAlgorithm": signing_algorithm,
}
)
def _assert_default_policy(policy_name):
if policy_name != "default":

View File

@ -6,6 +6,7 @@ import uuid
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
from cryptography.hazmat.primitives.asymmetric import rsa
from .exceptions import (
InvalidCiphertextException,
@ -58,6 +59,18 @@ def generate_master_key():
return generate_data_key(MASTER_KEY_LEN)
def generate_private_key():
"""Generate a private key to be used on asymmetric sign/verify.
NOTE: KeySpec is not taken into consideration and the key is always RSA_2048
this could be improved to support multiple key types
"""
return rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
def _serialize_ciphertext_blob(ciphertext):
"""Serialize Ciphertext object into a ciphertext blob.

View File

@ -71,6 +71,7 @@ def test_create_key():
key["KeyMetadata"]["AWSAccountId"].should.equal(ACCOUNT_ID)
key["KeyMetadata"]["CreationDate"].should.be.a(datetime)
key["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT")
key["KeyMetadata"]["KeySpec"].should.equal("SYMMETRIC_DEFAULT")
key["KeyMetadata"]["Description"].should.equal("my key")
key["KeyMetadata"]["Enabled"].should.equal(True)
key["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"])
@ -81,14 +82,14 @@ def test_create_key():
key["KeyMetadata"]["Origin"].should.equal("AWS_KMS")
key["KeyMetadata"].should_not.have.key("SigningAlgorithms")
key = conn.create_key(KeyUsage="ENCRYPT_DECRYPT", CustomerMasterKeySpec="RSA_2048")
key = conn.create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="RSA_2048")
sorted(key["KeyMetadata"]["EncryptionAlgorithms"]).should.equal(
["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"]
)
key["KeyMetadata"].should_not.have.key("SigningAlgorithms")
key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="RSA_2048")
key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="RSA_2048")
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
sorted(key["KeyMetadata"]["SigningAlgorithms"]).should.equal(
@ -102,23 +103,33 @@ def test_create_key():
]
)
key = conn.create_key(
KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_SECG_P256K1"
)
key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_SECG_P256K1")
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_256"])
key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P384")
key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_NIST_P384")
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_384"])
key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_NIST_P521")
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_512"])
@mock_kms
def test_create_key_deprecated_master_custom_key_spec():
conn = boto3.client("kms", region_name="us-east-1")
key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P521")
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_512"])
key["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("ECC_NIST_P521")
key["KeyMetadata"]["KeySpec"].should.equal("ECC_NIST_P521")
@pytest.mark.parametrize("id_or_arn", ["KeyId", "Arn"])
@mock_kms
@ -132,6 +143,7 @@ def test_describe_key(id_or_arn):
response["KeyMetadata"]["AWSAccountId"].should.equal("123456789012")
response["KeyMetadata"]["CreationDate"].should.be.a(datetime)
response["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT")
response["KeyMetadata"]["KeySpec"].should.equal("SYMMETRIC_DEFAULT")
response["KeyMetadata"]["Description"].should.equal("my key")
response["KeyMetadata"]["Enabled"].should.equal(True)
response["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"])
@ -1109,3 +1121,107 @@ def test_key_tag_added_arn_based_happy():
]
client.tag_resource(KeyId=key_id, Tags=tags)
_check_tags(key_id, tags, client)
@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS)
@mock_kms
def test_sign_happy(plaintext):
client = boto3.client("kms", region_name="us-west-2")
key = client.create_key(Description="sign-key", KeyUsage="SIGN_VERIFY")
key_id = key["KeyMetadata"]["KeyId"]
key_arn = key["KeyMetadata"]["Arn"]
signing_algorithm = "RSASSA_PSS_SHA_256"
sign_response = client.sign(
KeyId=key_id, Message=plaintext, SigningAlgorithm=signing_algorithm
)
sign_response["Signature"].should_not.equal(plaintext)
sign_response["SigningAlgorithm"].should.equal(signing_algorithm)
sign_response["KeyId"].should.equal(key_arn)
@mock_kms
def test_sign_invalid_signing_algorithm():
client = boto3.client("kms", region_name="us-west-2")
key = client.create_key(Description="sign-key", KeyUsage="SIGN_VERIFY")
key_id = key["KeyMetadata"]["KeyId"]
message = "My message"
signing_algorithm = "INVALID"
with pytest.raises(ClientError) as ex:
client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value 'INVALID' at 'SigningAlgorithm' failed to satisfy constraint: Member must satisfy enum value set: ['RSASSA_PKCS1_V1_5_SHA_256', 'RSASSA_PKCS1_V1_5_SHA_384', 'RSASSA_PKCS1_V1_5_SHA_512', 'RSASSA_PSS_SHA_256', 'RSASSA_PSS_SHA_384', 'RSASSA_PSS_SHA_512']"
)
@mock_kms
def test_sign_invalid_key_usage():
client = boto3.client("kms", region_name="us-west-2")
key = client.create_key(Description="sign-key", KeyUsage="ENCRYPT_DECRYPT")
key_id = key["KeyMetadata"]["KeyId"]
message = "My message"
signing_algorithm = "RSASSA_PSS_SHA_256"
with pytest.raises(ClientError) as ex:
client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
f"1 validation error detected: Value '{key_id}' at 'KeyId' failed to satisfy constraint: Member must point to a key with usage: 'SIGN_VERIFY'"
)
@mock_kms
def test_sign_invalid_message():
client = boto3.client("kms", region_name="us-west-2")
key = client.create_key(Description="sign-key", KeyUsage="SIGN_VERIFY")
key_id = key["KeyMetadata"]["KeyId"]
message = ""
signing_algorithm = "RSASSA_PSS_SHA_256"
with pytest.raises(ClientError) as ex:
client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1"
)
@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS)
@mock_kms
def test_verify_happy(plaintext):
client = boto3.client("kms", region_name="us-west-2")
key = client.create_key(Description="sign-key", KeyUsage="SIGN_VERIFY")
key_id = key["KeyMetadata"]["KeyId"]
key_arn = key["KeyMetadata"]["Arn"]
signing_algorithm = "RSASSA_PSS_SHA_256"
sign_response = client.sign(
KeyId=key_id, Message=plaintext, SigningAlgorithm=signing_algorithm
)
signature = sign_response["Signature"]
verify_response = client.verify(
KeyId=key_id,
Message=plaintext,
Signature=signature,
SigningAlgorithm=signing_algorithm,
)
verify_response["SigningAlgorithm"].should.equal(signing_algorithm)
verify_response["KeyId"].should.equal(key_arn)
verify_response["SignatureValid"].should.equal(True)