add kms:ReEncrypt and tests
This commit is contained in:
parent
d5ac5453b3
commit
9ffb9d3d0a
@ -214,6 +214,17 @@ class KmsBackend(BaseBackend):
|
|||||||
arn = self.keys[key_id].arn
|
arn = self.keys[key_id].arn
|
||||||
return plaintext, arn
|
return plaintext, arn
|
||||||
|
|
||||||
|
def re_encrypt(
|
||||||
|
self, ciphertext_blob, source_encryption_context, destination_key_id, destination_encryption_context
|
||||||
|
):
|
||||||
|
plaintext, decrypting_arn = self.decrypt(
|
||||||
|
ciphertext_blob=ciphertext_blob, encryption_context=source_encryption_context
|
||||||
|
)
|
||||||
|
new_ciphertext_blob, encrypting_arn = self.encrypt(
|
||||||
|
key_id=destination_key_id, plaintext=plaintext, encryption_context=destination_encryption_context
|
||||||
|
)
|
||||||
|
return new_ciphertext_blob, decrypting_arn, encrypting_arn
|
||||||
|
|
||||||
def generate_data_key(self, key_id, encryption_context, number_of_bytes, key_spec, grant_tokens):
|
def generate_data_key(self, key_id, encryption_context, number_of_bytes, key_spec, grant_tokens):
|
||||||
key_id = self.any_id_to_key_id(key_id)
|
key_id = self.any_id_to_key_id(key_id)
|
||||||
|
|
||||||
|
@ -260,6 +260,25 @@ class KmsResponse(BaseResponse):
|
|||||||
|
|
||||||
return json.dumps({"Plaintext": plaintext_response, 'KeyId': arn})
|
return json.dumps({"Plaintext": plaintext_response, 'KeyId': arn})
|
||||||
|
|
||||||
|
def re_encrypt(self):
|
||||||
|
ciphertext_blob = self.parameters.get("CiphertextBlob")
|
||||||
|
source_encryption_context = self.parameters.get("SourceEncryptionContext", {})
|
||||||
|
destination_key_id = self.parameters.get("DestinationKeyId")
|
||||||
|
destination_encryption_context = self.parameters.get("DestinationEncryptionContext", {})
|
||||||
|
|
||||||
|
new_ciphertext_blob, decrypting_arn, encrypting_arn = self.kms_backend.re_encrypt(
|
||||||
|
ciphertext_blob=ciphertext_blob,
|
||||||
|
source_encryption_context=source_encryption_context,
|
||||||
|
destination_key_id=destination_key_id,
|
||||||
|
destination_encryption_context=destination_encryption_context,
|
||||||
|
)
|
||||||
|
|
||||||
|
response_ciphertext_blob = base64.b64encode(new_ciphertext_blob).decode("utf-8")
|
||||||
|
|
||||||
|
return json.dumps(
|
||||||
|
{"CiphertextBlob": response_ciphertext_blob, "KeyId": encrypting_arn, "SourceKeyId": decrypting_arn}
|
||||||
|
)
|
||||||
|
|
||||||
def disable_key(self):
|
def disable_key(self):
|
||||||
key_id = self.parameters.get('KeyId')
|
key_id = self.parameters.get('KeyId')
|
||||||
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||||
|
@ -840,6 +840,55 @@ def test_generate_data_key_without_plaintext_decrypt():
|
|||||||
assert "Plaintext" not in resp1
|
assert "Plaintext" not in resp1
|
||||||
|
|
||||||
|
|
||||||
|
@parameterized(PLAINTEXT_VECTORS)
|
||||||
|
@mock_kms
|
||||||
|
def test_re_encrypt_decrypt(plaintext):
|
||||||
|
client = boto3.client("kms", region_name="us-west-2")
|
||||||
|
|
||||||
|
key_1 = client.create_key(Description="key 1")
|
||||||
|
key_1_id = key_1["KeyMetadata"]["KeyId"]
|
||||||
|
key_1_arn = key_1["KeyMetadata"]["Arn"]
|
||||||
|
key_2 = client.create_key(Description="key 2")
|
||||||
|
key_2_id = key_2["KeyMetadata"]["KeyId"]
|
||||||
|
key_2_arn = key_2["KeyMetadata"]["Arn"]
|
||||||
|
|
||||||
|
encrypt_response = client.encrypt(
|
||||||
|
KeyId=key_1_id,
|
||||||
|
Plaintext=plaintext,
|
||||||
|
EncryptionContext={"encryption": "context"},
|
||||||
|
)
|
||||||
|
|
||||||
|
re_encrypt_response = client.re_encrypt(
|
||||||
|
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||||
|
SourceEncryptionContext={"encryption": "context"},
|
||||||
|
DestinationKeyId=key_2_id,
|
||||||
|
DestinationEncryptionContext={"another": "context"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# CiphertextBlob must NOT be base64-encoded
|
||||||
|
with assert_raises(Exception):
|
||||||
|
base64.b64decode(re_encrypt_response["CiphertextBlob"], validate=True)
|
||||||
|
|
||||||
|
re_encrypt_response["SourceKeyId"].should.equal(key_1_arn)
|
||||||
|
re_encrypt_response["KeyId"].should.equal(key_2_arn)
|
||||||
|
|
||||||
|
decrypt_response_1 = client.decrypt(
|
||||||
|
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||||
|
EncryptionContext={"encryption": "context"},
|
||||||
|
)
|
||||||
|
decrypt_response_1["Plaintext"].should.equal(plaintext)
|
||||||
|
decrypt_response_1["KeyId"].should.equal(key_1_arn)
|
||||||
|
|
||||||
|
decrypt_response_2 = client.decrypt(
|
||||||
|
CiphertextBlob=re_encrypt_response["CiphertextBlob"],
|
||||||
|
EncryptionContext={"another": "context"},
|
||||||
|
)
|
||||||
|
decrypt_response_2["Plaintext"].should.equal(plaintext)
|
||||||
|
decrypt_response_2["KeyId"].should.equal(key_2_arn)
|
||||||
|
|
||||||
|
decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"])
|
||||||
|
|
||||||
|
|
||||||
@mock_kms
|
@mock_kms
|
||||||
def test_enable_key_rotation_key_not_found():
|
def test_enable_key_rotation_key_not_found():
|
||||||
client = boto3.client("kms", region_name="us-east-1")
|
client = boto3.client("kms", region_name="us-east-1")
|
||||||
|
Loading…
Reference in New Issue
Block a user