Merge pull request #1856 from jrbeilke/feature-schedule_key_deletion

Add support for disable_key, enable_key, cancel_key_deletion, schedule_key_deletion to KMS
This commit is contained in:
Steve Pulec 2018-10-15 00:03:26 -04:00 committed by GitHub
commit 5568950394
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 183 additions and 2 deletions

View File

@ -2,8 +2,10 @@ from __future__ import unicode_literals
import boto.kms
from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_without_milliseconds
from .utils import generate_key_id
from collections import defaultdict
from datetime import datetime, timedelta
class Key(BaseModel):
@ -12,11 +14,13 @@ class Key(BaseModel):
self.id = generate_key_id()
self.policy = policy
self.key_usage = key_usage
self.key_state = "Enabled"
self.description = description
self.enabled = True
self.region = region
self.account_id = "0123456789012"
self.key_rotation_status = False
self.deletion_date = None
@property
def physical_resource_id(self):
@ -27,7 +31,7 @@ class Key(BaseModel):
return "arn:aws:kms:{0}:{1}:key/{2}".format(self.region, self.account_id, self.id)
def to_dict(self):
return {
key_dict = {
"KeyMetadata": {
"AWSAccountId": self.account_id,
"Arn": self.arn,
@ -36,8 +40,12 @@ class Key(BaseModel):
"Enabled": self.enabled,
"KeyId": self.id,
"KeyUsage": self.key_usage,
"KeyState": self.key_state,
}
}
if self.key_state == 'PendingDeletion':
key_dict['KeyMetadata']['DeletionDate'] = iso_8601_datetime_without_milliseconds(self.deletion_date)
return key_dict
def delete(self, region_name):
kms_backends[region_name].delete_key(self.id)
@ -138,6 +146,29 @@ class KmsBackend(BaseBackend):
def get_key_policy(self, key_id):
return self.keys[self.get_key_id(key_id)].policy
def disable_key(self, key_id):
if key_id in self.keys:
self.keys[key_id].enabled = False
self.keys[key_id].key_state = 'Disabled'
def enable_key(self, key_id):
if key_id in self.keys:
self.keys[key_id].enabled = True
self.keys[key_id].key_state = 'Enabled'
def cancel_key_deletion(self, key_id):
if key_id in self.keys:
self.keys[key_id].key_state = 'Disabled'
self.keys[key_id].deletion_date = None
def schedule_key_deletion(self, key_id, pending_window_in_days):
if key_id in self.keys:
if 7 <= pending_window_in_days <= 30:
self.keys[key_id].enabled = False
self.keys[key_id].key_state = 'PendingDeletion'
self.keys[key_id].deletion_date = datetime.now() + timedelta(days=pending_window_in_days)
return iso_8601_datetime_without_milliseconds(self.keys[key_id].deletion_date)
kms_backends = {}
for region in boto.kms.regions():

View File

@ -233,6 +233,56 @@ class KmsResponse(BaseResponse):
value = self.parameters.get("CiphertextBlob")
return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8")})
def disable_key(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
try:
self.kms_backend.disable_key(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps(None)
def enable_key(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
try:
self.kms_backend.enable_key(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps(None)
def cancel_key_deletion(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
try:
self.kms_backend.cancel_key_deletion(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps({'KeyId': key_id})
def schedule_key_deletion(self):
key_id = self.parameters.get('KeyId')
if self.parameters.get('PendingWindowInDays') is None:
pending_window_in_days = 30
else:
pending_window_in_days = self.parameters.get('PendingWindowInDays')
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
try:
return json.dumps({
'KeyId': key_id,
'DeletionDate': self.kms_backend.schedule_key_deletion(key_id, pending_window_in_days)
})
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id),
'__type': 'NotFoundException'})
def _assert_valid_key_id(key_id):
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):

View File

@ -1,5 +1,5 @@
from __future__ import unicode_literals
import re
import os, re
import boto3
import boto.kms
@ -8,6 +8,9 @@ from boto.kms.exceptions import AlreadyExistsException, NotFoundException
import sure # noqa
from moto import mock_kms, mock_kms_deprecated
from nose.tools import assert_raises
from freezegun import freeze_time
from datetime import datetime, timedelta
from dateutil.tz import tzlocal
@mock_kms_deprecated
@ -617,3 +620,100 @@ def test_kms_encrypt_boto3():
response = client.decrypt(CiphertextBlob=response['CiphertextBlob'])
response['Plaintext'].should.equal(b'bar')
@mock_kms
def test_disable_key():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='disable-key')
client.disable_key(
KeyId=key['KeyMetadata']['KeyId']
)
result = client.describe_key(KeyId=key['KeyMetadata']['KeyId'])
assert result["KeyMetadata"]["Enabled"] == False
assert result["KeyMetadata"]["KeyState"] == 'Disabled'
@mock_kms
def test_enable_key():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='enable-key')
client.disable_key(
KeyId=key['KeyMetadata']['KeyId']
)
client.enable_key(
KeyId=key['KeyMetadata']['KeyId']
)
result = client.describe_key(KeyId=key['KeyMetadata']['KeyId'])
assert result["KeyMetadata"]["Enabled"] == True
assert result["KeyMetadata"]["KeyState"] == 'Enabled'
@mock_kms
def test_schedule_key_deletion():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='schedule-key-deletion')
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'false':
with freeze_time("2015-01-01 12:00:00"):
response = client.schedule_key_deletion(
KeyId=key['KeyMetadata']['KeyId']
)
assert response['KeyId'] == key['KeyMetadata']['KeyId']
assert response['DeletionDate'] == datetime(2015, 1, 31, 12, 0, tzinfo=tzlocal())
else:
# Can't manipulate time in server mode
response = client.schedule_key_deletion(
KeyId=key['KeyMetadata']['KeyId']
)
assert response['KeyId'] == key['KeyMetadata']['KeyId']
result = client.describe_key(KeyId=key['KeyMetadata']['KeyId'])
assert result["KeyMetadata"]["Enabled"] == False
assert result["KeyMetadata"]["KeyState"] == 'PendingDeletion'
assert 'DeletionDate' in result["KeyMetadata"]
@mock_kms
def test_schedule_key_deletion_custom():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='schedule-key-deletion')
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'false':
with freeze_time("2015-01-01 12:00:00"):
response = client.schedule_key_deletion(
KeyId=key['KeyMetadata']['KeyId'],
PendingWindowInDays=7
)
assert response['KeyId'] == key['KeyMetadata']['KeyId']
assert response['DeletionDate'] == datetime(2015, 1, 8, 12, 0, tzinfo=tzlocal())
else:
# Can't manipulate time in server mode
response = client.schedule_key_deletion(
KeyId=key['KeyMetadata']['KeyId'],
PendingWindowInDays=7
)
assert response['KeyId'] == key['KeyMetadata']['KeyId']
result = client.describe_key(KeyId=key['KeyMetadata']['KeyId'])
assert result["KeyMetadata"]["Enabled"] == False
assert result["KeyMetadata"]["KeyState"] == 'PendingDeletion'
assert 'DeletionDate' in result["KeyMetadata"]
@mock_kms
def test_cancel_key_deletion():
client = boto3.client('kms', region_name='us-east-1')
key = client.create_key(Description='cancel-key-deletion')
client.schedule_key_deletion(
KeyId=key['KeyMetadata']['KeyId']
)
response = client.cancel_key_deletion(
KeyId=key['KeyMetadata']['KeyId']
)
assert response['KeyId'] == key['KeyMetadata']['KeyId']
result = client.describe_key(KeyId=key['KeyMetadata']['KeyId'])
assert result["KeyMetadata"]["Enabled"] == False
assert result["KeyMetadata"]["KeyState"] == 'Disabled'
assert 'DeletionDate' not in result["KeyMetadata"]