Merge pull request #473 from jszwedko/add-kms-policy-actions
Add KMS policy actions
This commit is contained in:
commit
1bca6f2849
@ -78,6 +78,12 @@ class KmsBackend(BaseBackend):
|
||||
def get_key_rotation_status(self, key_id):
|
||||
return self.keys[key_id].key_rotation_status
|
||||
|
||||
def put_key_policy(self, key_id, policy):
|
||||
self.keys[key_id].policy = policy
|
||||
|
||||
def get_key_policy(self, key_id):
|
||||
return self.keys[key_id].policy
|
||||
|
||||
|
||||
kms_backends = {}
|
||||
for region in boto.kms.regions():
|
||||
|
@ -172,6 +172,53 @@ class KmsResponse(BaseResponse):
|
||||
'__type': 'NotFoundException'})
|
||||
return json.dumps({'KeyRotationEnabled': rotation_enabled})
|
||||
|
||||
def put_key_policy(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
policy_name = self.parameters.get('PolicyName')
|
||||
policy = self.parameters.get('Policy')
|
||||
_assert_valid_key_id(key_id)
|
||||
_assert_default_policy(policy_name)
|
||||
|
||||
try:
|
||||
self.kms_backend.put_key_policy(key_id, policy)
|
||||
except KeyError:
|
||||
raise JSONResponseError(404, 'Not Found', body={
|
||||
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
|
||||
'__type': 'NotFoundException'})
|
||||
|
||||
return json.dumps(None)
|
||||
|
||||
def get_key_policy(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
policy_name = self.parameters.get('PolicyName')
|
||||
_assert_valid_key_id(key_id)
|
||||
_assert_default_policy(policy_name)
|
||||
|
||||
try:
|
||||
return json.dumps({'Policy': self.kms_backend.get_key_policy(key_id)})
|
||||
except KeyError:
|
||||
raise JSONResponseError(404, 'Not Found', body={
|
||||
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
|
||||
'__type': 'NotFoundException'})
|
||||
|
||||
def list_key_policies(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
_assert_valid_key_id(key_id)
|
||||
try:
|
||||
self.kms_backend.describe_key(key_id)
|
||||
except KeyError:
|
||||
raise JSONResponseError(404, 'Not Found', body={
|
||||
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
|
||||
'__type': 'NotFoundException'})
|
||||
|
||||
return json.dumps({'Truncated': False, 'PolicyNames': ['default']})
|
||||
|
||||
def _assert_valid_key_id(key_id):
|
||||
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
|
||||
raise JSONResponseError(404, 'Not Found', body={'message': ' Invalid keyId', '__type': 'NotFoundException'})
|
||||
|
||||
def _assert_default_policy(policy_name):
|
||||
if policy_name != 'default':
|
||||
raise JSONResponseError(404, 'Not Found', body={
|
||||
'message': "No such policy exists",
|
||||
'__type': 'NotFoundException'})
|
||||
|
@ -111,6 +111,40 @@ def test_create_key_defaults_key_rotation():
|
||||
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(False)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_get_key_policy():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
key_id = key['KeyMetadata']['KeyId']
|
||||
|
||||
policy = conn.get_key_policy(key_id, 'default')
|
||||
policy['Policy'].should.equal('my policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
key_id = key['KeyMetadata']['KeyId']
|
||||
|
||||
conn.put_key_policy(key_id, 'default', 'new policy')
|
||||
policy = conn.get_key_policy(key_id, 'default')
|
||||
policy['Policy'].should.equal('new policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_list_key_policies():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
key_id = key['KeyMetadata']['KeyId']
|
||||
|
||||
policies = conn.list_key_policies(key_id)
|
||||
policies['PolicyNames'].should.equal(['default'])
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test__create_alias__returns_none_if_correct():
|
||||
kms = boto.connect_kms()
|
||||
@ -386,3 +420,11 @@ def test__assert_valid_key_id():
|
||||
|
||||
_assert_valid_key_id.when.called_with("not-a-key").should.throw(JSONResponseError)
|
||||
_assert_valid_key_id.when.called_with(str(uuid.uuid4())).should_not.throw(JSONResponseError)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test__assert_default_policy():
|
||||
from moto.kms.responses import _assert_default_policy
|
||||
|
||||
_assert_default_policy.when.called_with("not-default").should.throw(JSONResponseError)
|
||||
_assert_default_policy.when.called_with("default").should_not.throw(JSONResponseError)
|
||||
|
Loading…
Reference in New Issue
Block a user