Merge pull request #721 from rocky4570/kms-fixes
Add ARN and Alias functionality to KMS
This commit is contained in:
commit
b3b538bbef
@ -77,11 +77,19 @@ class KmsBackend(BaseBackend):
|
||||
return self.keys.pop(key_id)
|
||||
|
||||
def describe_key(self, key_id):
|
||||
return self.keys[key_id]
|
||||
# allow the different methods (alias, ARN :key/, keyId, ARN alias) to describe key not just KeyId
|
||||
key_id = self.get_key_id(key_id)
|
||||
if r'alias/' in str(key_id).lower():
|
||||
key_id = self.get_key_id_from_alias(key_id.split('alias/')[1])
|
||||
return self.keys[self.get_key_id(key_id)]
|
||||
|
||||
def list_keys(self):
|
||||
return self.keys.values()
|
||||
|
||||
def get_key_id(self, key_id):
|
||||
# Allow use of ARN as well as pure KeyId
|
||||
return str(key_id).split(r':key/')[1] if r':key/' in str(key_id).lower() else key_id
|
||||
|
||||
def alias_exists(self, alias_name):
|
||||
for aliases in self.key_to_aliases.values():
|
||||
if alias_name in aliases:
|
||||
@ -99,21 +107,26 @@ class KmsBackend(BaseBackend):
|
||||
def get_all_aliases(self):
|
||||
return self.key_to_aliases
|
||||
|
||||
def get_key_id_from_alias(self, alias_name):
|
||||
for key_id, aliases in dict(self.key_to_aliases).items():
|
||||
if alias_name in ",".join(aliases):
|
||||
return key_id
|
||||
return None
|
||||
|
||||
def enable_key_rotation(self, key_id):
|
||||
self.keys[key_id].key_rotation_status = True
|
||||
self.keys[self.get_key_id(key_id)].key_rotation_status = True
|
||||
|
||||
def disable_key_rotation(self, key_id):
|
||||
self.keys[key_id].key_rotation_status = False
|
||||
self.keys[self.get_key_id(key_id)].key_rotation_status = False
|
||||
|
||||
def get_key_rotation_status(self, key_id):
|
||||
return self.keys[key_id].key_rotation_status
|
||||
return self.keys[self.get_key_id(key_id)].key_rotation_status
|
||||
|
||||
def put_key_policy(self, key_id, policy):
|
||||
self.keys[key_id].policy = policy
|
||||
self.keys[self.get_key_id(key_id)].policy = policy
|
||||
|
||||
def get_key_policy(self, key_id):
|
||||
return self.keys[key_id].policy
|
||||
|
||||
return self.keys[self.get_key_id(key_id)].policy
|
||||
|
||||
kms_backends = {}
|
||||
for region in boto.kms.regions():
|
||||
|
@ -37,7 +37,7 @@ class KmsResponse(BaseResponse):
|
||||
def describe_key(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
try:
|
||||
key = self.kms_backend.describe_key(key_id)
|
||||
key = self.kms_backend.describe_key(self.kms_backend.get_key_id(key_id))
|
||||
except KeyError:
|
||||
headers = dict(self.headers)
|
||||
headers['status'] = 404
|
||||
@ -140,7 +140,7 @@ class KmsResponse(BaseResponse):
|
||||
|
||||
def enable_key_rotation(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
_assert_valid_key_id(key_id)
|
||||
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||
try:
|
||||
self.kms_backend.enable_key_rotation(key_id)
|
||||
except KeyError:
|
||||
@ -152,7 +152,7 @@ class KmsResponse(BaseResponse):
|
||||
|
||||
def disable_key_rotation(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
_assert_valid_key_id(key_id)
|
||||
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||
try:
|
||||
self.kms_backend.disable_key_rotation(key_id)
|
||||
except KeyError:
|
||||
@ -163,7 +163,7 @@ class KmsResponse(BaseResponse):
|
||||
|
||||
def get_key_rotation_status(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
_assert_valid_key_id(key_id)
|
||||
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||
try:
|
||||
rotation_enabled = self.kms_backend.get_key_rotation_status(key_id)
|
||||
except KeyError:
|
||||
@ -176,7 +176,7 @@ class KmsResponse(BaseResponse):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
policy_name = self.parameters.get('PolicyName')
|
||||
policy = self.parameters.get('Policy')
|
||||
_assert_valid_key_id(key_id)
|
||||
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||
_assert_default_policy(policy_name)
|
||||
|
||||
try:
|
||||
@ -191,7 +191,7 @@ class KmsResponse(BaseResponse):
|
||||
def get_key_policy(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
policy_name = self.parameters.get('PolicyName')
|
||||
_assert_valid_key_id(key_id)
|
||||
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||
_assert_default_policy(policy_name)
|
||||
|
||||
try:
|
||||
@ -203,7 +203,7 @@ class KmsResponse(BaseResponse):
|
||||
|
||||
def list_key_policies(self):
|
||||
key_id = self.parameters.get('KeyId')
|
||||
_assert_valid_key_id(key_id)
|
||||
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||
try:
|
||||
self.kms_backend.describe_key(key_id)
|
||||
except KeyError:
|
||||
|
@ -30,6 +30,39 @@ def test_describe_key():
|
||||
key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_key_via_alias():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
conn.create_alias(alias_name='alias/my-key-alias', target_key_id=key['KeyMetadata']['KeyId'])
|
||||
|
||||
alias_key = conn.describe_key('alias/my-key-alias')
|
||||
alias_key['KeyMetadata']['Description'].should.equal("my key")
|
||||
alias_key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT")
|
||||
alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn'])
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_key_via_alias_not_found():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
conn.create_alias(alias_name='alias/my-key-alias', target_key_id=key['KeyMetadata']['KeyId'])
|
||||
|
||||
conn.describe_key.when.called_with('alias/not-found-alias').should.throw(JSONResponseError)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_key_via_arn():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
arn = key['KeyMetadata']['Arn']
|
||||
|
||||
the_key = conn.describe_key(arn)
|
||||
the_key['KeyMetadata']['Description'].should.equal("my key")
|
||||
the_key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT")
|
||||
the_key['KeyMetadata']['KeyId'].should.equal(key['KeyMetadata']['KeyId'])
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_missing_key():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
@ -58,6 +91,18 @@ def test_enable_key_rotation():
|
||||
|
||||
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(True)
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_rotation_via_arn():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
key_id = key['KeyMetadata']['Arn']
|
||||
|
||||
conn.enable_key_rotation(key_id)
|
||||
|
||||
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(True)
|
||||
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_rotation_with_missing_key():
|
||||
@ -65,6 +110,18 @@ def test_enable_key_rotation_with_missing_key():
|
||||
conn.enable_key_rotation.when.called_with("not-a-key").should.throw(JSONResponseError)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_rotation_with_alias_name_should_fail():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
conn.create_alias(alias_name='alias/my-key-alias', target_key_id=key['KeyMetadata']['KeyId'])
|
||||
|
||||
alias_key = conn.describe_key('alias/my-key-alias')
|
||||
alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn'])
|
||||
|
||||
conn.enable_key_rotation.when.called_with('alias/my-alias').should.throw(JSONResponseError)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_disable_key_rotation():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
@ -121,6 +178,14 @@ def test_get_key_policy():
|
||||
policy = conn.get_key_policy(key_id, 'default')
|
||||
policy['Policy'].should.equal('my policy')
|
||||
|
||||
@mock_kms
|
||||
def test_get_key_policy_via_arn():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
policy = conn.get_key_policy(key['KeyMetadata']['Arn'], 'default')
|
||||
|
||||
policy['Policy'].should.equal('my policy')
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy():
|
||||
@ -134,6 +199,42 @@ def test_put_key_policy():
|
||||
policy['Policy'].should.equal('new policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy_via_arn():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
key_id = key['KeyMetadata']['Arn']
|
||||
|
||||
conn.put_key_policy(key_id, 'default', 'new policy')
|
||||
policy = conn.get_key_policy(key_id, 'default')
|
||||
policy['Policy'].should.equal('new policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy_via_alias_should_not_update():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
conn.create_alias(alias_name='alias/my-key-alias', target_key_id=key['KeyMetadata']['KeyId'])
|
||||
|
||||
conn.put_key_policy.when.called_with('alias/my-key-alias', 'default', 'new policy').should.throw(JSONResponseError)
|
||||
|
||||
policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default')
|
||||
policy['Policy'].should.equal('my policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
conn.put_key_policy(key['KeyMetadata']['Arn'], 'default', 'new policy')
|
||||
|
||||
policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default')
|
||||
policy['Policy'].should.equal('new policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_list_key_policies():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
Loading…
Reference in New Issue
Block a user