Merge pull request #2145 from chriskilding/feature/secretsmanager-deletesecret
Feature: Secrets Manager delete-secret
This commit is contained in:
commit
664dc53cce
@ -3654,7 +3654,7 @@
|
||||
## secretsmanager - 33% implemented
|
||||
- [ ] cancel_rotate_secret
|
||||
- [X] create_secret
|
||||
- [ ] delete_secret
|
||||
- [X] delete_secret
|
||||
- [X] describe_secret
|
||||
- [X] get_random_password
|
||||
- [X] get_secret_value
|
||||
|
@ -27,3 +27,10 @@ class InvalidParameterException(SecretsManagerClientError):
|
||||
super(InvalidParameterException, self).__init__(
|
||||
'InvalidParameterException',
|
||||
message)
|
||||
|
||||
|
||||
class InvalidRequestException(SecretsManagerClientError):
|
||||
def __init__(self, message):
|
||||
super(InvalidRequestException, self).__init__(
|
||||
'InvalidRequestException',
|
||||
message)
|
||||
|
@ -3,6 +3,7 @@ from __future__ import unicode_literals
|
||||
import time
|
||||
import json
|
||||
import uuid
|
||||
import datetime
|
||||
|
||||
import boto3
|
||||
|
||||
@ -10,6 +11,7 @@ from moto.core import BaseBackend, BaseModel
|
||||
from .exceptions import (
|
||||
ResourceNotFoundException,
|
||||
InvalidParameterException,
|
||||
InvalidRequestException,
|
||||
ClientError
|
||||
)
|
||||
from .utils import random_password, secret_arn
|
||||
@ -36,11 +38,21 @@ class SecretsManagerBackend(BaseBackend):
|
||||
def _is_valid_identifier(self, identifier):
|
||||
return identifier in self.secrets
|
||||
|
||||
def _unix_time_secs(self, dt):
|
||||
epoch = datetime.datetime.utcfromtimestamp(0)
|
||||
return (dt - epoch).total_seconds()
|
||||
|
||||
def get_secret_value(self, secret_id, version_id, version_stage):
|
||||
|
||||
if not self._is_valid_identifier(secret_id):
|
||||
raise ResourceNotFoundException()
|
||||
|
||||
if 'deleted_date' in self.secrets[secret_id]:
|
||||
raise InvalidRequestException(
|
||||
"An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to \
|
||||
perform the operation on a secret that's currently marked deleted."
|
||||
)
|
||||
|
||||
secret = self.secrets[secret_id]
|
||||
|
||||
response = json.dumps({
|
||||
@ -101,7 +113,7 @@ class SecretsManagerBackend(BaseBackend):
|
||||
"LastRotatedDate": None,
|
||||
"LastChangedDate": None,
|
||||
"LastAccessedDate": None,
|
||||
"DeletedDate": None,
|
||||
"DeletedDate": secret.get('deleted_date', None),
|
||||
"Tags": secret['tags']
|
||||
})
|
||||
|
||||
@ -115,6 +127,12 @@ class SecretsManagerBackend(BaseBackend):
|
||||
if not self._is_valid_identifier(secret_id):
|
||||
raise ResourceNotFoundException
|
||||
|
||||
if 'deleted_date' in self.secrets[secret_id]:
|
||||
raise InvalidRequestException(
|
||||
"An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \
|
||||
perform the operation on a secret that's currently marked deleted."
|
||||
)
|
||||
|
||||
if client_request_token:
|
||||
token_length = len(client_request_token)
|
||||
if token_length < 32 or token_length > 64:
|
||||
@ -193,7 +211,7 @@ class SecretsManagerBackend(BaseBackend):
|
||||
|
||||
secret_list = [{
|
||||
"ARN": secret_arn(self.region, secret['secret_id']),
|
||||
"DeletedDate": None,
|
||||
"DeletedDate": secret.get('deleted_date', None),
|
||||
"Description": "",
|
||||
"KmsKeyId": "",
|
||||
"LastAccessedDate": None,
|
||||
@ -213,6 +231,46 @@ class SecretsManagerBackend(BaseBackend):
|
||||
|
||||
return secret_list, None
|
||||
|
||||
def delete_secret(self, secret_id, recovery_window_in_days, force_delete_without_recovery):
|
||||
|
||||
if not self._is_valid_identifier(secret_id):
|
||||
raise ResourceNotFoundException
|
||||
|
||||
if 'deleted_date' in self.secrets[secret_id]:
|
||||
raise InvalidRequestException(
|
||||
"An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \
|
||||
perform the operation on a secret that's currently marked deleted."
|
||||
)
|
||||
|
||||
if recovery_window_in_days and force_delete_without_recovery:
|
||||
raise InvalidParameterException(
|
||||
"An error occurred (InvalidParameterException) when calling the DeleteSecret operation: You can't \
|
||||
use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays."
|
||||
)
|
||||
|
||||
if recovery_window_in_days and (recovery_window_in_days < 7 or recovery_window_in_days > 30):
|
||||
raise InvalidParameterException(
|
||||
"An error occurred (InvalidParameterException) when calling the DeleteSecret operation: The \
|
||||
RecoveryWindowInDays value must be between 7 and 30 days (inclusive)."
|
||||
)
|
||||
|
||||
deletion_date = datetime.datetime.utcnow()
|
||||
|
||||
if force_delete_without_recovery:
|
||||
secret = self.secrets.pop(secret_id, None)
|
||||
else:
|
||||
deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
|
||||
self.secrets[secret_id]['deleted_date'] = self._unix_time_secs(deletion_date)
|
||||
secret = self.secrets.get(secret_id, None)
|
||||
|
||||
if not secret:
|
||||
raise ResourceNotFoundException
|
||||
|
||||
arn = secret_arn(self.region, secret['secret_id'])
|
||||
name = secret['name']
|
||||
|
||||
return arn, name, self._unix_time_secs(deletion_date)
|
||||
|
||||
|
||||
available_regions = (
|
||||
boto3.session.Session().get_available_regions("secretsmanager")
|
||||
|
@ -75,3 +75,14 @@ class SecretsManagerResponse(BaseResponse):
|
||||
next_token=next_token,
|
||||
)
|
||||
return json.dumps(dict(SecretList=secret_list, NextToken=next_token))
|
||||
|
||||
def delete_secret(self):
|
||||
secret_id = self._get_param("SecretId")
|
||||
recovery_window_in_days = self._get_param("RecoveryWindowInDays")
|
||||
force_delete_without_recovery = self._get_param("ForceDeleteWithoutRecovery")
|
||||
arn, name, deletion_date = secretsmanager_backends[self.region].delete_secret(
|
||||
secret_id=secret_id,
|
||||
recovery_window_in_days=recovery_window_in_days,
|
||||
force_delete_without_recovery=force_delete_without_recovery,
|
||||
)
|
||||
return json.dumps(dict(ARN=arn, Name=name, DeletionDate=deletion_date))
|
||||
|
@ -6,6 +6,8 @@ from moto import mock_secretsmanager
|
||||
from botocore.exceptions import ClientError
|
||||
import sure # noqa
|
||||
import string
|
||||
import pytz
|
||||
from datetime import datetime
|
||||
import unittest
|
||||
from nose.tools import assert_raises
|
||||
|
||||
@ -34,6 +36,20 @@ def test_get_secret_that_does_not_match():
|
||||
with assert_raises(ClientError):
|
||||
result = conn.get_secret_value(SecretId='i-dont-match')
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_get_secret_value_that_is_marked_deleted():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
conn.delete_secret(SecretId='test-secret')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
result = conn.get_secret_value(SecretId='test-secret')
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_create_secret():
|
||||
conn = boto3.client('secretsmanager', region_name='us-east-1')
|
||||
@ -61,6 +77,98 @@ def test_create_secret_with_tags():
|
||||
secret_details = conn.describe_secret(SecretId=secret_name)
|
||||
assert secret_details['Tags'] == [{"Key": "Foo", "Value": "Bar"}, {"Key": "Mykey", "Value": "Myvalue"}]
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_delete_secret():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
deleted_secret = conn.delete_secret(SecretId='test-secret')
|
||||
|
||||
assert deleted_secret['ARN']
|
||||
assert deleted_secret['Name'] == 'test-secret'
|
||||
assert deleted_secret['DeletionDate'] > datetime.fromtimestamp(1, pytz.utc)
|
||||
|
||||
secret_details = conn.describe_secret(SecretId='test-secret')
|
||||
|
||||
assert secret_details['ARN']
|
||||
assert secret_details['Name'] == 'test-secret'
|
||||
assert secret_details['DeletedDate'] > datetime.fromtimestamp(1, pytz.utc)
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_delete_secret_force():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
result = conn.delete_secret(SecretId='test-secret', ForceDeleteWithoutRecovery=True)
|
||||
|
||||
assert result['ARN']
|
||||
assert result['DeletionDate'] > datetime.fromtimestamp(1, pytz.utc)
|
||||
assert result['Name'] == 'test-secret'
|
||||
|
||||
with assert_raises(ClientError):
|
||||
result = conn.get_secret_value(SecretId='test-secret')
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_delete_secret_that_does_not_exist():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
result = conn.delete_secret(SecretId='i-dont-exist', ForceDeleteWithoutRecovery=True)
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_delete_secret_fails_with_both_force_delete_flag_and_recovery_window_flag():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=1, ForceDeleteWithoutRecovery=True)
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_delete_secret_recovery_window_too_short():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=6)
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_delete_secret_recovery_window_too_long():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=31)
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_delete_secret_that_is_marked_deleted():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
deleted_secret = conn.delete_secret(SecretId='test-secret')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
result = conn.delete_secret(SecretId='test-secret')
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_get_random_password_default_length():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
@ -273,6 +381,20 @@ def test_rotate_secret_enable_rotation():
|
||||
assert rotated_description['RotationEnabled'] is True
|
||||
assert rotated_description['RotationRules']['AutomaticallyAfterDays'] == 42
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_rotate_secret_that_is_marked_deleted():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
conn.delete_secret(SecretId='test-secret')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
result = conn.rotate_secret(SecretId='test-secret')
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_rotate_secret_that_does_not_exist():
|
||||
conn = boto3.client('secretsmanager', 'us-west-2')
|
||||
|
Loading…
Reference in New Issue
Block a user