# moto/tests/test_kms/test_kms_policy_enforcement.py

import boto3
import json
import pytest
from botocore.exceptions import ClientError
from unittest import mock
from moto import mock_kms
from moto.kms.exceptions import AccessDeniedException
from moto.kms.models import Key
from moto.kms.policy_validator import validate_policy
from cryptography.hazmat.primitives.asymmetric import rsa
@mock_kms
class TestKMSPolicyEnforcement:
    """Check, through the boto3 client, how a key policy affects ``describe_key``.

    Every test attaches a single ``Deny`` statement to a freshly created key
    and then calls ``describe_key`` on that key.
    """

    def setup_method(self, *args) -> None:  # pylint: disable=unused-argument
        """Create a KMS client and a new key; store them on the instance."""
        self.client = boto3.client("kms", "us-east-1")
        # The key-value is irrelevant, so let's mock the expensive cryptographic key-generation
        # Patching does not work in ServerMode, but at least decorator tests times are improved
        with mock.patch.object(rsa, "generate_private_key", return_value=""):
            new_key = self.client.create_key(Description="t", Origin="AWS_KMS")
        self.key_id = new_key["KeyMetadata"]["KeyId"]

    def _put_deny_policy(self, actions) -> None:
        """Attach a policy to the test key that denies ``actions`` for every principal."""
        policy = {
            "Version": "2012-10-17",
            "Id": "",
            "Statement": [
                {
                    "Sid": "test",
                    "Effect": "Deny",
                    "Principal": "*",
                    "Action": actions,
                    "Resource": "*",
                }
            ],
        }
        self.client.put_key_policy(
            Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id
        )

    @pytest.mark.parametrize(
        "actions",
        [["kms:*"], ["kms:unknown", "kms:*"], ["kms:DescribeKey"], ["kms:Describe*"]],
    )
    def test_policy__deny_based_on_actions(self, actions):
        """Denied actions that cover ``kms:DescribeKey`` make ``describe_key`` fail."""
        self._put_deny_policy(actions)

        with pytest.raises(ClientError) as exc:
            self.client.describe_key(KeyId=self.key_id)
        err = exc.value.response["Error"]
        # Plain assert: this module does not import `sure`, so `.should` is not
        # guaranteed to exist on arbitrary objects.
        assert err["Code"] == "AccessDeniedException"

    @pytest.mark.parametrize("actions", [["kms:unknown"], ["kms:describestuff"]])
    def test_policy__allow_based_on_actions(self, actions):
        """Denied actions that do not cover ``kms:DescribeKey`` leave ``describe_key`` usable."""
        self._put_deny_policy(actions)

        key = self.client.describe_key(KeyId=self.key_id)["KeyMetadata"]
        assert key["Description"] == "t"
class TestKMSPolicyValidator:
    """Call ``validate_policy`` directly against hand-built ``Key`` objects.

    A test passes when ``validate_policy`` returns without raising, unless it
    explicitly expects ``AccessDeniedException``.
    """

    @staticmethod
    def _policy_json(effect, actions, resource="*"):
        """Serialise a one-statement policy document to a JSON string."""
        statement = {"Effect": effect, "Action": actions, "Resource": resource}
        return json.dumps({"Statement": [statement]})

    def test_input_can_be_none(self):
        """Neither a key nor an action is supplied."""
        validate_policy(None, None)

    def test_key_can_have_no_policy(self):
        """The key's policy is ``None``."""
        policyless_key = self.create_key(policy=None)
        validate_policy(policyless_key, None)

    def test_key_can_have_unreadable_policy(self):
        """The key's policy is a string that is not a JSON document."""
        odd_key = self.create_key(policy="some policy")
        validate_policy(odd_key, None)

    @pytest.mark.parametrize(
        "action", ["kms:*", "kms:DescribeKey", "unknown", "", None]
    )
    def test_describe_key_is_allowed_for_actions(self, action):
        """An ``Allow`` statement, whatever its action, does not block DescribeKey."""
        document = self._policy_json("Allow", [action])
        key = self.create_key(policy=document)
        validate_policy(key, "kms:DescribeKey")

    @pytest.mark.parametrize(
        "action", ["kms:DeleteKey", "awslambda:DescribeKey", "unknown", "", None]
    )
    def test_describe_key_is_allowed_if_other_actions_are_allowed(self, action):
        """A ``denY`` statement on an unrelated action does not block DescribeKey."""
        document = self._policy_json("denY", [action])
        key = self.create_key(policy=document)
        validate_policy(key, "kms:DescribeKey")

    @pytest.mark.parametrize("action", ["kms:*", "kms:Describe*", "kms:DescribeKey"])
    def test_describe_key_is_denied_for_all_possible_action_variations(self, action):
        """A ``denY`` statement whose action covers DescribeKey raises."""
        document = self._policy_json("denY", [action])
        key = self.create_key(policy=document)
        with pytest.raises(AccessDeniedException):
            validate_policy(key, "kms:DescribeKey")

    @pytest.mark.parametrize("resource", ["arn", "kms:*"])
    def test_describe_key_is_allowed_for_unsupported_resources(self, resource):
        """A ``denY`` on ``kms:*`` with a resource other than ``"*"`` does not block DescribeKey."""
        document = self._policy_json("denY", ["kms:*"], resource=resource)
        key = self.create_key(policy=document)
        validate_policy(key, "kms:DescribeKey")

    def create_key(self, policy):
        """Build a ``Key`` carrying ``policy``; every other constructor field is ``None``."""
        key_fields = {
            "account_id": None,
            "region": None,
            "description": None,
            "key_spec": None,
            "key_usage": None,
            "policy": policy,
        }
        # RSA key generation is stubbed out while the Key is constructed.
        with mock.patch.object(rsa, "generate_private_key", return_value=""):
            return Key(**key_fields)