From 4b898740e89d87a3fd36d0104d1705301f819cff Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Fri, 16 Feb 2024 22:19:46 +0000 Subject: [PATCH] SecretsManager: rotate_secret() now supports the RotateImmediately-parameter (#7347) --- moto/secretsmanager/models.py | 22 +++-- moto/secretsmanager/responses.py | 2 + tests/test_awslambda/test_lambda_invoke.py | 2 + .../test_rotate_simple_lambda.py | 95 +++++++++++++++++++ 4 files changed, 114 insertions(+), 7 deletions(-) create mode 100644 tests/test_secretsmanager/test_rotate_simple_lambda.py diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 229576c9d..3414efa2a 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -660,6 +660,7 @@ class SecretsManagerBackend(BaseBackend): client_request_token: Optional[str] = None, rotation_lambda_arn: Optional[str] = None, rotation_rules: Optional[Dict[str, Any]] = None, + rotate_immediately: bool = True, ) -> str: rotation_days = "AutomaticallyAfterDays" @@ -758,25 +759,32 @@ class SecretsManagerBackend(BaseBackend): response_headers: Dict[str, Any] = {} try: - func = lambda_backend.get_function(secret.rotation_lambda_arn) + lambda_backend.get_function(secret.rotation_lambda_arn) except Exception: msg = f"Resource not found for ARN '{secret.rotation_lambda_arn}'." raise ResourceNotFoundException(msg) - for step in ["create", "set", "test", "finish"]: - func.invoke( - json.dumps( + rotation_steps = ["create", "set", "test", "finish"] + if not rotate_immediately: + # if you don't immediately rotate the secret, + # Secrets Manager tests the rotation configuration by running the testSecretstep of the Lambda rotation function. + rotation_steps = ["test"] + for step in rotation_steps: + lambda_backend.invoke( + secret.rotation_lambda_arn, + qualifier=None, + body=json.dumps( { "Step": step + "Secret", "SecretId": secret.name, "ClientRequestToken": new_version_id, } ), - request_headers, - response_headers, + headers=request_headers, + response_headers=response_headers, ) - secret.set_default_version_id(new_version_id) + elif secret.versions: # AWS will always require a Lambda ARN # without that, Moto can still apply the 'AWSCURRENT'-label diff --git a/moto/secretsmanager/responses.py b/moto/secretsmanager/responses.py index d4cd60116..f1b107408 100644 --- a/moto/secretsmanager/responses.py +++ b/moto/secretsmanager/responses.py @@ -120,11 +120,13 @@ class SecretsManagerResponse(BaseResponse): rotation_lambda_arn = self._get_param("RotationLambdaARN") rotation_rules = self._get_param("RotationRules") secret_id = self._get_param("SecretId") + rotate_immediately = self._get_bool_param("RotateImmediately", True) return self.backend.rotate_secret( secret_id=secret_id, client_request_token=client_request_token, rotation_lambda_arn=rotation_lambda_arn, rotation_rules=rotation_rules, + rotate_immediately=rotate_immediately, ) def put_secret_value(self) -> str: diff --git a/tests/test_awslambda/test_lambda_invoke.py b/tests/test_awslambda/test_lambda_invoke.py index 7000cc163..318564c36 100644 --- a/tests/test_awslambda/test_lambda_invoke.py +++ b/tests/test_awslambda/test_lambda_invoke.py @@ -379,7 +379,9 @@ def test_invoke_lambda_with_proxy(): assert json.loads(payload) == expected_payload +@pytest.mark.network @mock_aws +@requires_docker def test_invoke_lambda_with_entrypoint(): conn = boto3.client("lambda", _lambda_region) function_name = str(uuid4())[0:6] diff --git a/tests/test_secretsmanager/test_rotate_simple_lambda.py b/tests/test_secretsmanager/test_rotate_simple_lambda.py new file mode 100644 index 000000000..07c3a994b --- /dev/null +++ b/tests/test_secretsmanager/test_rotate_simple_lambda.py @@ -0,0 +1,95 @@ +import io +import json +import zipfile +from unittest import SkipTest +from unittest.mock import patch + +import boto3 +from botocore.exceptions import ClientError + +from moto import mock_aws, settings + +secret_steps = [] + + +def mock_lambda_invoke(*args, **kwarg): + secret_steps.append(json.loads(kwarg["body"])["Step"]) + return "n/a" + + +@mock_aws(config={"lambda": {"use_docker": False}}) +@patch( + "moto.awslambda_simple.models.LambdaSimpleBackend.invoke", new=mock_lambda_invoke +) +def test_simple_lambda_is_invoked(): + if not settings.TEST_DECORATOR_MODE: + raise SkipTest("Can only test patched code in DecoratorMode") + sm_client = boto3.client("secretsmanager", region_name="us-east-1") + secret_arn = sm_client.create_secret(Name="some", SecretString="secret")["ARN"] + + lambda_res = create_mock_rotator_lambda() + sm_client.rotate_secret( + SecretId=secret_arn, + RotationLambdaARN=lambda_res["FunctionArn"], + RotationRules={"AutomaticallyAfterDays": 1, "Duration": "1h"}, + RotateImmediately=True, + ) + assert secret_steps == ["createSecret", "setSecret", "testSecret", "finishSecret"] + secret_steps.clear() + + +@mock_aws(config={"lambda": {"use_docker": False}}) +@patch( + "moto.awslambda_simple.models.LambdaSimpleBackend.invoke", new=mock_lambda_invoke +) +def test_simple_lambda_is_invoked__do_not_rotate_immediately(): + if not settings.TEST_DECORATOR_MODE: + raise SkipTest("Can only test patched code in DecoratorMode") + sm_client = boto3.client("secretsmanager", region_name="us-east-1") + secret_arn = sm_client.create_secret(Name="some", SecretString="secret")["ARN"] + + lambda_res = create_mock_rotator_lambda() + sm_client.rotate_secret( + SecretId=secret_arn, + RotationLambdaARN=lambda_res["FunctionArn"], + RotationRules={"AutomaticallyAfterDays": 1, "Duration": "1h"}, + RotateImmediately=False, + ) + assert secret_steps == ["testSecret"] + secret_steps.clear() + + +def mock_lambda_zip(): + code = """ + def lambda_handler(event, context): + return event + """ + zip_output = io.BytesIO() + zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) + zip_file.writestr("lambda_function.py", code) + zip_file.close() + zip_output.seek(0) + return zip_output.read() + + +def create_mock_rotator_lambda(): + client = boto3.client("lambda", region_name="us-east-1") + return client.create_function( + FunctionName="mock-rotator", + Runtime="python3.9", + Role=get_mock_role_arn(), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": mock_lambda_zip()}, + ) + + +def get_mock_role_arn(): + iam = boto3.client("iam", region_name="us-east-1") + try: + return iam.get_role(RoleName="my-role")["Role"]["Arn"] + except ClientError: + return iam.create_role( + RoleName="my-role", + AssumeRolePolicyDocument="some policy", + Path="/my-path/", + )["Role"]["Arn"]