KMS: Return proper metadata when replicating a key (#6788)

This commit is contained in:
Brett Buford 2023-09-08 08:23:09 -07:00 committed by GitHub
parent 1d3ddc9a19
commit a8ab8011ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 4 deletions

View File

@ -322,14 +322,16 @@ class KmsBackend(BaseBackend):
#
# In our implementation with just create a copy of all the properties once without any protection from change,
# as the exact implementation is currently infeasible.
def replicate_key(self, key_id: str, replica_region: str) -> None:
def replicate_key(self, key_id: str, replica_region: str) -> Key:
# Using copy() instead of deepcopy(), as the latter results in exception:
# TypeError: cannot pickle '_cffi_backend.FFI' object
# Since we only update top level properties, copy() should suffice.
replica_key = copy(self.keys[key_id])
replica_key.region = replica_region
replica_key.arn = replica_key.arn.replace(self.region_name, replica_region)
to_region_backend = kms_backends[self.account_id][replica_region]
to_region_backend.keys[replica_key.id] = replica_key
return replica_key
def update_key_description(self, key_id: str, description: str) -> None:
key = self.keys[self.get_key_id(key_id)]

View File

@ -128,11 +128,17 @@ class KmsResponse(BaseResponse):
)
return json.dumps(key.to_dict())
def replicate_key(self) -> None:
def replicate_key(self) -> str:
key_id = self._get_param("KeyId")
self._validate_key_id(key_id)
replica_region = self._get_param("ReplicaRegion")
self.kms_backend.replicate_key(key_id, replica_region)
replica_key = self.kms_backend.replicate_key(key_id, replica_region)
return json.dumps(
{
"ReplicaKeyMetadata": replica_key.to_dict()["KeyMetadata"],
"ReplicaPolicy": replica_key.generate_default_policy(),
}
)
def update_key_description(self) -> str:
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_UpdateKeyDescription.html"""

View File

@ -171,12 +171,16 @@ def test_replicate_key():
to_region_client.describe_key(KeyId=key_id)
with mock.patch.object(rsa, "generate_private_key", return_value=""):
from_region_client.replicate_key(
replica_response = from_region_client.replicate_key(
KeyId=key_id, ReplicaRegion=region_to_replicate_to
)
to_region_client.describe_key(KeyId=key_id)
from_region_client.describe_key(KeyId=key_id)
assert "ReplicaKeyMetadata" in replica_response
assert region_to_replicate_to in replica_response["ReplicaKeyMetadata"]["Arn"]
assert "ReplicaPolicy" in replica_response
@mock_kms
def test_create_key_deprecated_master_custom_key_spec():