130 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			130 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|  | import boto3 | ||
|  | import json | ||
|  | import pytest | ||
|  | from botocore.exceptions import ClientError | ||
|  | from unittest import mock | ||
|  | 
 | ||
|  | from moto import mock_kms | ||
|  | from moto.kms.exceptions import AccessDeniedException | ||
|  | from moto.kms.models import Key | ||
|  | from moto.kms.policy_validator import validate_policy | ||
|  | from cryptography.hazmat.primitives.asymmetric import rsa | ||
|  | 
 | ||
|  | 
 | ||
|  | @mock_kms | ||
|  | class TestKMSPolicyEnforcement: | ||
|  |     def setup_method(self, *args) -> None:  # pylint: disable=unused-argument | ||
|  |         self.client = boto3.client("kms", "us-east-1") | ||
|  | 
 | ||
|  |         # The key-value is irrelevant, so let's mock the expensive cryptographic key-generation | ||
|  |         # Patching does not work in ServerMode, but at least decorator tests times are improved | ||
|  |         with mock.patch.object(rsa, "generate_private_key", return_value=""): | ||
|  |             new_key = self.client.create_key(Description="t", Origin="AWS_KMS") | ||
|  |         self.key_id = new_key["KeyMetadata"]["KeyId"] | ||
|  | 
 | ||
|  |     @pytest.mark.parametrize( | ||
|  |         "actions", | ||
|  |         [["kms:*"], ["kms:unknown", "kms:*"], ["kms:DescribeKey"], ["kms:Describe*"]], | ||
|  |     ) | ||
|  |     def test_policy__deny_based_on_actions(self, actions): | ||
|  |         policy = { | ||
|  |             "Version": "2012-10-17", | ||
|  |             "Id": "", | ||
|  |             "Statement": [ | ||
|  |                 { | ||
|  |                     "Sid": "test", | ||
|  |                     "Effect": "Deny", | ||
|  |                     "Principal": "*", | ||
|  |                     "Action": actions, | ||
|  |                     "Resource": "*", | ||
|  |                 } | ||
|  |             ], | ||
|  |         } | ||
|  |         self.client.put_key_policy( | ||
|  |             Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id | ||
|  |         ) | ||
|  |         with pytest.raises(ClientError) as exc: | ||
|  |             self.client.describe_key(KeyId=self.key_id) | ||
|  |         err = exc.value.response["Error"] | ||
|  |         err["Code"].should.equal("AccessDeniedException") | ||
|  | 
 | ||
|  |     @pytest.mark.parametrize("actions", [["kms:unknown"], ["kms:describestuff"]]) | ||
|  |     def test_policy__allow_based_on_actions(self, actions): | ||
|  |         policy = { | ||
|  |             "Version": "2012-10-17", | ||
|  |             "Id": "", | ||
|  |             "Statement": [ | ||
|  |                 { | ||
|  |                     "Sid": "test", | ||
|  |                     "Effect": "Deny", | ||
|  |                     "Principal": "*", | ||
|  |                     "Action": actions, | ||
|  |                     "Resource": "*", | ||
|  |                 } | ||
|  |             ], | ||
|  |         } | ||
|  |         self.client.put_key_policy( | ||
|  |             Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id | ||
|  |         ) | ||
|  |         key = self.client.describe_key(KeyId=self.key_id)["KeyMetadata"] | ||
|  |         key["Description"].should.equal("t") | ||
|  | 
 | ||
|  | 
 | ||
|  | class TestKMSPolicyValidator: | ||
|  |     def test_input_can_be_none(self): | ||
|  |         validate_policy(None, None) | ||
|  | 
 | ||
|  |     def test_key_can_have_no_policy(self): | ||
|  |         validate_policy(self.create_key(policy=None), None) | ||
|  | 
 | ||
|  |     def test_key_can_have_unreadable_policy(self): | ||
|  |         validate_policy(self.create_key(policy="some policy"), None) | ||
|  | 
 | ||
|  |     @pytest.mark.parametrize( | ||
|  |         "action", ["kms:*", "kms:DescribeKey", "unknown", "", None] | ||
|  |     ) | ||
|  |     def test_describe_key_is_allowed_for_actions(self, action): | ||
|  |         policy = { | ||
|  |             "Statement": [{"Effect": "Allow", "Action": [action], "Resource": "*"}] | ||
|  |         } | ||
|  |         key = self.create_key(policy=json.dumps(policy)) | ||
|  |         validate_policy(key, "kms:DescribeKey") | ||
|  | 
 | ||
|  |     @pytest.mark.parametrize( | ||
|  |         "action", ["kms:DeleteKey", "awslambda:DescribeKey", "unknown", "", None] | ||
|  |     ) | ||
|  |     def test_describe_key_is_allowed_if_other_actions_are_allowed(self, action): | ||
|  |         policy = { | ||
|  |             "Statement": [{"Effect": "denY", "Action": [action], "Resource": "*"}] | ||
|  |         } | ||
|  |         key = self.create_key(policy=json.dumps(policy)) | ||
|  |         validate_policy(key, "kms:DescribeKey") | ||
|  | 
 | ||
|  |     @pytest.mark.parametrize("action", ["kms:*", "kms:Describe*", "kms:DescribeKey"]) | ||
|  |     def test_describe_key_is_denied_for_all_possible_action_variations(self, action): | ||
|  |         policy = { | ||
|  |             "Statement": [{"Effect": "denY", "Action": [action], "Resource": "*"}] | ||
|  |         } | ||
|  |         key = self.create_key(policy=json.dumps(policy)) | ||
|  |         with pytest.raises(AccessDeniedException): | ||
|  |             validate_policy(key, "kms:DescribeKey") | ||
|  | 
 | ||
|  |     @pytest.mark.parametrize("resource", ["arn", "kms:*"]) | ||
|  |     def test_describe_key_is_allowed_for_unsupported_resources(self, resource): | ||
|  |         policy = { | ||
|  |             "Statement": [{"Effect": "denY", "Action": ["kms:*"], "Resource": resource}] | ||
|  |         } | ||
|  |         key = self.create_key(policy=json.dumps(policy)) | ||
|  |         validate_policy(key, "kms:DescribeKey") | ||
|  | 
 | ||
|  |     def create_key(self, policy): | ||
|  |         with mock.patch.object(rsa, "generate_private_key", return_value=""): | ||
|  |             return Key( | ||
|  |                 account_id=None, | ||
|  |                 region=None, | ||
|  |                 description=None, | ||
|  |                 key_spec=None, | ||
|  |                 key_usage=None, | ||
|  |                 policy=policy, | ||
|  |             ) |