From 898f0928a45aac62342d57028a532de2ddeef833 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Fri, 27 May 2022 22:11:09 +0000 Subject: [PATCH] KMS - add Grants API (#5177) --- IMPLEMENTATION_COVERAGE.md | 12 +- docs/docs/services/kms.rst | 10 +- moto/kms/models.py | 118 ++++++++++- moto/kms/responses.py | 58 ++++++ .../terraform-tests.success.txt | 6 + tests/test_kms/test_kms_grants.py | 194 ++++++++++++++++++ 6 files changed, 384 insertions(+), 14 deletions(-) create mode 100644 tests/test_kms/test_kms_grants.py diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 44f2401fe..68fe2688b 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -3412,13 +3412,13 @@ ## kms
-42% implemented +52% implemented - [X] cancel_key_deletion - [ ] connect_custom_key_store - [ ] create_alias - [ ] create_custom_key_store -- [ ] create_grant +- [X] create_grant - [X] create_key - [X] decrypt - [X] delete_alias @@ -3444,16 +3444,16 @@ - [ ] get_public_key - [ ] import_key_material - [ ] list_aliases -- [ ] list_grants +- [X] list_grants - [ ] list_key_policies - [X] list_keys - [X] list_resource_tags -- [ ] list_retirable_grants +- [X] list_retirable_grants - [X] put_key_policy - [X] re_encrypt - [ ] replicate_key -- [ ] retire_grant -- [ ] revoke_grant +- [X] retire_grant +- [X] revoke_grant - [X] schedule_key_deletion - [ ] sign - [X] tag_resource diff --git a/docs/docs/services/kms.rst b/docs/docs/services/kms.rst index db33d4a66..7b878330b 100644 --- a/docs/docs/services/kms.rst +++ b/docs/docs/services/kms.rst @@ -29,7 +29,7 @@ kms - [ ] connect_custom_key_store - [ ] create_alias - [ ] create_custom_key_store -- [ ] create_grant +- [X] create_grant - [X] create_key - [X] decrypt - [X] delete_alias @@ -57,16 +57,16 @@ kms - [ ] get_public_key - [ ] import_key_material - [ ] list_aliases -- [ ] list_grants +- [X] list_grants - [ ] list_key_policies - [X] list_keys - [X] list_resource_tags -- [ ] list_retirable_grants +- [X] list_retirable_grants - [X] put_key_policy - [X] re_encrypt - [ ] replicate_key -- [ ] retire_grant -- [ ] revoke_grant +- [X] retire_grant +- [X] revoke_grant - [X] schedule_key_deletion - [ ] sign - [X] tag_resource diff --git a/moto/kms/models.py b/moto/kms/models.py index 4fe19e205..17304d094 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -3,8 +3,8 @@ import os from collections import defaultdict from datetime import datetime, timedelta -from moto.core import get_account_id, BaseBackend, CloudFormationModel -from moto.core.utils import unix_time, BackendDict +from moto.core import get_account_id, BaseBackend, BaseModel, CloudFormationModel +from moto.core.utils import get_random_hex, unix_time, BackendDict from moto.utilities.tagging_service import TaggingService from moto.core.exceptions import JsonRESTError @@ -17,6 +17,37 @@ from .utils import ( ) +class Grant(BaseModel): + def __init__( + self, + key_id, + name, + grantee_principal, + operations, + constraints, + retiring_principal, + ): + self.key_id = key_id + self.name = name + self.grantee_principal = grantee_principal + self.retiring_principal = retiring_principal + self.operations = operations + self.constraints = constraints + self.id = get_random_hex() + self.token = get_random_hex() + + def to_json(self): + return { + "KeyId": self.key_id, + "GrantId": self.id, + "Name": self.name, + "GranteePrincipal": self.grantee_principal, + "RetiringPrincipal": self.retiring_principal, + "Operations": self.operations, + "Constraints": self.constraints, + } + + class Key(CloudFormationModel): def __init__( self, policy, key_usage, customer_master_key_spec, description, region @@ -37,6 +68,46 @@ class Key(CloudFormationModel): self.key_manager = "CUSTOMER" self.customer_master_key_spec = customer_master_key_spec or "SYMMETRIC_DEFAULT" + self.grants = dict() + + def add_grant( + self, name, grantee_principal, operations, constraints, retiring_principal + ) -> Grant: + grant = Grant( + self.id, + name, + grantee_principal, + operations, + constraints=constraints, + retiring_principal=retiring_principal, + ) + self.grants[grant.id] = grant + return grant + + def list_grants(self, grant_id) -> [Grant]: + grant_ids = [grant_id] if grant_id else self.grants.keys() + return [grant for _id, grant in self.grants.items() if _id in grant_ids] + + def list_retirable_grants(self, retiring_principal) -> [Grant]: + return [ + grant + for grant in self.grants.values() + if grant.retiring_principal == retiring_principal + ] + + def revoke_grant(self, grant_id) -> None: + self.grants.pop(grant_id, None) + + def retire_grant(self, grant_id) -> None: + self.grants.pop(grant_id, None) + + def retire_grant_by_token(self, grant_token) -> None: + self.grants = { + _id: grant + for _id, grant in self.grants.items() + if grant.token != grant_token + } + def generate_default_policy(self): return json.dumps( { @@ -214,7 +285,7 @@ class KmsBackend(BaseBackend): return self.keys.pop(key_id) - def describe_key(self, key_id): + def describe_key(self, key_id) -> Key: # allow the different methods (alias, ARN :key/, keyId, ARN alias) to # describe key not just KeyId key_id = self.get_key_id(key_id) @@ -410,5 +481,46 @@ class KmsBackend(BaseBackend): "The request was rejected because the specified entity or resource could not be found.", ) + def create_grant( + self, + key_id, + grantee_principal, + operations, + name, + constraints, + retiring_principal, + ): + key = self.describe_key(key_id) + grant = key.add_grant( + name, + grantee_principal, + operations, + constraints=constraints, + retiring_principal=retiring_principal, + ) + return grant.id, grant.token + + def list_grants(self, key_id, grant_id) -> [Grant]: + key = self.describe_key(key_id) + return key.list_grants(grant_id) + + def list_retirable_grants(self, retiring_principal): + grants = [] + for key in self.keys.values(): + grants.extend(key.list_retirable_grants(retiring_principal)) + return grants + + def revoke_grant(self, key_id, grant_id) -> None: + key = self.describe_key(key_id) + key.revoke_grant(grant_id) + + def retire_grant(self, key_id, grant_id, grant_token) -> None: + if grant_token: + for key in self.keys.values(): + key.retire_grant_by_token(grant_token) + else: + key = self.describe_key(key_id) + key.retire_grant(grant_id) + kms_backends = BackendDict(KmsBackend, "kms") diff --git a/moto/kms/responses.py b/moto/kms/responses.py index 93e2608e1..97bd26e6c 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -284,6 +284,64 @@ class KmsResponse(BaseResponse): return json.dumps({"Truncated": False, "Aliases": response_aliases}) + def create_grant(self): + key_id = self.parameters.get("KeyId") + grantee_principal = self.parameters.get("GranteePrincipal") + retiring_principal = self.parameters.get("RetiringPrincipal") + operations = self.parameters.get("Operations") + name = self.parameters.get("Name") + constraints = self.parameters.get("Constraints") + + grant_id, grant_token = self.kms_backend.create_grant( + key_id, + grantee_principal, + operations, + name, + constraints=constraints, + retiring_principal=retiring_principal, + ) + return json.dumps({"GrantId": grant_id, "GrantToken": grant_token}) + + def list_grants(self): + key_id = self.parameters.get("KeyId") + grant_id = self.parameters.get("GrantId") + + grants = self.kms_backend.list_grants(key_id=key_id, grant_id=grant_id) + return json.dumps( + { + "Grants": [gr.to_json() for gr in grants], + "GrantCount": len(grants), + "Truncated": False, + } + ) + + def list_retirable_grants(self): + retiring_principal = self.parameters.get("RetiringPrincipal") + + grants = self.kms_backend.list_retirable_grants(retiring_principal) + return json.dumps( + { + "Grants": [gr.to_json() for gr in grants], + "GrantCount": len(grants), + "Truncated": False, + } + ) + + def revoke_grant(self): + key_id = self.parameters.get("KeyId") + grant_id = self.parameters.get("GrantId") + + self.kms_backend.revoke_grant(key_id, grant_id) + return "{}" + + def retire_grant(self): + key_id = self.parameters.get("KeyId") + grant_id = self.parameters.get("GrantId") + grant_token = self.parameters.get("GrantToken") + + self.kms_backend.retire_grant(key_id, grant_id, grant_token) + return "{}" + def enable_key_rotation(self): """https://docs.aws.amazon.com/kms/latest/APIReference/API_EnableKeyRotation.html""" key_id = self.parameters.get("KeyId") diff --git a/tests/terraformtests/terraform-tests.success.txt b/tests/terraformtests/terraform-tests.success.txt index 27bcfb979..53ae0f830 100644 --- a/tests/terraformtests/terraform-tests.success.txt +++ b/tests/terraformtests/terraform-tests.success.txt @@ -112,6 +112,12 @@ iot: - TestAccIoTEndpointDataSource kms: - TestAccKMSAlias + - TestAccKMSGrant_arn + - TestAccKMSGrant_asymmetricKey + - TestAccKMSGrant_basic + - TestAccKMSGrant_bare + - TestAccKMSGrant_withConstraints + - TestAccKMSGrant_withRetiringPrincipal - TestAccKMSKey_Policy_basic - TestAccKMSKey_Policy_iamRole - TestAccKMSKey_Policy_iamRoleOrder diff --git a/tests/test_kms/test_kms_grants.py b/tests/test_kms/test_kms_grants.py new file mode 100644 index 000000000..d8258ae2a --- /dev/null +++ b/tests/test_kms/test_kms_grants.py @@ -0,0 +1,194 @@ +import boto3 +import sure # noqa # pylint: disable=unused-import + +from moto import mock_kms +from moto.core import ACCOUNT_ID + + +grantee_principal = ( + f"arn:aws:iam::{ACCOUNT_ID}:role/service-role/tf-acc-test-7071877926602081451" +) + + +@mock_kms +def test_create_grant(): + client = boto3.client("kms", region_name="us-east-1") + key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"] + + resp = client.create_grant( + KeyId=key_id, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT"], + Name="testgrant", + ) + resp.should.have.key("GrantId") + resp.should.have.key("GrantToken") + + +@mock_kms +def test_list_grants(): + client = boto3.client("kms", region_name="us-east-1") + key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"] + + client.list_grants(KeyId=key_id).should.have.key("Grants").equals([]) + + grant_id1 = client.create_grant( + KeyId=key_id, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT"], + Name="testgrant", + )["GrantId"] + + grant_id2 = client.create_grant( + KeyId=key_id, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT", "ENCRYPT"], + Constraints={"EncryptionContextSubset": {"baz": "kaz", "foo": "bar"}}, + )["GrantId"] + + # List all + grants = client.list_grants(KeyId=key_id)["Grants"] + grants.should.have.length_of(2) + grant_1 = [grant for grant in grants if grant["GrantId"] == grant_id1][0] + grant_2 = [grant for grant in grants if grant["GrantId"] == grant_id2][0] + + grant_1.should.have.key("KeyId").equals(key_id) + grant_1.should.have.key("GrantId").equals(grant_id1) + grant_1.should.have.key("Name").equals("testgrant") + grant_1.should.have.key("GranteePrincipal").equals(grantee_principal) + grant_1.should.have.key("Operations").equals(["DECRYPT"]) + + grant_2.should.have.key("KeyId").equals(key_id) + grant_2.should.have.key("GrantId").equals(grant_id2) + grant_2.shouldnt.have.key("Name") + grant_2.should.have.key("GranteePrincipal").equals(grantee_principal) + grant_2.should.have.key("Operations").equals(["DECRYPT", "ENCRYPT"]) + grant_2.should.have.key("Constraints").equals( + {"EncryptionContextSubset": {"baz": "kaz", "foo": "bar"}} + ) + + # List by grant_id + grants = client.list_grants(KeyId=key_id, GrantId=grant_id2)["Grants"] + grants.should.have.length_of(1) + grants[0]["GrantId"].should.equal(grant_id2) + + # List by unknown grant_id + grants = client.list_grants(KeyId=key_id, GrantId="unknown")["Grants"] + grants.should.have.length_of(0) + + +@mock_kms +def test_list_retirable_grants(): + client = boto3.client("kms", region_name="us-east-1") + key_id1 = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"] + key_id2 = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"] + + client.create_grant( + KeyId=key_id1, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT"], + ) + + client.create_grant( + KeyId=key_id1, + GranteePrincipal=grantee_principal, + RetiringPrincipal="sth else", + Operations=["DECRYPT"], + ) + + client.create_grant( + KeyId=key_id2, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT"], + ) + + grant2_key2 = client.create_grant( + KeyId=key_id2, + GranteePrincipal=grantee_principal, + RetiringPrincipal="principal", + Operations=["DECRYPT"], + )["GrantId"] + + # List only the grants from the retiring principal + grants = client.list_retirable_grants(RetiringPrincipal="principal")["Grants"] + grants.should.have.length_of(1) + grants[0]["KeyId"].should.equal(key_id2) + grants[0]["GrantId"].should.equal(grant2_key2) + + +@mock_kms +def test_revoke_grant(): + + client = boto3.client("kms", region_name="us-east-1") + key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"] + + client.list_grants(KeyId=key_id).should.have.key("Grants").equals([]) + + grant_id = client.create_grant( + KeyId=key_id, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT"], + Name="testgrant", + )["GrantId"] + + client.revoke_grant(KeyId=key_id, GrantId=grant_id) + + client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(0) + + +@mock_kms +def test_revoke_grant_by_token(): + + client = boto3.client("kms", region_name="us-east-1") + key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"] + + client.list_grants(KeyId=key_id).should.have.key("Grants").equals([]) + + grant_id = client.create_grant( + KeyId=key_id, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT"], + Name="testgrant", + )["GrantId"] + + client.revoke_grant(KeyId=key_id, GrantId=grant_id) + + client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(0) + + +@mock_kms +def test_retire_grant_by_token(): + + client = boto3.client("kms", region_name="us-east-1") + key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"] + + for idx in range(0, 3): + grant_token = client.create_grant( + KeyId=key_id, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT"], + Name=f"testgrant{idx}", + )["GrantToken"] + + client.retire_grant(GrantToken=grant_token) + + client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(2) + + +@mock_kms +def test_retire_grant_by_grant_id(): + + client = boto3.client("kms", region_name="us-east-1") + key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"] + + for idx in range(0, 3): + grant_id = client.create_grant( + KeyId=key_id, + GranteePrincipal=grantee_principal, + Operations=["DECRYPT"], + Name=f"testgrant{idx}", + )["GrantId"] + + client.retire_grant(KeyId=key_id, GrantId=grant_id) + + client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(2)