KMS: Basic key policy enforcement (#5777)
This commit is contained in:
parent
ee6e8dd359
commit
137f06b55e
@ -274,6 +274,14 @@ class KmsBackend(BaseBackend):
|
|||||||
def create_key(
|
def create_key(
|
||||||
self, policy, key_usage, key_spec, description, tags, multi_region=False
|
self, policy, key_usage, key_spec, description, tags, multi_region=False
|
||||||
):
|
):
|
||||||
|
"""
|
||||||
|
The provided Policy currently does not need to be valid. If it is valid, Moto will perform authorization checks on key-related operations, just like AWS does.
|
||||||
|
|
||||||
|
These authorization checks are quite basic for now. Moto will only throw an AccessDeniedException if the following conditions are met:
|
||||||
|
- The principal is set to "*"
|
||||||
|
- The resource is set to "*"
|
||||||
|
- The Action matches `describe_key`
|
||||||
|
"""
|
||||||
key = Key(
|
key = Key(
|
||||||
policy,
|
policy,
|
||||||
key_usage,
|
key_usage,
|
||||||
|
48
moto/kms/policy_validator.py
Normal file
48
moto/kms/policy_validator.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
from collections import defaultdict
|
||||||
|
import json
|
||||||
|
from .models import Key
|
||||||
|
from .exceptions import AccessDeniedException
|
||||||
|
|
||||||
|
|
||||||
|
ALTERNATIVE_ACTIONS = defaultdict(list)
|
||||||
|
ALTERNATIVE_ACTIONS["kms:DescribeKey"] = ["kms:*", "kms:Describe*", "kms:DescribeKey"]
|
||||||
|
|
||||||
|
|
||||||
|
def validate_policy(key: Key, action: str):
|
||||||
|
"""
|
||||||
|
Relevant docs:
|
||||||
|
- https://docs.aws.amazon.com/kms/latest/developerguide/key-policy-default.html
|
||||||
|
- https://docs.aws.amazon.com/kms/latest/developerguide/policy-evaluation.html
|
||||||
|
|
||||||
|
This method currently denies action based on whether:
|
||||||
|
- There is a applicable DENY-statement in the policy
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
policy = json.loads(key.policy)
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
return
|
||||||
|
for statement in policy.get("Statement", []):
|
||||||
|
statement_applies = check_statement(statement, key.arn, action)
|
||||||
|
if statement_applies and statement.get("Effect", "").lower() == "deny":
|
||||||
|
raise AccessDeniedException(
|
||||||
|
message=f"Action {action} is now allowed by the given key policy"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def check_statement(statement, resource, action):
|
||||||
|
return action_matches(statement.get("Action", []), action) and resource_matches(
|
||||||
|
statement.get("Resource", ""), resource
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def action_matches(applicable_actions, action):
|
||||||
|
alternatives = ALTERNATIVE_ACTIONS[action]
|
||||||
|
if any(alt in applicable_actions for alt in alternatives):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def resource_matches(applicable_resources, resource): # pylint: disable=unused-argument
|
||||||
|
if applicable_resources == "*":
|
||||||
|
return True
|
||||||
|
return False
|
@ -6,7 +6,8 @@ import warnings
|
|||||||
|
|
||||||
from moto.core.responses import BaseResponse
|
from moto.core.responses import BaseResponse
|
||||||
from moto.kms.utils import RESERVED_ALIASES
|
from moto.kms.utils import RESERVED_ALIASES
|
||||||
from .models import kms_backends
|
from .models import kms_backends, KmsBackend
|
||||||
|
from .policy_validator import validate_policy
|
||||||
from .exceptions import (
|
from .exceptions import (
|
||||||
NotFoundException,
|
NotFoundException,
|
||||||
ValidationException,
|
ValidationException,
|
||||||
@ -30,7 +31,7 @@ class KmsResponse(BaseResponse):
|
|||||||
return params
|
return params
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def kms_backend(self):
|
def kms_backend(self) -> KmsBackend:
|
||||||
return kms_backends[self.current_account][self.region]
|
return kms_backends[self.current_account][self.region]
|
||||||
|
|
||||||
def _display_arn(self, key_id):
|
def _display_arn(self, key_id):
|
||||||
@ -104,6 +105,13 @@ class KmsResponse(BaseResponse):
|
|||||||
|
|
||||||
self._validate_cmk_id(key_id)
|
self._validate_cmk_id(key_id)
|
||||||
|
|
||||||
|
def _validate_key_policy(self, key_id, action):
|
||||||
|
"""
|
||||||
|
Validate whether the specified action is allowed, given the key policy
|
||||||
|
"""
|
||||||
|
key = self.kms_backend.describe_key(self.kms_backend.get_key_id(key_id))
|
||||||
|
validate_policy(key, action)
|
||||||
|
|
||||||
def create_key(self):
|
def create_key(self):
|
||||||
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html"""
|
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html"""
|
||||||
policy = self.parameters.get("Policy")
|
policy = self.parameters.get("Policy")
|
||||||
@ -170,6 +178,7 @@ class KmsResponse(BaseResponse):
|
|||||||
key_id = self.parameters.get("KeyId")
|
key_id = self.parameters.get("KeyId")
|
||||||
|
|
||||||
self._validate_key_id(key_id)
|
self._validate_key_id(key_id)
|
||||||
|
self._validate_key_policy(key_id, "kms:DescribeKey")
|
||||||
|
|
||||||
key = self.kms_backend.describe_key(self.kms_backend.get_key_id(key_id))
|
key = self.kms_backend.describe_key(self.kms_backend.get_key_id(key_id))
|
||||||
|
|
||||||
|
129
tests/test_kms/test_kms_policy_enforcement.py
Normal file
129
tests/test_kms/test_kms_policy_enforcement.py
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
import boto3
|
||||||
|
import json
|
||||||
|
import pytest
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from moto import mock_kms
|
||||||
|
from moto.kms.exceptions import AccessDeniedException
|
||||||
|
from moto.kms.models import Key
|
||||||
|
from moto.kms.policy_validator import validate_policy
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
class TestKMSPolicyEnforcement:
|
||||||
|
def setup_method(self, *args) -> None: # pylint: disable=unused-argument
|
||||||
|
self.client = boto3.client("kms", "us-east-1")
|
||||||
|
|
||||||
|
# The key-value is irrelevant, so let's mock the expensive cryptographic key-generation
|
||||||
|
# Patching does not work in ServerMode, but at least decorator tests times are improved
|
||||||
|
with mock.patch.object(rsa, "generate_private_key", return_value=""):
|
||||||
|
new_key = self.client.create_key(Description="t", Origin="AWS_KMS")
|
||||||
|
self.key_id = new_key["KeyMetadata"]["KeyId"]
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"actions",
|
||||||
|
[["kms:*"], ["kms:unknown", "kms:*"], ["kms:DescribeKey"], ["kms:Describe*"]],
|
||||||
|
)
|
||||||
|
def test_policy__deny_based_on_actions(self, actions):
|
||||||
|
policy = {
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Id": "",
|
||||||
|
"Statement": [
|
||||||
|
{
|
||||||
|
"Sid": "test",
|
||||||
|
"Effect": "Deny",
|
||||||
|
"Principal": "*",
|
||||||
|
"Action": actions,
|
||||||
|
"Resource": "*",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
self.client.put_key_policy(
|
||||||
|
Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id
|
||||||
|
)
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
self.client.describe_key(KeyId=self.key_id)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("AccessDeniedException")
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("actions", [["kms:unknown"], ["kms:describestuff"]])
|
||||||
|
def test_policy__allow_based_on_actions(self, actions):
|
||||||
|
policy = {
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Id": "",
|
||||||
|
"Statement": [
|
||||||
|
{
|
||||||
|
"Sid": "test",
|
||||||
|
"Effect": "Deny",
|
||||||
|
"Principal": "*",
|
||||||
|
"Action": actions,
|
||||||
|
"Resource": "*",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
self.client.put_key_policy(
|
||||||
|
Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id
|
||||||
|
)
|
||||||
|
key = self.client.describe_key(KeyId=self.key_id)["KeyMetadata"]
|
||||||
|
key["Description"].should.equal("t")
|
||||||
|
|
||||||
|
|
||||||
|
class TestKMSPolicyValidator:
|
||||||
|
def test_input_can_be_none(self):
|
||||||
|
validate_policy(None, None)
|
||||||
|
|
||||||
|
def test_key_can_have_no_policy(self):
|
||||||
|
validate_policy(self.create_key(policy=None), None)
|
||||||
|
|
||||||
|
def test_key_can_have_unreadable_policy(self):
|
||||||
|
validate_policy(self.create_key(policy="some policy"), None)
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"action", ["kms:*", "kms:DescribeKey", "unknown", "", None]
|
||||||
|
)
|
||||||
|
def test_describe_key_is_allowed_for_actions(self, action):
|
||||||
|
policy = {
|
||||||
|
"Statement": [{"Effect": "Allow", "Action": [action], "Resource": "*"}]
|
||||||
|
}
|
||||||
|
key = self.create_key(policy=json.dumps(policy))
|
||||||
|
validate_policy(key, "kms:DescribeKey")
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"action", ["kms:DeleteKey", "awslambda:DescribeKey", "unknown", "", None]
|
||||||
|
)
|
||||||
|
def test_describe_key_is_allowed_if_other_actions_are_allowed(self, action):
|
||||||
|
policy = {
|
||||||
|
"Statement": [{"Effect": "denY", "Action": [action], "Resource": "*"}]
|
||||||
|
}
|
||||||
|
key = self.create_key(policy=json.dumps(policy))
|
||||||
|
validate_policy(key, "kms:DescribeKey")
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("action", ["kms:*", "kms:Describe*", "kms:DescribeKey"])
|
||||||
|
def test_describe_key_is_denied_for_all_possible_action_variations(self, action):
|
||||||
|
policy = {
|
||||||
|
"Statement": [{"Effect": "denY", "Action": [action], "Resource": "*"}]
|
||||||
|
}
|
||||||
|
key = self.create_key(policy=json.dumps(policy))
|
||||||
|
with pytest.raises(AccessDeniedException):
|
||||||
|
validate_policy(key, "kms:DescribeKey")
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("resource", ["arn", "kms:*"])
|
||||||
|
def test_describe_key_is_allowed_for_unsupported_resources(self, resource):
|
||||||
|
policy = {
|
||||||
|
"Statement": [{"Effect": "denY", "Action": ["kms:*"], "Resource": resource}]
|
||||||
|
}
|
||||||
|
key = self.create_key(policy=json.dumps(policy))
|
||||||
|
validate_policy(key, "kms:DescribeKey")
|
||||||
|
|
||||||
|
def create_key(self, policy):
|
||||||
|
with mock.patch.object(rsa, "generate_private_key", return_value=""):
|
||||||
|
return Key(
|
||||||
|
account_id=None,
|
||||||
|
region=None,
|
||||||
|
description=None,
|
||||||
|
key_spec=None,
|
||||||
|
key_usage=None,
|
||||||
|
policy=policy,
|
||||||
|
)
|
Loading…
Reference in New Issue
Block a user