Allow soft deletion of secrets

This commit is contained in:
Chris Kilding 2019-04-18 12:58:50 +01:00
parent fc8cf2d872
commit 749f4f63e6
3 changed files with 110 additions and 25 deletions

View File

@ -27,3 +27,10 @@ class InvalidParameterException(SecretsManagerClientError):
super(InvalidParameterException, self).__init__( super(InvalidParameterException, self).__init__(
'InvalidParameterException', 'InvalidParameterException',
message) message)
class InvalidRequestException(SecretsManagerClientError):
def __init__(self, message):
super(InvalidRequestException, self).__init__(
'InvalidRequestException',
message)

View File

@ -3,6 +3,7 @@ from __future__ import unicode_literals
import time import time
import json import json
import uuid import uuid
import datetime
import boto3 import boto3
@ -10,6 +11,7 @@ from moto.core import BaseBackend, BaseModel
from .exceptions import ( from .exceptions import (
ResourceNotFoundException, ResourceNotFoundException,
InvalidParameterException, InvalidParameterException,
InvalidRequestException,
ClientError ClientError
) )
from .utils import random_password, secret_arn from .utils import random_password, secret_arn
@ -36,11 +38,21 @@ class SecretsManagerBackend(BaseBackend):
def _is_valid_identifier(self, identifier): def _is_valid_identifier(self, identifier):
return identifier in self.secrets return identifier in self.secrets
def _unix_time_secs(self, dt):
epoch = datetime.datetime.utcfromtimestamp(0)
return (dt - epoch).total_seconds()
def get_secret_value(self, secret_id, version_id, version_stage): def get_secret_value(self, secret_id, version_id, version_stage):
if not self._is_valid_identifier(secret_id): if not self._is_valid_identifier(secret_id):
raise ResourceNotFoundException() raise ResourceNotFoundException()
if 'deleted_date' in self.secrets[secret_id]:
raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \
perform the operation on a secret that's currently marked deleted."
)
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
response = json.dumps({ response = json.dumps({
@ -101,7 +113,7 @@ class SecretsManagerBackend(BaseBackend):
"LastRotatedDate": None, "LastRotatedDate": None,
"LastChangedDate": None, "LastChangedDate": None,
"LastAccessedDate": None, "LastAccessedDate": None,
"DeletedDate": None, "DeletedDate": secret.get('deleted_date', None),
"Tags": secret['tags'] "Tags": secret['tags']
}) })
@ -193,7 +205,7 @@ class SecretsManagerBackend(BaseBackend):
secret_list = [{ secret_list = [{
"ARN": secret_arn(self.region, secret['secret_id']), "ARN": secret_arn(self.region, secret['secret_id']),
"DeletedDate": None, "DeletedDate": secret.get('deleted_date', None),
"Description": "", "Description": "",
"KmsKeyId": "", "KmsKeyId": "",
"LastAccessedDate": None, "LastAccessedDate": None,
@ -218,10 +230,10 @@ class SecretsManagerBackend(BaseBackend):
if not self._is_valid_identifier(secret_id): if not self._is_valid_identifier(secret_id):
raise ResourceNotFoundException raise ResourceNotFoundException
if not force_delete_without_recovery: if 'deleted_date' in self.secrets[secret_id]:
raise InvalidParameterException( raise InvalidRequestException(
"An error occurred (InvalidParameterException) when calling the DeleteSecret operation: \ "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \
ForceDeleteWithoutRecovery must be true (Moto cannot simulate soft deletion with a recovery window)" perform the operation on a secret that's currently marked deleted."
) )
if recovery_window_in_days and force_delete_without_recovery: if recovery_window_in_days and force_delete_without_recovery:
@ -230,9 +242,20 @@ class SecretsManagerBackend(BaseBackend):
use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays." use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays."
) )
secret = self.secrets.pop(secret_id, None) if recovery_window_in_days and (recovery_window_in_days < 7 or recovery_window_in_days > 30):
raise InvalidParameterException(
"An error occurred (InvalidParameterException) when calling the DeleteSecret operation: The \
RecoveryWindowInDays value must be between 7 and 30 days (inclusive)."
)
deletion_date = int(time.time()) deletion_date = datetime.datetime.utcnow()
if force_delete_without_recovery:
secret = self.secrets.pop(secret_id, None)
else:
deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
self.secrets[secret_id]['deleted_date'] = self._unix_time_secs(deletion_date)
secret = self.secrets.get(secret_id, None)
if not secret: if not secret:
raise ResourceNotFoundException raise ResourceNotFoundException
@ -240,7 +263,7 @@ class SecretsManagerBackend(BaseBackend):
arn = secret_arn(self.region, secret['secret_id']) arn = secret_arn(self.region, secret['secret_id'])
name = secret['name'] name = secret['name']
return arn, name, deletion_date return arn, name, self._unix_time_secs(deletion_date)
available_regions = ( available_regions = (

View File

@ -6,7 +6,7 @@ from moto import mock_secretsmanager
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import sure # noqa import sure # noqa
import string import string
from datetime import datetime from datetime import datetime, timezone
import unittest import unittest
from nose.tools import assert_raises from nose.tools import assert_raises
@ -35,6 +35,20 @@ def test_get_secret_that_does_not_match():
with assert_raises(ClientError): with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-match') result = conn.get_secret_value(SecretId='i-dont-match')
@mock_secretsmanager
def test_get_secret_value_that_is_marked_deleted():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
deleted_secret = conn.delete_secret(SecretId='test-secret')
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='test-secret')
@mock_secretsmanager @mock_secretsmanager
def test_create_secret(): def test_create_secret():
conn = boto3.client('secretsmanager', region_name='us-east-1') conn = boto3.client('secretsmanager', region_name='us-east-1')
@ -67,16 +81,33 @@ def test_create_secret_with_tags():
def test_delete_secret(): def test_delete_secret():
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
deleted_secret = conn.delete_secret(SecretId='test-secret')
assert deleted_secret['ARN']
assert deleted_secret['Name'] == 'test-secret'
assert deleted_secret['DeletionDate'] > datetime.fromtimestamp(1, timezone.utc)
secret_details = conn.describe_secret(SecretId='test-secret')
assert secret_details['ARN']
assert secret_details['Name'] == 'test-secret'
assert secret_details['DeletedDate'] > datetime.fromtimestamp(1, timezone.utc)
@mock_secretsmanager
def test_delete_secret_force():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret', conn.create_secret(Name='test-secret',
SecretString='foosecret') SecretString='foosecret')
result = conn.delete_secret(SecretId='test-secret', ForceDeleteWithoutRecovery=True) result = conn.delete_secret(SecretId='test-secret', ForceDeleteWithoutRecovery=True)
deletion_date = result['DeletionDate']
assert result['ARN'] assert result['ARN']
assert result['DeletionDate'] > datetime.fromtimestamp(1, timezone.utc)
assert deletion_date > datetime.fromtimestamp(1, deletion_date.tzinfo)
assert result['Name'] == 'test-secret' assert result['Name'] == 'test-secret'
with assert_raises(ClientError): with assert_raises(ClientError):
@ -91,17 +122,6 @@ def test_delete_secret_that_does_not_exist():
result = conn.delete_secret(SecretId='i-dont-exist', ForceDeleteWithoutRecovery=True) result = conn.delete_secret(SecretId='i-dont-exist', ForceDeleteWithoutRecovery=True)
@mock_secretsmanager
def test_delete_secret_requires_force_delete_flag():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
with assert_raises(ClientError):
result = conn.delete_secret(SecretId='test-secret', ForceDeleteWithoutRecovery=False)
@mock_secretsmanager @mock_secretsmanager
def test_delete_secret_fails_with_both_force_delete_flag_and_recovery_window_flag(): def test_delete_secret_fails_with_both_force_delete_flag_and_recovery_window_flag():
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')
@ -113,6 +133,41 @@ def test_delete_secret_fails_with_both_force_delete_flag_and_recovery_window_fla
result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=1, ForceDeleteWithoutRecovery=True) result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=1, ForceDeleteWithoutRecovery=True)
@mock_secretsmanager
def test_delete_secret_recovery_window_too_short():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
with assert_raises(ClientError):
result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=6)
@mock_secretsmanager
def test_delete_secret_recovery_window_too_long():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
with assert_raises(ClientError):
result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=31)
@mock_secretsmanager
def test_delete_secret_that_is_marked_deleted():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
deleted_secret = conn.delete_secret(SecretId='test-secret')
with assert_raises(ClientError):
result = conn.delete_secret(SecretId='test-secret')
@mock_secretsmanager @mock_secretsmanager
def test_get_random_password_default_length(): def test_get_random_password_default_length():
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')