Merge pull request #464 from jszwedko/add-kms-key-rotation-endpoints

Add support for KMS key rotation operations
This commit is contained in:
Steve Pulec 2015-11-26 09:36:26 -05:00
commit 835078790c
3 changed files with 122 additions and 0 deletions

View File

@ -15,6 +15,7 @@ class Key(object):
self.enabled = True
self.region = region
self.account_id = "0123456789012"
self.key_rotation_status = False
@property
def arn(self):
@ -68,6 +69,16 @@ class KmsBackend(BaseBackend):
def get_all_aliases(self):
return self.key_to_aliases
def enable_key_rotation(self, key_id):
self.keys[key_id].key_rotation_status = True
def disable_key_rotation(self, key_id):
self.keys[key_id].key_rotation_status = False
def get_key_rotation_status(self, key_id):
return self.keys[key_id].key_rotation_status
kms_backends = {}
for region in boto.kms.regions():
kms_backends[region.name] = KmsBackend()

View File

@ -136,3 +136,41 @@ class KmsResponse(BaseResponse):
'Truncated': False,
'Aliases': response_aliases,
})
def enable_key_rotation(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(key_id)
try:
self.kms_backend.enable_key_rotation(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps(None)
def disable_key_rotation(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(key_id)
try:
self.kms_backend.disable_key_rotation(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps(None)
def get_key_rotation_status(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(key_id)
try:
rotation_enabled = self.kms_backend.get_key_rotation_status(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps({'KeyRotationEnabled': rotation_enabled})
def _assert_valid_key_id(key_id):
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
raise JSONResponseError(404, 'Not Found', body={'message': ' Invalid keyId', '__type': 'NotFoundException'})

View File

@ -47,6 +47,70 @@ def test_list_keys():
keys['Keys'].should.have.length_of(2)
@mock_kms
def test_enable_key_rotation():
conn = boto.kms.connect_to_region("us-west-2")
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
key_id = key['KeyMetadata']['KeyId']
conn.enable_key_rotation(key_id)
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(True)
@mock_kms
def test_enable_key_rotation_with_missing_key():
conn = boto.kms.connect_to_region("us-west-2")
conn.enable_key_rotation.when.called_with("not-a-key").should.throw(JSONResponseError)
@mock_kms
def test_disable_key_rotation():
conn = boto.kms.connect_to_region("us-west-2")
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
key_id = key['KeyMetadata']['KeyId']
conn.enable_key_rotation(key_id)
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(True)
conn.disable_key_rotation(key_id)
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(False)
@mock_kms
def test_disable_key_rotation_with_missing_key():
conn = boto.kms.connect_to_region("us-west-2")
conn.disable_key_rotation.when.called_with("not-a-key").should.throw(JSONResponseError)
@mock_kms
def test_get_key_rotation_status_with_missing_key():
conn = boto.kms.connect_to_region("us-west-2")
conn.get_key_rotation_status.when.called_with("not-a-key").should.throw(JSONResponseError)
@mock_kms
def test_get_key_rotation_status():
conn = boto.kms.connect_to_region("us-west-2")
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
key_id = key['KeyMetadata']['KeyId']
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(False)
@mock_kms
def test_create_key_defaults_key_rotation():
conn = boto.kms.connect_to_region("us-west-2")
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
key_id = key['KeyMetadata']['KeyId']
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(False)
@mock_kms
def test__create_alias__returns_none_if_correct():
kms = boto.connect_kms()
@ -313,3 +377,12 @@ def test__list_aliases():
len([alias for alias in aliases if 'TargetKeyId' in alias and key_id == alias['TargetKeyId']]).should.equal(3)
len(aliases).should.equal(7)
@mock_kms
def test__assert_valid_key_id():
from moto.kms.responses import _assert_valid_key_id
import uuid
_assert_valid_key_id.when.called_with("not-a-key").should.throw(JSONResponseError)
_assert_valid_key_id.when.called_with(str(uuid.uuid4())).should_not.throw(JSONResponseError)