SecretsManager: rotate_secret() now supports the RotateImmediately-parameter (#7347)

This commit is contained in:
Bert Blommers 2024-02-16 22:19:46 +00:00 committed by GitHub
parent b9d7c20d14
commit 4b898740e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 114 additions and 7 deletions

View File

@ -660,6 +660,7 @@ class SecretsManagerBackend(BaseBackend):
client_request_token: Optional[str] = None, client_request_token: Optional[str] = None,
rotation_lambda_arn: Optional[str] = None, rotation_lambda_arn: Optional[str] = None,
rotation_rules: Optional[Dict[str, Any]] = None, rotation_rules: Optional[Dict[str, Any]] = None,
rotate_immediately: bool = True,
) -> str: ) -> str:
rotation_days = "AutomaticallyAfterDays" rotation_days = "AutomaticallyAfterDays"
@ -758,25 +759,32 @@ class SecretsManagerBackend(BaseBackend):
response_headers: Dict[str, Any] = {} response_headers: Dict[str, Any] = {}
try: try:
func = lambda_backend.get_function(secret.rotation_lambda_arn) lambda_backend.get_function(secret.rotation_lambda_arn)
except Exception: except Exception:
msg = f"Resource not found for ARN '{secret.rotation_lambda_arn}'." msg = f"Resource not found for ARN '{secret.rotation_lambda_arn}'."
raise ResourceNotFoundException(msg) raise ResourceNotFoundException(msg)
for step in ["create", "set", "test", "finish"]: rotation_steps = ["create", "set", "test", "finish"]
func.invoke( if not rotate_immediately:
json.dumps( # if you don't immediately rotate the secret,
# Secrets Manager tests the rotation configuration by running the testSecretstep of the Lambda rotation function.
rotation_steps = ["test"]
for step in rotation_steps:
lambda_backend.invoke(
secret.rotation_lambda_arn,
qualifier=None,
body=json.dumps(
{ {
"Step": step + "Secret", "Step": step + "Secret",
"SecretId": secret.name, "SecretId": secret.name,
"ClientRequestToken": new_version_id, "ClientRequestToken": new_version_id,
} }
), ),
request_headers, headers=request_headers,
response_headers, response_headers=response_headers,
) )
secret.set_default_version_id(new_version_id) secret.set_default_version_id(new_version_id)
elif secret.versions: elif secret.versions:
# AWS will always require a Lambda ARN # AWS will always require a Lambda ARN
# without that, Moto can still apply the 'AWSCURRENT'-label # without that, Moto can still apply the 'AWSCURRENT'-label

View File

@ -120,11 +120,13 @@ class SecretsManagerResponse(BaseResponse):
rotation_lambda_arn = self._get_param("RotationLambdaARN") rotation_lambda_arn = self._get_param("RotationLambdaARN")
rotation_rules = self._get_param("RotationRules") rotation_rules = self._get_param("RotationRules")
secret_id = self._get_param("SecretId") secret_id = self._get_param("SecretId")
rotate_immediately = self._get_bool_param("RotateImmediately", True)
return self.backend.rotate_secret( return self.backend.rotate_secret(
secret_id=secret_id, secret_id=secret_id,
client_request_token=client_request_token, client_request_token=client_request_token,
rotation_lambda_arn=rotation_lambda_arn, rotation_lambda_arn=rotation_lambda_arn,
rotation_rules=rotation_rules, rotation_rules=rotation_rules,
rotate_immediately=rotate_immediately,
) )
def put_secret_value(self) -> str: def put_secret_value(self) -> str:

View File

@ -379,7 +379,9 @@ def test_invoke_lambda_with_proxy():
assert json.loads(payload) == expected_payload assert json.loads(payload) == expected_payload
@pytest.mark.network
@mock_aws @mock_aws
@requires_docker
def test_invoke_lambda_with_entrypoint(): def test_invoke_lambda_with_entrypoint():
conn = boto3.client("lambda", _lambda_region) conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6] function_name = str(uuid4())[0:6]

View File

@ -0,0 +1,95 @@
import io
import json
import zipfile
from unittest import SkipTest
from unittest.mock import patch
import boto3
from botocore.exceptions import ClientError
from moto import mock_aws, settings
secret_steps = []
def mock_lambda_invoke(*args, **kwarg):
secret_steps.append(json.loads(kwarg["body"])["Step"])
return "n/a"
@mock_aws(config={"lambda": {"use_docker": False}})
@patch(
"moto.awslambda_simple.models.LambdaSimpleBackend.invoke", new=mock_lambda_invoke
)
def test_simple_lambda_is_invoked():
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("Can only test patched code in DecoratorMode")
sm_client = boto3.client("secretsmanager", region_name="us-east-1")
secret_arn = sm_client.create_secret(Name="some", SecretString="secret")["ARN"]
lambda_res = create_mock_rotator_lambda()
sm_client.rotate_secret(
SecretId=secret_arn,
RotationLambdaARN=lambda_res["FunctionArn"],
RotationRules={"AutomaticallyAfterDays": 1, "Duration": "1h"},
RotateImmediately=True,
)
assert secret_steps == ["createSecret", "setSecret", "testSecret", "finishSecret"]
secret_steps.clear()
@mock_aws(config={"lambda": {"use_docker": False}})
@patch(
"moto.awslambda_simple.models.LambdaSimpleBackend.invoke", new=mock_lambda_invoke
)
def test_simple_lambda_is_invoked__do_not_rotate_immediately():
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("Can only test patched code in DecoratorMode")
sm_client = boto3.client("secretsmanager", region_name="us-east-1")
secret_arn = sm_client.create_secret(Name="some", SecretString="secret")["ARN"]
lambda_res = create_mock_rotator_lambda()
sm_client.rotate_secret(
SecretId=secret_arn,
RotationLambdaARN=lambda_res["FunctionArn"],
RotationRules={"AutomaticallyAfterDays": 1, "Duration": "1h"},
RotateImmediately=False,
)
assert secret_steps == ["testSecret"]
secret_steps.clear()
def mock_lambda_zip():
code = """
def lambda_handler(event, context):
return event
"""
zip_output = io.BytesIO()
zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED)
zip_file.writestr("lambda_function.py", code)
zip_file.close()
zip_output.seek(0)
return zip_output.read()
def create_mock_rotator_lambda():
client = boto3.client("lambda", region_name="us-east-1")
return client.create_function(
FunctionName="mock-rotator",
Runtime="python3.9",
Role=get_mock_role_arn(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": mock_lambda_zip()},
)
def get_mock_role_arn():
iam = boto3.client("iam", region_name="us-east-1")
try:
return iam.get_role(RoleName="my-role")["Role"]["Arn"]
except ClientError:
return iam.create_role(
RoleName="my-role",
AssumeRolePolicyDocument="some policy",
Path="/my-path/",
)["Role"]["Arn"]