KMS - add Grants API (#5177)

This commit is contained in:
Bert Blommers 2022-05-27 22:11:09 +00:00 committed by GitHub
parent adeaea7c70
commit 898f0928a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 384 additions and 14 deletions

View File

@ -3412,13 +3412,13 @@
## kms ## kms
<details> <details>
<summary>42% implemented</summary> <summary>52% implemented</summary>
- [X] cancel_key_deletion - [X] cancel_key_deletion
- [ ] connect_custom_key_store - [ ] connect_custom_key_store
- [ ] create_alias - [ ] create_alias
- [ ] create_custom_key_store - [ ] create_custom_key_store
- [ ] create_grant - [X] create_grant
- [X] create_key - [X] create_key
- [X] decrypt - [X] decrypt
- [X] delete_alias - [X] delete_alias
@ -3444,16 +3444,16 @@
- [ ] get_public_key - [ ] get_public_key
- [ ] import_key_material - [ ] import_key_material
- [ ] list_aliases - [ ] list_aliases
- [ ] list_grants - [X] list_grants
- [ ] list_key_policies - [ ] list_key_policies
- [X] list_keys - [X] list_keys
- [X] list_resource_tags - [X] list_resource_tags
- [ ] list_retirable_grants - [X] list_retirable_grants
- [X] put_key_policy - [X] put_key_policy
- [X] re_encrypt - [X] re_encrypt
- [ ] replicate_key - [ ] replicate_key
- [ ] retire_grant - [X] retire_grant
- [ ] revoke_grant - [X] revoke_grant
- [X] schedule_key_deletion - [X] schedule_key_deletion
- [ ] sign - [ ] sign
- [X] tag_resource - [X] tag_resource

View File

@ -29,7 +29,7 @@ kms
- [ ] connect_custom_key_store - [ ] connect_custom_key_store
- [ ] create_alias - [ ] create_alias
- [ ] create_custom_key_store - [ ] create_custom_key_store
- [ ] create_grant - [X] create_grant
- [X] create_key - [X] create_key
- [X] decrypt - [X] decrypt
- [X] delete_alias - [X] delete_alias
@ -57,16 +57,16 @@ kms
- [ ] get_public_key - [ ] get_public_key
- [ ] import_key_material - [ ] import_key_material
- [ ] list_aliases - [ ] list_aliases
- [ ] list_grants - [X] list_grants
- [ ] list_key_policies - [ ] list_key_policies
- [X] list_keys - [X] list_keys
- [X] list_resource_tags - [X] list_resource_tags
- [ ] list_retirable_grants - [X] list_retirable_grants
- [X] put_key_policy - [X] put_key_policy
- [X] re_encrypt - [X] re_encrypt
- [ ] replicate_key - [ ] replicate_key
- [ ] retire_grant - [X] retire_grant
- [ ] revoke_grant - [X] revoke_grant
- [X] schedule_key_deletion - [X] schedule_key_deletion
- [ ] sign - [ ] sign
- [X] tag_resource - [X] tag_resource

View File

@ -3,8 +3,8 @@ import os
from collections import defaultdict from collections import defaultdict
from datetime import datetime, timedelta from datetime import datetime, timedelta
from moto.core import get_account_id, BaseBackend, CloudFormationModel from moto.core import get_account_id, BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import unix_time, BackendDict from moto.core.utils import get_random_hex, unix_time, BackendDict
from moto.utilities.tagging_service import TaggingService from moto.utilities.tagging_service import TaggingService
from moto.core.exceptions import JsonRESTError from moto.core.exceptions import JsonRESTError
@ -17,6 +17,37 @@ from .utils import (
) )
class Grant(BaseModel):
def __init__(
self,
key_id,
name,
grantee_principal,
operations,
constraints,
retiring_principal,
):
self.key_id = key_id
self.name = name
self.grantee_principal = grantee_principal
self.retiring_principal = retiring_principal
self.operations = operations
self.constraints = constraints
self.id = get_random_hex()
self.token = get_random_hex()
def to_json(self):
return {
"KeyId": self.key_id,
"GrantId": self.id,
"Name": self.name,
"GranteePrincipal": self.grantee_principal,
"RetiringPrincipal": self.retiring_principal,
"Operations": self.operations,
"Constraints": self.constraints,
}
class Key(CloudFormationModel): class Key(CloudFormationModel):
def __init__( def __init__(
self, policy, key_usage, customer_master_key_spec, description, region self, policy, key_usage, customer_master_key_spec, description, region
@ -37,6 +68,46 @@ class Key(CloudFormationModel):
self.key_manager = "CUSTOMER" self.key_manager = "CUSTOMER"
self.customer_master_key_spec = customer_master_key_spec or "SYMMETRIC_DEFAULT" self.customer_master_key_spec = customer_master_key_spec or "SYMMETRIC_DEFAULT"
self.grants = dict()
def add_grant(
self, name, grantee_principal, operations, constraints, retiring_principal
) -> Grant:
grant = Grant(
self.id,
name,
grantee_principal,
operations,
constraints=constraints,
retiring_principal=retiring_principal,
)
self.grants[grant.id] = grant
return grant
def list_grants(self, grant_id) -> [Grant]:
grant_ids = [grant_id] if grant_id else self.grants.keys()
return [grant for _id, grant in self.grants.items() if _id in grant_ids]
def list_retirable_grants(self, retiring_principal) -> [Grant]:
return [
grant
for grant in self.grants.values()
if grant.retiring_principal == retiring_principal
]
def revoke_grant(self, grant_id) -> None:
self.grants.pop(grant_id, None)
def retire_grant(self, grant_id) -> None:
self.grants.pop(grant_id, None)
def retire_grant_by_token(self, grant_token) -> None:
self.grants = {
_id: grant
for _id, grant in self.grants.items()
if grant.token != grant_token
}
def generate_default_policy(self): def generate_default_policy(self):
return json.dumps( return json.dumps(
{ {
@ -214,7 +285,7 @@ class KmsBackend(BaseBackend):
return self.keys.pop(key_id) return self.keys.pop(key_id)
def describe_key(self, key_id): def describe_key(self, key_id) -> Key:
# allow the different methods (alias, ARN :key/, keyId, ARN alias) to # allow the different methods (alias, ARN :key/, keyId, ARN alias) to
# describe key not just KeyId # describe key not just KeyId
key_id = self.get_key_id(key_id) key_id = self.get_key_id(key_id)
@ -410,5 +481,46 @@ class KmsBackend(BaseBackend):
"The request was rejected because the specified entity or resource could not be found.", "The request was rejected because the specified entity or resource could not be found.",
) )
def create_grant(
self,
key_id,
grantee_principal,
operations,
name,
constraints,
retiring_principal,
):
key = self.describe_key(key_id)
grant = key.add_grant(
name,
grantee_principal,
operations,
constraints=constraints,
retiring_principal=retiring_principal,
)
return grant.id, grant.token
def list_grants(self, key_id, grant_id) -> [Grant]:
key = self.describe_key(key_id)
return key.list_grants(grant_id)
def list_retirable_grants(self, retiring_principal):
grants = []
for key in self.keys.values():
grants.extend(key.list_retirable_grants(retiring_principal))
return grants
def revoke_grant(self, key_id, grant_id) -> None:
key = self.describe_key(key_id)
key.revoke_grant(grant_id)
def retire_grant(self, key_id, grant_id, grant_token) -> None:
if grant_token:
for key in self.keys.values():
key.retire_grant_by_token(grant_token)
else:
key = self.describe_key(key_id)
key.retire_grant(grant_id)
kms_backends = BackendDict(KmsBackend, "kms") kms_backends = BackendDict(KmsBackend, "kms")

View File

@ -284,6 +284,64 @@ class KmsResponse(BaseResponse):
return json.dumps({"Truncated": False, "Aliases": response_aliases}) return json.dumps({"Truncated": False, "Aliases": response_aliases})
def create_grant(self):
key_id = self.parameters.get("KeyId")
grantee_principal = self.parameters.get("GranteePrincipal")
retiring_principal = self.parameters.get("RetiringPrincipal")
operations = self.parameters.get("Operations")
name = self.parameters.get("Name")
constraints = self.parameters.get("Constraints")
grant_id, grant_token = self.kms_backend.create_grant(
key_id,
grantee_principal,
operations,
name,
constraints=constraints,
retiring_principal=retiring_principal,
)
return json.dumps({"GrantId": grant_id, "GrantToken": grant_token})
def list_grants(self):
key_id = self.parameters.get("KeyId")
grant_id = self.parameters.get("GrantId")
grants = self.kms_backend.list_grants(key_id=key_id, grant_id=grant_id)
return json.dumps(
{
"Grants": [gr.to_json() for gr in grants],
"GrantCount": len(grants),
"Truncated": False,
}
)
def list_retirable_grants(self):
retiring_principal = self.parameters.get("RetiringPrincipal")
grants = self.kms_backend.list_retirable_grants(retiring_principal)
return json.dumps(
{
"Grants": [gr.to_json() for gr in grants],
"GrantCount": len(grants),
"Truncated": False,
}
)
def revoke_grant(self):
key_id = self.parameters.get("KeyId")
grant_id = self.parameters.get("GrantId")
self.kms_backend.revoke_grant(key_id, grant_id)
return "{}"
def retire_grant(self):
key_id = self.parameters.get("KeyId")
grant_id = self.parameters.get("GrantId")
grant_token = self.parameters.get("GrantToken")
self.kms_backend.retire_grant(key_id, grant_id, grant_token)
return "{}"
def enable_key_rotation(self): def enable_key_rotation(self):
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_EnableKeyRotation.html""" """https://docs.aws.amazon.com/kms/latest/APIReference/API_EnableKeyRotation.html"""
key_id = self.parameters.get("KeyId") key_id = self.parameters.get("KeyId")

View File

@ -112,6 +112,12 @@ iot:
- TestAccIoTEndpointDataSource - TestAccIoTEndpointDataSource
kms: kms:
- TestAccKMSAlias - TestAccKMSAlias
- TestAccKMSGrant_arn
- TestAccKMSGrant_asymmetricKey
- TestAccKMSGrant_basic
- TestAccKMSGrant_bare
- TestAccKMSGrant_withConstraints
- TestAccKMSGrant_withRetiringPrincipal
- TestAccKMSKey_Policy_basic - TestAccKMSKey_Policy_basic
- TestAccKMSKey_Policy_iamRole - TestAccKMSKey_Policy_iamRole
- TestAccKMSKey_Policy_iamRoleOrder - TestAccKMSKey_Policy_iamRoleOrder

View File

@ -0,0 +1,194 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_kms
from moto.core import ACCOUNT_ID
grantee_principal = (
f"arn:aws:iam::{ACCOUNT_ID}:role/service-role/tf-acc-test-7071877926602081451"
)
@mock_kms
def test_create_grant():
client = boto3.client("kms", region_name="us-east-1")
key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"]
resp = client.create_grant(
KeyId=key_id,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT"],
Name="testgrant",
)
resp.should.have.key("GrantId")
resp.should.have.key("GrantToken")
@mock_kms
def test_list_grants():
client = boto3.client("kms", region_name="us-east-1")
key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"]
client.list_grants(KeyId=key_id).should.have.key("Grants").equals([])
grant_id1 = client.create_grant(
KeyId=key_id,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT"],
Name="testgrant",
)["GrantId"]
grant_id2 = client.create_grant(
KeyId=key_id,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT", "ENCRYPT"],
Constraints={"EncryptionContextSubset": {"baz": "kaz", "foo": "bar"}},
)["GrantId"]
# List all
grants = client.list_grants(KeyId=key_id)["Grants"]
grants.should.have.length_of(2)
grant_1 = [grant for grant in grants if grant["GrantId"] == grant_id1][0]
grant_2 = [grant for grant in grants if grant["GrantId"] == grant_id2][0]
grant_1.should.have.key("KeyId").equals(key_id)
grant_1.should.have.key("GrantId").equals(grant_id1)
grant_1.should.have.key("Name").equals("testgrant")
grant_1.should.have.key("GranteePrincipal").equals(grantee_principal)
grant_1.should.have.key("Operations").equals(["DECRYPT"])
grant_2.should.have.key("KeyId").equals(key_id)
grant_2.should.have.key("GrantId").equals(grant_id2)
grant_2.shouldnt.have.key("Name")
grant_2.should.have.key("GranteePrincipal").equals(grantee_principal)
grant_2.should.have.key("Operations").equals(["DECRYPT", "ENCRYPT"])
grant_2.should.have.key("Constraints").equals(
{"EncryptionContextSubset": {"baz": "kaz", "foo": "bar"}}
)
# List by grant_id
grants = client.list_grants(KeyId=key_id, GrantId=grant_id2)["Grants"]
grants.should.have.length_of(1)
grants[0]["GrantId"].should.equal(grant_id2)
# List by unknown grant_id
grants = client.list_grants(KeyId=key_id, GrantId="unknown")["Grants"]
grants.should.have.length_of(0)
@mock_kms
def test_list_retirable_grants():
client = boto3.client("kms", region_name="us-east-1")
key_id1 = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"]
key_id2 = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"]
client.create_grant(
KeyId=key_id1,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT"],
)
client.create_grant(
KeyId=key_id1,
GranteePrincipal=grantee_principal,
RetiringPrincipal="sth else",
Operations=["DECRYPT"],
)
client.create_grant(
KeyId=key_id2,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT"],
)
grant2_key2 = client.create_grant(
KeyId=key_id2,
GranteePrincipal=grantee_principal,
RetiringPrincipal="principal",
Operations=["DECRYPT"],
)["GrantId"]
# List only the grants from the retiring principal
grants = client.list_retirable_grants(RetiringPrincipal="principal")["Grants"]
grants.should.have.length_of(1)
grants[0]["KeyId"].should.equal(key_id2)
grants[0]["GrantId"].should.equal(grant2_key2)
@mock_kms
def test_revoke_grant():
client = boto3.client("kms", region_name="us-east-1")
key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"]
client.list_grants(KeyId=key_id).should.have.key("Grants").equals([])
grant_id = client.create_grant(
KeyId=key_id,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT"],
Name="testgrant",
)["GrantId"]
client.revoke_grant(KeyId=key_id, GrantId=grant_id)
client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(0)
@mock_kms
def test_revoke_grant_by_token():
client = boto3.client("kms", region_name="us-east-1")
key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"]
client.list_grants(KeyId=key_id).should.have.key("Grants").equals([])
grant_id = client.create_grant(
KeyId=key_id,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT"],
Name="testgrant",
)["GrantId"]
client.revoke_grant(KeyId=key_id, GrantId=grant_id)
client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(0)
@mock_kms
def test_retire_grant_by_token():
client = boto3.client("kms", region_name="us-east-1")
key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"]
for idx in range(0, 3):
grant_token = client.create_grant(
KeyId=key_id,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT"],
Name=f"testgrant{idx}",
)["GrantToken"]
client.retire_grant(GrantToken=grant_token)
client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(2)
@mock_kms
def test_retire_grant_by_grant_id():
client = boto3.client("kms", region_name="us-east-1")
key_id = client.create_key(Policy="my policy")["KeyMetadata"]["KeyId"]
for idx in range(0, 3):
grant_id = client.create_grant(
KeyId=key_id,
GranteePrincipal=grantee_principal,
Operations=["DECRYPT"],
Name=f"testgrant{idx}",
)["GrantId"]
client.retire_grant(KeyId=key_id, GrantId=grant_id)
client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(2)