diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 3cce40df9..2afd3f19c 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -3654,7 +3654,7 @@ ## secretsmanager - 33% implemented - [ ] cancel_rotate_secret - [X] create_secret -- [ ] delete_secret +- [X] delete_secret - [X] describe_secret - [X] get_random_password - [X] get_secret_value diff --git a/moto/secretsmanager/exceptions.py b/moto/secretsmanager/exceptions.py index a72a32645..06010c411 100644 --- a/moto/secretsmanager/exceptions.py +++ b/moto/secretsmanager/exceptions.py @@ -27,3 +27,10 @@ class InvalidParameterException(SecretsManagerClientError): super(InvalidParameterException, self).__init__( 'InvalidParameterException', message) + + +class InvalidRequestException(SecretsManagerClientError): + def __init__(self, message): + super(InvalidRequestException, self).__init__( + 'InvalidRequestException', + message) diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 74cf89039..af8846a66 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import time import json import uuid +import datetime import boto3 @@ -10,6 +11,7 @@ from moto.core import BaseBackend, BaseModel from .exceptions import ( ResourceNotFoundException, InvalidParameterException, + InvalidRequestException, ClientError ) from .utils import random_password, secret_arn @@ -36,11 +38,21 @@ class SecretsManagerBackend(BaseBackend): def _is_valid_identifier(self, identifier): return identifier in self.secrets + def _unix_time_secs(self, dt): + epoch = datetime.datetime.utcfromtimestamp(0) + return (dt - epoch).total_seconds() + def get_secret_value(self, secret_id, version_id, version_stage): if not self._is_valid_identifier(secret_id): raise ResourceNotFoundException() + if 'deleted_date' in self.secrets[secret_id]: + raise InvalidRequestException( + "An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to \ + perform the operation on a secret that's currently marked deleted." + ) + secret = self.secrets[secret_id] response = json.dumps({ @@ -101,7 +113,7 @@ class SecretsManagerBackend(BaseBackend): "LastRotatedDate": None, "LastChangedDate": None, "LastAccessedDate": None, - "DeletedDate": None, + "DeletedDate": secret.get('deleted_date', None), "Tags": secret['tags'] }) @@ -115,6 +127,12 @@ class SecretsManagerBackend(BaseBackend): if not self._is_valid_identifier(secret_id): raise ResourceNotFoundException + if 'deleted_date' in self.secrets[secret_id]: + raise InvalidRequestException( + "An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \ + perform the operation on a secret that's currently marked deleted." + ) + if client_request_token: token_length = len(client_request_token) if token_length < 32 or token_length > 64: @@ -193,7 +211,7 @@ class SecretsManagerBackend(BaseBackend): secret_list = [{ "ARN": secret_arn(self.region, secret['secret_id']), - "DeletedDate": None, + "DeletedDate": secret.get('deleted_date', None), "Description": "", "KmsKeyId": "", "LastAccessedDate": None, @@ -213,6 +231,46 @@ class SecretsManagerBackend(BaseBackend): return secret_list, None + def delete_secret(self, secret_id, recovery_window_in_days, force_delete_without_recovery): + + if not self._is_valid_identifier(secret_id): + raise ResourceNotFoundException + + if 'deleted_date' in self.secrets[secret_id]: + raise InvalidRequestException( + "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \ + perform the operation on a secret that's currently marked deleted." + ) + + if recovery_window_in_days and force_delete_without_recovery: + raise InvalidParameterException( + "An error occurred (InvalidParameterException) when calling the DeleteSecret operation: You can't \ + use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays." + ) + + if recovery_window_in_days and (recovery_window_in_days < 7 or recovery_window_in_days > 30): + raise InvalidParameterException( + "An error occurred (InvalidParameterException) when calling the DeleteSecret operation: The \ + RecoveryWindowInDays value must be between 7 and 30 days (inclusive)." + ) + + deletion_date = datetime.datetime.utcnow() + + if force_delete_without_recovery: + secret = self.secrets.pop(secret_id, None) + else: + deletion_date += datetime.timedelta(days=recovery_window_in_days or 30) + self.secrets[secret_id]['deleted_date'] = self._unix_time_secs(deletion_date) + secret = self.secrets.get(secret_id, None) + + if not secret: + raise ResourceNotFoundException + + arn = secret_arn(self.region, secret['secret_id']) + name = secret['name'] + + return arn, name, self._unix_time_secs(deletion_date) + available_regions = ( boto3.session.Session().get_available_regions("secretsmanager") diff --git a/moto/secretsmanager/responses.py b/moto/secretsmanager/responses.py index f017ec0dc..a33890202 100644 --- a/moto/secretsmanager/responses.py +++ b/moto/secretsmanager/responses.py @@ -75,3 +75,14 @@ class SecretsManagerResponse(BaseResponse): next_token=next_token, ) return json.dumps(dict(SecretList=secret_list, NextToken=next_token)) + + def delete_secret(self): + secret_id = self._get_param("SecretId") + recovery_window_in_days = self._get_param("RecoveryWindowInDays") + force_delete_without_recovery = self._get_param("ForceDeleteWithoutRecovery") + arn, name, deletion_date = secretsmanager_backends[self.region].delete_secret( + secret_id=secret_id, + recovery_window_in_days=recovery_window_in_days, + force_delete_without_recovery=force_delete_without_recovery, + ) + return json.dumps(dict(ARN=arn, Name=name, DeletionDate=deletion_date)) diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index 6146698d9..7456c7d13 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -6,6 +6,8 @@ from moto import mock_secretsmanager from botocore.exceptions import ClientError import sure # noqa import string +import pytz +from datetime import datetime import unittest from nose.tools import assert_raises @@ -34,6 +36,20 @@ def test_get_secret_that_does_not_match(): with assert_raises(ClientError): result = conn.get_secret_value(SecretId='i-dont-match') + +@mock_secretsmanager +def test_get_secret_value_that_is_marked_deleted(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + conn.create_secret(Name='test-secret', + SecretString='foosecret') + + conn.delete_secret(SecretId='test-secret') + + with assert_raises(ClientError): + result = conn.get_secret_value(SecretId='test-secret') + + @mock_secretsmanager def test_create_secret(): conn = boto3.client('secretsmanager', region_name='us-east-1') @@ -61,6 +77,98 @@ def test_create_secret_with_tags(): secret_details = conn.describe_secret(SecretId=secret_name) assert secret_details['Tags'] == [{"Key": "Foo", "Value": "Bar"}, {"Key": "Mykey", "Value": "Myvalue"}] + +@mock_secretsmanager +def test_delete_secret(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + conn.create_secret(Name='test-secret', + SecretString='foosecret') + + deleted_secret = conn.delete_secret(SecretId='test-secret') + + assert deleted_secret['ARN'] + assert deleted_secret['Name'] == 'test-secret' + assert deleted_secret['DeletionDate'] > datetime.fromtimestamp(1, pytz.utc) + + secret_details = conn.describe_secret(SecretId='test-secret') + + assert secret_details['ARN'] + assert secret_details['Name'] == 'test-secret' + assert secret_details['DeletedDate'] > datetime.fromtimestamp(1, pytz.utc) + + +@mock_secretsmanager +def test_delete_secret_force(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + conn.create_secret(Name='test-secret', + SecretString='foosecret') + + result = conn.delete_secret(SecretId='test-secret', ForceDeleteWithoutRecovery=True) + + assert result['ARN'] + assert result['DeletionDate'] > datetime.fromtimestamp(1, pytz.utc) + assert result['Name'] == 'test-secret' + + with assert_raises(ClientError): + result = conn.get_secret_value(SecretId='test-secret') + + +@mock_secretsmanager +def test_delete_secret_that_does_not_exist(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + with assert_raises(ClientError): + result = conn.delete_secret(SecretId='i-dont-exist', ForceDeleteWithoutRecovery=True) + + +@mock_secretsmanager +def test_delete_secret_fails_with_both_force_delete_flag_and_recovery_window_flag(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + conn.create_secret(Name='test-secret', + SecretString='foosecret') + + with assert_raises(ClientError): + result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=1, ForceDeleteWithoutRecovery=True) + + +@mock_secretsmanager +def test_delete_secret_recovery_window_too_short(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + conn.create_secret(Name='test-secret', + SecretString='foosecret') + + with assert_raises(ClientError): + result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=6) + + +@mock_secretsmanager +def test_delete_secret_recovery_window_too_long(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + conn.create_secret(Name='test-secret', + SecretString='foosecret') + + with assert_raises(ClientError): + result = conn.delete_secret(SecretId='test-secret', RecoveryWindowInDays=31) + + +@mock_secretsmanager +def test_delete_secret_that_is_marked_deleted(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + conn.create_secret(Name='test-secret', + SecretString='foosecret') + + deleted_secret = conn.delete_secret(SecretId='test-secret') + + with assert_raises(ClientError): + result = conn.delete_secret(SecretId='test-secret') + + @mock_secretsmanager def test_get_random_password_default_length(): conn = boto3.client('secretsmanager', region_name='us-west-2') @@ -273,6 +381,20 @@ def test_rotate_secret_enable_rotation(): assert rotated_description['RotationEnabled'] is True assert rotated_description['RotationRules']['AutomaticallyAfterDays'] == 42 + +@mock_secretsmanager +def test_rotate_secret_that_is_marked_deleted(): + conn = boto3.client('secretsmanager', region_name='us-west-2') + + conn.create_secret(Name='test-secret', + SecretString='foosecret') + + conn.delete_secret(SecretId='test-secret') + + with assert_raises(ClientError): + result = conn.rotate_secret(SecretId='test-secret') + + @mock_secretsmanager def test_rotate_secret_that_does_not_exist(): conn = boto3.client('secretsmanager', 'us-west-2')