130 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			130 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								import boto3
							 | 
						||
| 
								 | 
							
								import json
							 | 
						||
| 
								 | 
							
								import pytest
							 | 
						||
| 
								 | 
							
								from botocore.exceptions import ClientError
							 | 
						||
| 
								 | 
							
								from unittest import mock
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from moto import mock_kms
							 | 
						||
| 
								 | 
							
								from moto.kms.exceptions import AccessDeniedException
							 | 
						||
| 
								 | 
							
								from moto.kms.models import Key
							 | 
						||
| 
								 | 
							
								from moto.kms.policy_validator import validate_policy
							 | 
						||
| 
								 | 
							
								from cryptography.hazmat.primitives.asymmetric import rsa
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@mock_kms
							 | 
						||
| 
								 | 
							
								class TestKMSPolicyEnforcement:
							 | 
						||
| 
								 | 
							
								    def setup_method(self, *args) -> None:  # pylint: disable=unused-argument
							 | 
						||
| 
								 | 
							
								        self.client = boto3.client("kms", "us-east-1")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # The key-value is irrelevant, so let's mock the expensive cryptographic key-generation
							 | 
						||
| 
								 | 
							
								        # Patching does not work in ServerMode, but at least decorator tests times are improved
							 | 
						||
| 
								 | 
							
								        with mock.patch.object(rsa, "generate_private_key", return_value=""):
							 | 
						||
| 
								 | 
							
								            new_key = self.client.create_key(Description="t", Origin="AWS_KMS")
							 | 
						||
