Implement secretsmanager CancelRotateSecret (#5809)

This commit is contained in:
Abdullah Ahmed 2023-01-03 19:22:38 +00:00 committed by GitHub
parent d89e4d236c
commit 031f89dee0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 97 additions and 0 deletions

View File

@ -229,6 +229,29 @@ class SecretsManagerBackend(BaseBackend):
version_id = str(mock_random.uuid4())
return version_id
def cancel_rotate_secret(self, secret_id: str):
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
if self.secrets[secret_id].is_deleted():
raise InvalidRequestException(
"You tried to perform the operation on a secret that's currently marked deleted."
)
secret = self.secrets.get(key=secret_id)
if not secret.rotation_lambda_arn:
# This response doesn't make much sense for `CancelRotateSecret`, but this is what AWS has documented ...
# https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_CancelRotateSecret.html
raise InvalidRequestException(
(
"You tried to enable rotation on a secret that doesn't already have a Lambda function ARN configured"
"and you didn't include such an ARN as a parameter in this call."
)
)
secret.rotation_enabled = False
return secret.to_short_dict()
def get_secret_value(self, secret_id, version_id, version_stage):
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()

View File

@ -35,6 +35,10 @@ class SecretsManagerResponse(BaseResponse):
def backend(self):
return secretsmanager_backends[self.current_account][self.region]
def cancel_rotate_secret(self):
secret_id = self._get_param("SecretId")
return self.backend.cancel_rotate_secret(secret_id=secret_id)
def get_secret_value(self):
secret_id = self._get_param("SecretId")
version_id = self._get_param("VersionId")

View File

@ -13,6 +13,7 @@ from datetime import datetime, timedelta, timezone
import sure # noqa # pylint: disable=unused-import
from uuid import uuid4
import pytest
from unittest import SkipTest
DEFAULT_SECRET_NAME = "test-secret"
@ -630,6 +631,75 @@ def test_restore_secret_that_does_not_exist():
conn.restore_secret(SecretId="i-dont-exist")
@mock_secretsmanager
def test_cancel_rotate_secret_with_invalid_secret_id():
conn = boto3.client("secretsmanager", region_name="us-east-1")
with pytest.raises(ClientError):
conn.cancel_rotate_secret(SecretId="invalid_id")
@mock_secretsmanager
def test_cancel_rotate_secret_after_delete():
conn = boto3.client("secretsmanager", region_name="us-east-1")
conn.create_secret(
Name=DEFAULT_SECRET_NAME, SecretString="foosecret", Description="foodescription"
)
conn.delete_secret(
SecretId=DEFAULT_SECRET_NAME,
RecoveryWindowInDays=7,
ForceDeleteWithoutRecovery=False,
)
with pytest.raises(ClientError):
conn.cancel_rotate_secret(SecretId=DEFAULT_SECRET_NAME)
@mock_secretsmanager
def test_cancel_rotate_secret_before_enable():
conn = boto3.client("secretsmanager", region_name="us-east-1")
conn.create_secret(
Name=DEFAULT_SECRET_NAME, SecretString="foosecret", Description="foodescription"
)
with pytest.raises(ClientError):
conn.cancel_rotate_secret(SecretId=DEFAULT_SECRET_NAME)
@mock_secretsmanager
def test_cancel_rotate_secret():
if not settings.TEST_SERVER_MODE:
raise SkipTest("rotation requires a server to be running")
from tests.test_awslambda.utilities import get_role_name
lambda_conn = boto3.client(
"lambda", region_name="us-east-1", endpoint_url="http://localhost:5000"
)
func = lambda_conn.create_function(
FunctionName="testFunction",
Runtime="python3.8",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_rotation_zip_file()},
Description="Secret rotator",
Timeout=3,
MemorySize=128,
Publish=True,
)
secrets_conn = boto3.client("secretsmanager", region_name="us-east-1")
secrets_conn.create_secret(
Name=DEFAULT_SECRET_NAME, SecretString="foosecret", Description="foodescription"
)
secrets_conn.rotate_secret(
SecretId=DEFAULT_SECRET_NAME,
RotationLambdaARN=func["FunctionArn"],
RotationRules=dict(AutomaticallyAfterDays=30),
)
secrets_conn.cancel_rotate_secret(SecretId=DEFAULT_SECRET_NAME)
cancelled_rotation = secrets_conn.describe_secret(SecretId=DEFAULT_SECRET_NAME)
assert not cancelled_rotation["RotationEnabled"]
# The function config should be preserved
assert cancelled_rotation["RotationLambdaARN"]
@mock_secretsmanager
def test_rotate_secret():
conn = boto3.client("secretsmanager", region_name="us-west-2")