From 136f622b3b497b754f0e4a3bd7cf481bb03cda36 Mon Sep 17 00:00:00 2001 From: Andrew Garrett Date: Wed, 25 Nov 2015 19:50:55 +0000 Subject: [PATCH] Add KMS policy actions Adds the following to the KMS service * PutKeyPolicy * GetKeyPolicy * ListKeyPolicies Signed-off-by: Jesse Szwedko --- moto/kms/models.py | 6 +++++ moto/kms/responses.py | 47 ++++++++++++++++++++++++++++++++++++++ tests/test_kms/test_kms.py | 42 ++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+) diff --git a/moto/kms/models.py b/moto/kms/models.py index ec67759d2..1047fe71e 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -78,6 +78,12 @@ class KmsBackend(BaseBackend): def get_key_rotation_status(self, key_id): return self.keys[key_id].key_rotation_status + def put_key_policy(self, key_id, policy): + self.keys[key_id].policy = policy + + def get_key_policy(self, key_id): + return self.keys[key_id].policy + kms_backends = {} for region in boto.kms.regions(): diff --git a/moto/kms/responses.py b/moto/kms/responses.py index 196a6b851..d0ecc8863 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -171,6 +171,53 @@ class KmsResponse(BaseResponse): '__type': 'NotFoundException'}) return json.dumps({'KeyRotationEnabled': rotation_enabled}) + def put_key_policy(self): + key_id = self.parameters.get('KeyId') + policy_name = self.parameters.get('PolicyName') + policy = self.parameters.get('Policy') + _assert_valid_key_id(key_id) + _assert_default_policy(policy_name) + + try: + self.kms_backend.put_key_policy(key_id, policy) + except KeyError: + raise JSONResponseError(404, 'Not Found', body={ + 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id), + '__type': 'NotFoundException'}) + + return json.dumps(None) + + def get_key_policy(self): + key_id = self.parameters.get('KeyId') + policy_name = self.parameters.get('PolicyName') + _assert_valid_key_id(key_id) + _assert_default_policy(policy_name) + + try: + return json.dumps({'Policy': self.kms_backend.get_key_policy(key_id)}) + except KeyError: + raise JSONResponseError(404, 'Not Found', body={ + 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id), + '__type': 'NotFoundException'}) + + def list_key_policies(self): + key_id = self.parameters.get('KeyId') + _assert_valid_key_id(key_id) + try: + self.kms_backend.describe_key(key_id) + except KeyError: + raise JSONResponseError(404, 'Not Found', body={ + 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id), + '__type': 'NotFoundException'}) + + return json.dumps({'Truncated': False, 'PolicyNames': ['default']}) + def _assert_valid_key_id(key_id): if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE): raise JSONResponseError(404, 'Not Found', body={'message': ' Invalid keyId', '__type': 'NotFoundException'}) + +def _assert_default_policy(policy_name): + if policy_name != 'default': + raise JSONResponseError(404, 'Not Found', body={ + 'message': "No such policy exists", + '__type': 'NotFoundException'}) diff --git a/tests/test_kms/test_kms.py b/tests/test_kms/test_kms.py index 5453ccb92..fe1394434 100644 --- a/tests/test_kms/test_kms.py +++ b/tests/test_kms/test_kms.py @@ -111,6 +111,40 @@ def test_create_key_defaults_key_rotation(): conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(False) +@mock_kms +def test_get_key_policy(): + conn = boto.kms.connect_to_region('us-west-2') + + key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT') + key_id = key['KeyMetadata']['KeyId'] + + policy = conn.get_key_policy(key_id, 'default') + policy['Policy'].should.equal('my policy') + + +@mock_kms +def test_put_key_policy(): + conn = boto.kms.connect_to_region('us-west-2') + + key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT') + key_id = key['KeyMetadata']['KeyId'] + + conn.put_key_policy(key_id, 'default', 'new policy') + policy = conn.get_key_policy(key_id, 'default') + policy['Policy'].should.equal('new policy') + + +@mock_kms +def test_list_key_policies(): + conn = boto.kms.connect_to_region('us-west-2') + + key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT') + key_id = key['KeyMetadata']['KeyId'] + + policies = conn.list_key_policies(key_id) + policies['PolicyNames'].should.equal(['default']) + + @mock_kms def test__create_alias__returns_none_if_correct(): kms = boto.connect_kms() @@ -386,3 +420,11 @@ def test__assert_valid_key_id(): _assert_valid_key_id.when.called_with("not-a-key").should.throw(JSONResponseError) _assert_valid_key_id.when.called_with(str(uuid.uuid4())).should_not.throw(JSONResponseError) + + +@mock_kms +def test__assert_default_policy(): + from moto.kms.responses import _assert_default_policy + + _assert_default_policy.when.called_with("not-default").should.throw(JSONResponseError) + _assert_default_policy.when.called_with("default").should_not.throw(JSONResponseError)