| 
								 | 
							
								        self.key_id = new_key["KeyMetadata"]["KeyId"]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @pytest.mark.parametrize(
							 | 
						||
| 
								 | 
							
								        "actions",
							 | 
						||
| 
								 | 
							
								        [["kms:*"], ["kms:unknown", "kms:*"], ["kms:DescribeKey"], ["kms:Describe*"]],
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    def test_policy__deny_based_on_actions(self, actions):
							 | 
						||
| 
								 | 
							
								        policy = {
							 | 
						||
| 
								 | 
							
								            "Version": "2012-10-17",
							 | 
						||
| 
								 | 
							
								            "Id": "",
							 | 
						||
| 
								 | 
							
								            "Statement": [
							 | 
						||
| 
								 | 
							
								                {
							 | 
						||
| 
								 | 
							
								                    "Sid": "test",
							 | 
						||
| 
								 | 
							
								                    "Effect": "Deny",
							 | 
						||
| 
								 | 
							
								                    "Principal": "*",
							 | 
						||
| 
								 | 
							
								                    "Action": actions,
							 | 
						||
| 
								 | 
							
								                    "Resource": "*",
							 | 
						||
| 
								 | 
							
								                }
							 | 
						||
| 
								 | 
							
								            ],
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        self.client.put_key_policy(
							 | 
						||
| 
								 | 
							
								            Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id
							 | 
						||
| 
								 | 
							
								        )
							 | 
						||
| 
								 | 
							
								        with pytest.raises(ClientError) as exc:
							 | 
						||
| 
								 | 
							
								            self.client.describe_key(KeyId=self.key_id)
							 | 
						||
| 
								 | 
							
								        err = exc.value.response["Error"]
							 | 
						||
| 
								 | 
							
								        err["Code"].should.equal("AccessDeniedException")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @pytest.mark.parametrize("actions", [["kms:unknown"], ["kms:describestuff"]])
							 | 
						||
| 
								 | 
							
								    def test_policy__allow_based_on_actions(self, actions):
							 | 
						||
| 
								 | 
							
								        policy = {
							 | 
						||
| 
								 | 
							
								            "Version": "2012-10-17",
							 | 
						||
| 
								 | 
							
								            "Id": "",
							 | 
						||
| 
								 | 
							
								            "Statement": [
							 | 
						||
| 
								 | 
							
								                {
							 | 
						||
| 
								 | 
							
								                    "Sid": "test",
							 | 
						||
| 
								 | 
							
								                    "Effect": "Deny",
							 | 
						||
| 
								 | 
							
								                    "Principal": "*",
							 | 
						||
| 
								 | 
							
								                    "Action": actions,
							 | 
						||
| 
								 | 
							
								                    "Resource": "*",
							 | 
						||
| 
								 | 
							
								                }
							 | 
						||
| 
								 | 
							
								            ],
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        self.client.put_key_policy(
							 | 
						||
| 
								 | 
							
								            Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id
							 | 
						||
| 
								 | 
							
								        )
							 | 
						||
| 
								 | 
							
								        key = self.client.describe_key(KeyId=self.key_id)["KeyMetadata"]
							 | 
						||
| 
								 | 
							
								        key["Description"].should.equal("t")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class TestKMSPolicyValidator:
							 | 
						||
| 
								 | 
							
								    def test_input_can_be_none(self):
							 | 
						||
| 
								 | 
							
								        validate_policy(None, None)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def test_key_can_have_no_policy(self):
							 | 
						||
| 
								 | 
							
								        validate_policy(self.create_key(policy=None), None)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def test_key_can_have_unreadable_policy(self):
							 | 
						||
| 
								 | 
							
								        validate_policy(self.create_key(policy="some policy"), None)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @pytest.mark.parametrize(
							 | 
						||
| 
								 | 
							
								        "action", ["kms:*", "kms:DescribeKey", "unknown", "", None]
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    def test_describe_key_is_allowed_for_actions(self, action):
							 | 
						||
| 
								 | 
							
								        policy = {
							 | 
						||
| 
								 | 
							
								            "Statement": [{"Effect": "Allow", "Action": [action], "Resource": "*"}]
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        key = self.create_key(policy=json.dumps(policy))
							 | 
						||
| 
								 | 
							
								        validate_policy(key, "kms:DescribeKey")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @pytest.mark.parametrize(
							 | 
						||
| 
								 | 
							
								        "action", ["kms:DeleteKey", "awslambda:DescribeKey", "unknown", "", None]
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    def test_describe_key_is_allowed_if_other_actions_are_allowed(self, action):
							 | 
						||
| 
								 | 
							
								        policy = {
							 | 
						||
| 
								 | 
							
								            "Statement": [{"Effect": "denY", "Action": [action], "Resource": "*"}]
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        key = self.create_key(policy=json.dumps(policy))
							 | 
						||
| 
								 | 
							
								        validate_policy(key, "kms:DescribeKey")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @pytest.mark.parametrize("action", ["kms:*", "kms:Describe*", "kms:DescribeKey"])
							 | 
						||
| 
								 | 
							
								    def test_describe_key_is_denied_for_all_possible_action_variations(self, action):
							 | 
						||
| 
								 | 
							
								        policy = {
							 | 
						||
| 
								 | 
							
								            "Statement": [{"Effect": "denY", "Action": [action], "Resource": "*"}]
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        key = self.create_key(policy=json.dumps(policy))
							 | 
						||
| 
								 | 
							
								        with pytest.raises(AccessDeniedException):
							 | 
						||
| 
								 | 
							
								            validate_policy(key, "kms:DescribeKey")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @pytest.mark.parametrize("resource", ["arn", "kms:*"])
							 | 
						||
| 
								 | 
							
								    def test_describe_key_is_allowed_for_unsupported_resources(self, resource):
							 | 
						||
| 
								 | 
							
								        policy = {
							 | 
						||
| 
								 | 
							
								            "Statement": [{"Effect": "denY", "Action": ["kms:*"], "Resource": resource}]
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        key = self.create_key(policy=json.dumps(policy))
							 | 
						||
| 
								 | 
							
								        validate_policy(key, "kms:DescribeKey")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def create_key(self, policy):
							 | 
						||
| 
								 | 
							
								        with mock.patch.object(rsa, "generate_private_key", return_value=""):
							 | 
						||
| 
								 | 
							
								            return Key(
							 | 
						||
| 
								 | 
							
								                account_id=None,
							 | 
						||
| 
								 | 
							
								                region=None,
							 | 
						||
| 
								 | 
							
								                description=None,
							 | 
						||
| 
								 | 
							
								                key_spec=None,
							 | 
						||
| 
								 | 
							
								                key_usage=None,
							 | 
						||
| 
								 | 
							
								                policy=policy,
							 | 
						||
| 
								 | 
							
								            )
							 |