add disable_key, enable_key, cancel_key_deletion, and schedule_key_deletion actions to KMS endpoint
This commit is contained in:
parent
fe9b312fd2
commit
60ec840eef
@ -12,11 +12,13 @@ class Key(BaseModel):
|
|||||||
self.id = generate_key_id()
|
self.id = generate_key_id()
|
||||||
self.policy = policy
|
self.policy = policy
|
||||||
self.key_usage = key_usage
|
self.key_usage = key_usage
|
||||||
|
self.key_state = "Enabled"
|
||||||
self.description = description
|
self.description = description
|
||||||
self.enabled = True
|
self.enabled = True
|
||||||
self.region = region
|
self.region = region
|
||||||
self.account_id = "0123456789012"
|
self.account_id = "0123456789012"
|
||||||
self.key_rotation_status = False
|
self.key_rotation_status = False
|
||||||
|
self.deletion_date = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def physical_resource_id(self):
|
def physical_resource_id(self):
|
||||||
@ -27,7 +29,7 @@ class Key(BaseModel):
|
|||||||
return "arn:aws:kms:{0}:{1}:key/{2}".format(self.region, self.account_id, self.id)
|
return "arn:aws:kms:{0}:{1}:key/{2}".format(self.region, self.account_id, self.id)
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
return {
|
key_dict = {
|
||||||
"KeyMetadata": {
|
"KeyMetadata": {
|
||||||
"AWSAccountId": self.account_id,
|
"AWSAccountId": self.account_id,
|
||||||
"Arn": self.arn,
|
"Arn": self.arn,
|
||||||
@ -36,8 +38,11 @@ class Key(BaseModel):
|
|||||||
"Enabled": self.enabled,
|
"Enabled": self.enabled,
|
||||||
"KeyId": self.id,
|
"KeyId": self.id,
|
||||||
"KeyUsage": self.key_usage,
|
"KeyUsage": self.key_usage,
|
||||||
|
"KeyState": self.key_state,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
key_dict['KeyMetadata']['DeletionDate'] = self.deletion_date if self.key_state == 'PendingDeletion'
|
||||||
|
return key_dict
|
||||||
|
|
||||||
def delete(self, region_name):
|
def delete(self, region_name):
|
||||||
kms_backends[region_name].delete_key(self.id)
|
kms_backends[region_name].delete_key(self.id)
|
||||||
@ -138,6 +143,29 @@ class KmsBackend(BaseBackend):
|
|||||||
def get_key_policy(self, key_id):
|
def get_key_policy(self, key_id):
|
||||||
return self.keys[self.get_key_id(key_id)].policy
|
return self.keys[self.get_key_id(key_id)].policy
|
||||||
|
|
||||||
|
def disable_key(self, key_id):
|
||||||
|
if key_id in self.keys:
|
||||||
|
self.keys[key_id].enabled = False
|
||||||
|
self.keys[key_id].key_state = 'Disabled'
|
||||||
|
|
||||||
|
def enable_key(self, key_id):
|
||||||
|
if key_id in self.keys:
|
||||||
|
self.keys[key_id].enabled = True
|
||||||
|
self.keys[key_id].key_state = 'Enabled'
|
||||||
|
|
||||||
|
def cancel_key_deletion(self, key_id):
|
||||||
|
if key_id in self.keys:
|
||||||
|
self.keys[key_id].key_state = 'Disabled'
|
||||||
|
self.keys[key_id].deletion_date = None
|
||||||
|
|
||||||
|
def schedule_key_deletion(self, key_id, pending_window_in_days=30):
|
||||||
|
if key_id in self.keys:
|
||||||
|
if 7 <= pending_window_in_days <= 30:
|
||||||
|
self.keys[key_id].enabled = False
|
||||||
|
self.keys[key_id].key_state = 'PendingDeletion'
|
||||||
|
self.keys[key_id].deletion_date = datetime.now() + timedelta(days=pending_window_in_days)
|
||||||
|
return self.keys[key_id].deletion_date
|
||||||
|
|
||||||
|
|
||||||
kms_backends = {}
|
kms_backends = {}
|
||||||
for region in boto.kms.regions():
|
for region in boto.kms.regions():
|
||||||
|
@ -233,6 +233,52 @@ class KmsResponse(BaseResponse):
|
|||||||
value = self.parameters.get("CiphertextBlob")
|
value = self.parameters.get("CiphertextBlob")
|
||||||
return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8")})
|
return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8")})
|
||||||
|
|
||||||
|
def disable_key(self):
|
||||||
|
key_id = self.parameters.get('KeyId')
|
||||||
|
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||||
|
try:
|
||||||
|
self.kms_backend.disable_key(key_id)
|
||||||
|
except KeyError:
|
||||||
|
raise JSONResponseError(404, 'Not Found', body={
|
||||||
|
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id),
|
||||||
|
'__type': 'NotFoundException'})
|
||||||
|
return json.dumps(None)
|
||||||
|
|
||||||
|
def enable_key(self):
|
||||||
|
key_id = self.parameters.get('KeyId')
|
||||||
|
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||||
|
try:
|
||||||
|
self.kms_backend.enable_key(key_id)
|
||||||
|
except KeyError:
|
||||||
|
raise JSONResponseError(404, 'Not Found', body={
|
||||||
|
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id),
|
||||||
|
'__type': 'NotFoundException'})
|
||||||
|
return json.dumps(None)
|
||||||
|
|
||||||
|
def cancel_key_deletion(self):
|
||||||
|
key_id = self.parameters.get('KeyId')
|
||||||
|
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||||
|
try:
|
||||||
|
self.kms_backend.cancel_key_deletion(key_id)
|
||||||
|
except KeyError:
|
||||||
|
raise JSONResponseError(404, 'Not Found', body={
|
||||||
|
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id),
|
||||||
|
'__type': 'NotFoundException'})
|
||||||
|
return json.dumps({'KeyId': key_id})
|
||||||
|
|
||||||
|
def schedule_key_deletion(self):
|
||||||
|
key_id = self.parameters.get('KeyId')
|
||||||
|
_assert_valid_key_id(self.kms_backend.get_key_id(key_id))
|
||||||
|
try:
|
||||||
|
return json.dumps({
|
||||||
|
'KeyId': key_id,
|
||||||
|
'DeletionDate': self.kms_backend.schedule_key_deletion(key_id)
|
||||||
|
})
|
||||||
|
except KeyError:
|
||||||
|
raise JSONResponseError(404, 'Not Found', body={
|
||||||
|
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id),
|
||||||
|
'__type': 'NotFoundException'})
|
||||||
|
|
||||||
|
|
||||||
def _assert_valid_key_id(key_id):
|
def _assert_valid_key_id(key_id):
|
||||||
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
|
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
|
||||||
|
@ -617,3 +617,83 @@ def test_kms_encrypt_boto3():
|
|||||||
|
|
||||||
response = client.decrypt(CiphertextBlob=response['CiphertextBlob'])
|
response = client.decrypt(CiphertextBlob=response['CiphertextBlob'])
|
||||||
response['Plaintext'].should.equal(b'bar')
|
response['Plaintext'].should.equal(b'bar')
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
def test_disable_key():
|
||||||
|
client = boto3.client('kms', region_name='us-east-1')
|
||||||
|
key = client.create_key(description='disable-key')
|
||||||
|
client.disable_key(
|
||||||
|
KeyId=key['KeyMetadata']['KeyId']
|
||||||
|
)
|
||||||
|
|
||||||
|
result = client.describe_key(KeyId='disable-key')
|
||||||
|
assert result["KeyMetadata"]["Enabled"] == False
|
||||||
|
assert result["KeyMetadata"]["KeyState"] == 'Disabled'
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
def test_enable_key():
|
||||||
|
client = boto3.client('kms', region_name='us-east-1')
|
||||||
|
key = client.create_key(description='enable-key')
|
||||||
|
client.disable_key(
|
||||||
|
KeyId=key['KeyMetadata']['KeyId']
|
||||||
|
)
|
||||||
|
client.enable_key(
|
||||||
|
KeyId=key['KeyMetadata']['KeyId']
|
||||||
|
)
|
||||||
|
|
||||||
|
result = client.describe_key(KeyId='enable-key')
|
||||||
|
assert result["KeyMetadata"]["Enabled"] == True
|
||||||
|
assert result["KeyMetadata"]["KeyState"] == 'Enabled'
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
def test_schedule_key_deletion():
|
||||||
|
client = boto3.client('kms', region_name='us-east-1')
|
||||||
|
key = client.create_key(description='schedule-key-deletion')
|
||||||
|
response = client.schedule_key_deletion(
|
||||||
|
KeyId=key['KeyMetadata']['KeyId']
|
||||||
|
)
|
||||||
|
assert response['KeyId'] == 'schedule-key-deletion'
|
||||||
|
assert response['DeletionDate'] == datetime.now() + timedelta(days=30)
|
||||||
|
|
||||||
|
result = client.describe_key(KeyId='schedule-key-deletion')
|
||||||
|
assert result["KeyMetadata"]["Enabled"] == False
|
||||||
|
assert result["KeyMetadata"]["KeyState"] == 'PendingDeletion'
|
||||||
|
assert 'DeletionDate' in result["KeyMetadata"]
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
def test_schedule_key_deletion_custom():
|
||||||
|
client = boto3.client('kms', region_name='us-east-1')
|
||||||
|
key = client.create_key(description='schedule-key-deletion')
|
||||||
|
response = client.schedule_key_deletion(
|
||||||
|
KeyId=key['KeyMetadata']['KeyId'],
|
||||||
|
PendingWindowInDays=7
|
||||||
|
)
|
||||||
|
assert response['KeyId'] == 'schedule-key-deletion'
|
||||||
|
assert response['DeletionDate'] == datetime.now() + timedelta(days=7)
|
||||||
|
|
||||||
|
result = client.describe_key(KeyId='schedule-key-deletion')
|
||||||
|
assert result["KeyMetadata"]["Enabled"] == False
|
||||||
|
assert result["KeyMetadata"]["KeyState"] == 'PendingDeletion'
|
||||||
|
assert 'DeletionDate' in result["KeyMetadata"]
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
def test_cancel_key_deletion():
|
||||||
|
client = boto3.client('kms', region_name='us-east-1')
|
||||||
|
key = client.create_key(description='cancel-key-deletion')
|
||||||
|
client.schedule_key_deletion(
|
||||||
|
KeyId=key['KeyMetadata']['KeyId']
|
||||||
|
)
|
||||||
|
response = client.cancel_key_deletion(
|
||||||
|
KeyId=key['KeyMetadata']['KeyId']
|
||||||
|
)
|
||||||
|
assert response['KeyId'] == 'cancel-key-deletion'
|
||||||
|
|
||||||
|
result = client.describe_key(KeyId='cancel-key-deletion')
|
||||||
|
assert result["KeyMetadata"]["Enabled"] == False
|
||||||
|
assert result["KeyMetadata"]["KeyState"] == 'Disabled'
|
||||||
|
assert 'DeletionDate' not in result["KeyMetadata"]
|
||||||
|
Loading…
Reference in New Issue
Block a user