From 60ec840eef44c395e83342469ab81c0ca5a46daf Mon Sep 17 00:00:00 2001 From: Jon Beilke Date: Fri, 5 Oct 2018 15:55:47 -0500 Subject: [PATCH] add disable_key, enable_key, cancel_key_deletion, and schedule_key_deletion actions to KMS endpoint --- moto/kms/models.py | 30 +++++++++++++- moto/kms/responses.py | 46 ++++++++++++++++++++++ tests/test_kms/test_kms.py | 80 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 1 deletion(-) diff --git a/moto/kms/models.py b/moto/kms/models.py index 89ebf0082..30a01d366 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -12,11 +12,13 @@ class Key(BaseModel): self.id = generate_key_id() self.policy = policy self.key_usage = key_usage + self.key_state = "Enabled" self.description = description self.enabled = True self.region = region self.account_id = "0123456789012" self.key_rotation_status = False + self.deletion_date = None @property def physical_resource_id(self): @@ -27,7 +29,7 @@ class Key(BaseModel): return "arn:aws:kms:{0}:{1}:key/{2}".format(self.region, self.account_id, self.id) def to_dict(self): - return { + key_dict = { "KeyMetadata": { "AWSAccountId": self.account_id, "Arn": self.arn, @@ -36,8 +38,11 @@ class Key(BaseModel): "Enabled": self.enabled, "KeyId": self.id, "KeyUsage": self.key_usage, + "KeyState": self.key_state, } } + key_dict['KeyMetadata']['DeletionDate'] = self.deletion_date if self.key_state == 'PendingDeletion' + return key_dict def delete(self, region_name): kms_backends[region_name].delete_key(self.id) @@ -138,6 +143,29 @@ class KmsBackend(BaseBackend): def get_key_policy(self, key_id): return self.keys[self.get_key_id(key_id)].policy + def disable_key(self, key_id): + if key_id in self.keys: + self.keys[key_id].enabled = False + self.keys[key_id].key_state = 'Disabled' + + def enable_key(self, key_id): + if key_id in self.keys: + self.keys[key_id].enabled = True + self.keys[key_id].key_state = 'Enabled' + + def cancel_key_deletion(self, key_id): + if key_id in self.keys: + self.keys[key_id].key_state = 'Disabled' + self.keys[key_id].deletion_date = None + + def schedule_key_deletion(self, key_id, pending_window_in_days=30): + if key_id in self.keys: + if 7 <= pending_window_in_days <= 30: + self.keys[key_id].enabled = False + self.keys[key_id].key_state = 'PendingDeletion' + self.keys[key_id].deletion_date = datetime.now() + timedelta(days=pending_window_in_days) + return self.keys[key_id].deletion_date + kms_backends = {} for region in boto.kms.regions(): diff --git a/moto/kms/responses.py b/moto/kms/responses.py index 0f544e954..e782d862e 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -233,6 +233,52 @@ class KmsResponse(BaseResponse): value = self.parameters.get("CiphertextBlob") return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8")}) + def disable_key(self): + key_id = self.parameters.get('KeyId') + _assert_valid_key_id(self.kms_backend.get_key_id(key_id)) + try: + self.kms_backend.disable_key(key_id) + except KeyError: + raise JSONResponseError(404, 'Not Found', body={ + 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), + '__type': 'NotFoundException'}) + return json.dumps(None) + + def enable_key(self): + key_id = self.parameters.get('KeyId') + _assert_valid_key_id(self.kms_backend.get_key_id(key_id)) + try: + self.kms_backend.enable_key(key_id) + except KeyError: + raise JSONResponseError(404, 'Not Found', body={ + 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), + '__type': 'NotFoundException'}) + return json.dumps(None) + + def cancel_key_deletion(self): + key_id = self.parameters.get('KeyId') + _assert_valid_key_id(self.kms_backend.get_key_id(key_id)) + try: + self.kms_backend.cancel_key_deletion(key_id) + except KeyError: + raise JSONResponseError(404, 'Not Found', body={ + 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), + '__type': 'NotFoundException'}) + return json.dumps({'KeyId': key_id}) + + def schedule_key_deletion(self): + key_id = self.parameters.get('KeyId') + _assert_valid_key_id(self.kms_backend.get_key_id(key_id)) + try: + return json.dumps({ + 'KeyId': key_id, + 'DeletionDate': self.kms_backend.schedule_key_deletion(key_id) + }) + except KeyError: + raise JSONResponseError(404, 'Not Found', body={ + 'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region, key_id=key_id), + '__type': 'NotFoundException'}) + def _assert_valid_key_id(key_id): if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE): diff --git a/tests/test_kms/test_kms.py b/tests/test_kms/test_kms.py index 96715de71..9779f02a6 100644 --- a/tests/test_kms/test_kms.py +++ b/tests/test_kms/test_kms.py @@ -617,3 +617,83 @@ def test_kms_encrypt_boto3(): response = client.decrypt(CiphertextBlob=response['CiphertextBlob']) response['Plaintext'].should.equal(b'bar') + + +@mock_kms +def test_disable_key(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(description='disable-key') + client.disable_key( + KeyId=key['KeyMetadata']['KeyId'] + ) + + result = client.describe_key(KeyId='disable-key') + assert result["KeyMetadata"]["Enabled"] == False + assert result["KeyMetadata"]["KeyState"] == 'Disabled' + + +@mock_kms +def test_enable_key(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(description='enable-key') + client.disable_key( + KeyId=key['KeyMetadata']['KeyId'] + ) + client.enable_key( + KeyId=key['KeyMetadata']['KeyId'] + ) + + result = client.describe_key(KeyId='enable-key') + assert result["KeyMetadata"]["Enabled"] == True + assert result["KeyMetadata"]["KeyState"] == 'Enabled' + + +@mock_kms +def test_schedule_key_deletion(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(description='schedule-key-deletion') + response = client.schedule_key_deletion( + KeyId=key['KeyMetadata']['KeyId'] + ) + assert response['KeyId'] == 'schedule-key-deletion' + assert response['DeletionDate'] == datetime.now() + timedelta(days=30) + + result = client.describe_key(KeyId='schedule-key-deletion') + assert result["KeyMetadata"]["Enabled"] == False + assert result["KeyMetadata"]["KeyState"] == 'PendingDeletion' + assert 'DeletionDate' in result["KeyMetadata"] + + +@mock_kms +def test_schedule_key_deletion_custom(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(description='schedule-key-deletion') + response = client.schedule_key_deletion( + KeyId=key['KeyMetadata']['KeyId'], + PendingWindowInDays=7 + ) + assert response['KeyId'] == 'schedule-key-deletion' + assert response['DeletionDate'] == datetime.now() + timedelta(days=7) + + result = client.describe_key(KeyId='schedule-key-deletion') + assert result["KeyMetadata"]["Enabled"] == False + assert result["KeyMetadata"]["KeyState"] == 'PendingDeletion' + assert 'DeletionDate' in result["KeyMetadata"] + + +@mock_kms +def test_cancel_key_deletion(): + client = boto3.client('kms', region_name='us-east-1') + key = client.create_key(description='cancel-key-deletion') + client.schedule_key_deletion( + KeyId=key['KeyMetadata']['KeyId'] + ) + response = client.cancel_key_deletion( + KeyId=key['KeyMetadata']['KeyId'] + ) + assert response['KeyId'] == 'cancel-key-deletion' + + result = client.describe_key(KeyId='cancel-key-deletion') + assert result["KeyMetadata"]["Enabled"] == False + assert result["KeyMetadata"]["KeyState"] == 'Disabled' + assert 'DeletionDate' not in result["KeyMetadata"]