diff --git a/moto/kms/models.py b/moto/kms/models.py index 9067c3f3f..8549da5ea 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -274,6 +274,14 @@ class KmsBackend(BaseBackend): def create_key( self, policy, key_usage, key_spec, description, tags, multi_region=False ): + """ + The provided Policy currently does not need to be valid. If it is valid, Moto will perform authorization checks on key-related operations, just like AWS does. + + These authorization checks are quite basic for now. Moto will only throw an AccessDeniedException if the following conditions are met: + - The principal is set to "*" + - The resource is set to "*" + - The Action matches `describe_key` + """ key = Key( policy, key_usage, diff --git a/moto/kms/policy_validator.py b/moto/kms/policy_validator.py new file mode 100644 index 000000000..f278dbc9b --- /dev/null +++ b/moto/kms/policy_validator.py @@ -0,0 +1,48 @@ +from collections import defaultdict +import json +from .models import Key +from .exceptions import AccessDeniedException + + +ALTERNATIVE_ACTIONS = defaultdict(list) +ALTERNATIVE_ACTIONS["kms:DescribeKey"] = ["kms:*", "kms:Describe*", "kms:DescribeKey"] + + +def validate_policy(key: Key, action: str): + """ + Relevant docs: + - https://docs.aws.amazon.com/kms/latest/developerguide/key-policy-default.html + - https://docs.aws.amazon.com/kms/latest/developerguide/policy-evaluation.html + + This method currently denies action based on whether: + - There is a applicable DENY-statement in the policy + """ + try: + policy = json.loads(key.policy) + except (ValueError, AttributeError): + return + for statement in policy.get("Statement", []): + statement_applies = check_statement(statement, key.arn, action) + if statement_applies and statement.get("Effect", "").lower() == "deny": + raise AccessDeniedException( + message=f"Action {action} is now allowed by the given key policy" + ) + + +def check_statement(statement, resource, action): + return action_matches(statement.get("Action", []), action) and resource_matches( + statement.get("Resource", ""), resource + ) + + +def action_matches(applicable_actions, action): + alternatives = ALTERNATIVE_ACTIONS[action] + if any(alt in applicable_actions for alt in alternatives): + return True + return False + + +def resource_matches(applicable_resources, resource): # pylint: disable=unused-argument + if applicable_resources == "*": + return True + return False diff --git a/moto/kms/responses.py b/moto/kms/responses.py index f2dc70eb7..e135b5eed 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -6,7 +6,8 @@ import warnings from moto.core.responses import BaseResponse from moto.kms.utils import RESERVED_ALIASES -from .models import kms_backends +from .models import kms_backends, KmsBackend +from .policy_validator import validate_policy from .exceptions import ( NotFoundException, ValidationException, @@ -30,7 +31,7 @@ class KmsResponse(BaseResponse): return params @property - def kms_backend(self): + def kms_backend(self) -> KmsBackend: return kms_backends[self.current_account][self.region] def _display_arn(self, key_id): @@ -104,6 +105,13 @@ class KmsResponse(BaseResponse): self._validate_cmk_id(key_id) + def _validate_key_policy(self, key_id, action): + """ + Validate whether the specified action is allowed, given the key policy + """ + key = self.kms_backend.describe_key(self.kms_backend.get_key_id(key_id)) + validate_policy(key, action) + def create_key(self): """https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html""" policy = self.parameters.get("Policy") @@ -170,6 +178,7 @@ class KmsResponse(BaseResponse): key_id = self.parameters.get("KeyId") self._validate_key_id(key_id) + self._validate_key_policy(key_id, "kms:DescribeKey") key = self.kms_backend.describe_key(self.kms_backend.get_key_id(key_id)) diff --git a/tests/test_kms/test_kms_policy_enforcement.py b/tests/test_kms/test_kms_policy_enforcement.py new file mode 100644 index 000000000..da450f8a5 --- /dev/null +++ b/tests/test_kms/test_kms_policy_enforcement.py @@ -0,0 +1,129 @@ +import boto3 +import json +import pytest +from botocore.exceptions import ClientError +from unittest import mock + +from moto import mock_kms +from moto.kms.exceptions import AccessDeniedException +from moto.kms.models import Key +from moto.kms.policy_validator import validate_policy +from cryptography.hazmat.primitives.asymmetric import rsa + + +@mock_kms +class TestKMSPolicyEnforcement: + def setup_method(self, *args) -> None: # pylint: disable=unused-argument + self.client = boto3.client("kms", "us-east-1") + + # The key-value is irrelevant, so let's mock the expensive cryptographic key-generation + # Patching does not work in ServerMode, but at least decorator tests times are improved + with mock.patch.object(rsa, "generate_private_key", return_value=""): + new_key = self.client.create_key(Description="t", Origin="AWS_KMS") + self.key_id = new_key["KeyMetadata"]["KeyId"] + + @pytest.mark.parametrize( + "actions", + [["kms:*"], ["kms:unknown", "kms:*"], ["kms:DescribeKey"], ["kms:Describe*"]], + ) + def test_policy__deny_based_on_actions(self, actions): + policy = { + "Version": "2012-10-17", + "Id": "", + "Statement": [ + { + "Sid": "test", + "Effect": "Deny", + "Principal": "*", + "Action": actions, + "Resource": "*", + } + ], + } + self.client.put_key_policy( + Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id + ) + with pytest.raises(ClientError) as exc: + self.client.describe_key(KeyId=self.key_id) + err = exc.value.response["Error"] + err["Code"].should.equal("AccessDeniedException") + + @pytest.mark.parametrize("actions", [["kms:unknown"], ["kms:describestuff"]]) + def test_policy__allow_based_on_actions(self, actions): + policy = { + "Version": "2012-10-17", + "Id": "", + "Statement": [ + { + "Sid": "test", + "Effect": "Deny", + "Principal": "*", + "Action": actions, + "Resource": "*", + } + ], + } + self.client.put_key_policy( + Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id + ) + key = self.client.describe_key(KeyId=self.key_id)["KeyMetadata"] + key["Description"].should.equal("t") + + +class TestKMSPolicyValidator: + def test_input_can_be_none(self): + validate_policy(None, None) + + def test_key_can_have_no_policy(self): + validate_policy(self.create_key(policy=None), None) + + def test_key_can_have_unreadable_policy(self): + validate_policy(self.create_key(policy="some policy"), None) + + @pytest.mark.parametrize( + "action", ["kms:*", "kms:DescribeKey", "unknown", "", None] + ) + def test_describe_key_is_allowed_for_actions(self, action): + policy = { + "Statement": [{"Effect": "Allow", "Action": [action], "Resource": "*"}] + } + key = self.create_key(policy=json.dumps(policy)) + validate_policy(key, "kms:DescribeKey") + + @pytest.mark.parametrize( + "action", ["kms:DeleteKey", "awslambda:DescribeKey", "unknown", "", None] + ) + def test_describe_key_is_allowed_if_other_actions_are_allowed(self, action): + policy = { + "Statement": [{"Effect": "denY", "Action": [action], "Resource": "*"}] + } + key = self.create_key(policy=json.dumps(policy)) + validate_policy(key, "kms:DescribeKey") + + @pytest.mark.parametrize("action", ["kms:*", "kms:Describe*", "kms:DescribeKey"]) + def test_describe_key_is_denied_for_all_possible_action_variations(self, action): + policy = { + "Statement": [{"Effect": "denY", "Action": [action], "Resource": "*"}] + } + key = self.create_key(policy=json.dumps(policy)) + with pytest.raises(AccessDeniedException): + validate_policy(key, "kms:DescribeKey") + + @pytest.mark.parametrize("resource", ["arn", "kms:*"]) + def test_describe_key_is_allowed_for_unsupported_resources(self, resource): + policy = { + "Statement": [{"Effect": "denY", "Action": ["kms:*"], "Resource": resource}] + } + key = self.create_key(policy=json.dumps(policy)) + validate_policy(key, "kms:DescribeKey") + + def create_key(self, policy): + with mock.patch.object(rsa, "generate_private_key", return_value=""): + return Key( + account_id=None, + region=None, + description=None, + key_spec=None, + key_usage=None, + policy=policy, + )