From a4b14986658a00eb93c1627c54b1e9ff2cad0eed Mon Sep 17 00:00:00 2001 From: Daniel Samuels Date: Tue, 11 May 2021 12:08:01 +0100 Subject: [PATCH] Support rotating secrets using Lambda [#3905] (#3912) * Support rotating secrets using Lambda The Secrets manager rotation process uses an AWS Lambda function to perform the rotation of a secret. [1] In fact, it's not possible to trigger rotation of a Secret without specifying a Lambda function at some point in the life of the secret: ``` $ aws secretsmanager rotate-secret --secret-id /rotationTest An error occurred (InvalidRequestException) when calling the RotateSecret operation: No Lambda rotation function ARN is associated with this secret. ``` `moto` can be a little more lenient in this regard and allow `rotate_secret` to be called without a Lambda function being present, if only to allow simulation of the `AWSCURRENT` and `AWSPREVIOUS` labels moving across versions. However, if a lambda function _has_ been specified when calling `rotate_secret`, it should be invoked therefore providing the developer with the full multi-stage process [3] which can be used to test the Lambda function itself and ensuring that full end-to-end testing is performed. Without this there's no easy way to configure the Secret in the state needed to provide the Lambda function with the data in the format it needs to be in at each step of the invocation process. [1]: https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets-lambda-function-overview.html [2]: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/secretsmanager.html#SecretsManager.Client.rotate_secret [3]: https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets-lambda-function-overview.html#rotation-explanation-of-steps * Run `black` over `secretsmanager/models.py` * Make `lambda_backends` import local to the condition * Implement `update_secret_version_stage` Allow a staging label to be moved across versions. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/secretsmanager.html#SecretsManager.Client.update_secret_version_stage * Add an integration test for Secrets Manager & Lambda * Support passing `ClientRequestToken` to `put_secret_value` By passing `ClientRequestToken` to `put_secret_value` within the lambda function invoked by calling `rotate_secret`, one can update the value associated with the existing (pending) version, without causing a new secret version to be created. * Add application logic for `AWSPENDING` The rotation function must end with the versions of the secret in one of two states: - The `AWSPENDING` and `AWSCURRENT` staging labels are attached to the same version of the secret, or - The `AWSPENDING` staging label is not attached to any version of the secret. If the `AWSPENDING` staging label is present but not attached to the same version as `AWSCURRENT` then any later invocation of RotateSecret assumes that a previous rotation request is still in progress and returns an error. * Update `default_version_id` after Lambda rotation concludes Call `set_default_version_id` directly, rather than going through `reset_default_version` as the Lambda function is responsible for moving the version labels around, not `rotate_secret`. * Run `black` over changed files * Fix Python 2.7 compatibility * Add additional test coverage for Secrets Manager * Fix bug found by tests AWSPENDING + AWSCURRENT check wasn't using `version_stages`. Also tidy up the AWSCURRENT moving in `update_secret_version_stage` to remove AWSPREVIOUS it from the new stage. * Run `black` over changed files * Add additional `rotate_secret` tests * Skip `test_rotate_secret_lambda_invocations` in test server mode * Add test for invalid Lambda ARN --- moto/secretsmanager/models.py | 139 +++++++- moto/secretsmanager/responses.py | 17 + .../test_secretsmanager.py | 127 +++++++- tests/test_secretsmanager/test_server.py | 296 +++++++++++++++++- 4 files changed, 564 insertions(+), 15 deletions(-) diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 5542cb3e3..84b251035 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -269,13 +269,7 @@ class SecretsManagerBackend(BaseBackend): return secret.to_short_dict() def create_secret( - self, - name, - secret_string=None, - secret_binary=None, - description=None, - tags=[], - **kwargs + self, name, secret_string=None, secret_binary=None, description=None, tags=[] ): # error if secret exists @@ -325,7 +319,11 @@ class SecretsManagerBackend(BaseBackend): if secret_id in self.secrets: secret = self.secrets[secret_id] secret.update(description, tags) - secret.reset_default_version(secret_version, version_id) + + if "AWSPENDING" in version_stages: + secret.versions[version_id] = secret_version + else: + secret.reset_default_version(secret_version, version_id) else: secret = FakeSecret( region_name=self.region, @@ -341,7 +339,14 @@ class SecretsManagerBackend(BaseBackend): return secret - def put_secret_value(self, secret_id, secret_string, secret_binary, version_stages): + def put_secret_value( + self, + secret_id, + secret_string, + secret_binary, + client_request_token, + version_stages, + ): if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() @@ -354,6 +359,7 @@ class SecretsManagerBackend(BaseBackend): secret_id, secret_string, secret_binary, + version_id=client_request_token, description=description, tags=tags, version_stages=version_stages, @@ -410,26 +416,85 @@ class SecretsManagerBackend(BaseBackend): secret = self.secrets[secret_id] + # The rotation function must end with the versions of the secret in + # one of two states: + # + # - The AWSPENDING and AWSCURRENT staging labels are attached to the + # same version of the secret, or + # - The AWSPENDING staging label is not attached to any version of the secret. + # + # If the AWSPENDING staging label is present but not attached to the same + # version as AWSCURRENT then any later invocation of RotateSecret assumes + # that a previous rotation request is still in progress and returns an error. + try: + version = next( + version + for version in secret.versions.values() + if "AWSPENDING" in version["version_stages"] + ) + if "AWSCURRENT" in version["version_stages"]: + msg = "Previous rotation request is still in progress." + raise InvalidRequestException(msg) + + except StopIteration: + # Pending is not present in any version + pass + old_secret_version = secret.versions[secret.default_version_id] new_version_id = client_request_token or str(uuid.uuid4()) + # We add the new secret version as "pending". The previous version remains + # as "current" for now. Once we've passed the new secret through the lambda + # rotation function (if provided) we can then update the status to "current". self._add_secret( secret_id, old_secret_version["secret_string"], description=secret.description, tags=secret.tags, version_id=new_version_id, - version_stages=["AWSCURRENT"], + version_stages=["AWSPENDING"], ) - secret.rotation_lambda_arn = rotation_lambda_arn or "" if rotation_rules: secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0) if secret.auto_rotate_after_days > 0: secret.rotation_enabled = True - if "AWSCURRENT" in old_secret_version["version_stages"]: - old_secret_version["version_stages"].remove("AWSCURRENT") + # Begin the rotation process for the given secret by invoking the lambda function. + if secret.rotation_lambda_arn: + from moto.awslambda.models import lambda_backends + + lambda_backend = lambda_backends[self.region] + + request_headers = {} + response_headers = {} + + func = lambda_backend.get_function(secret.rotation_lambda_arn) + if not func: + msg = "Resource not found for ARN '{}'.".format( + secret.rotation_lambda_arn + ) + raise ResourceNotFoundException(msg) + + for step in ["create", "set", "test", "finish"]: + func.invoke( + json.dumps( + { + "Step": step + "Secret", + "SecretId": secret.name, + "ClientRequestToken": new_version_id, + } + ), + request_headers, + response_headers, + ) + + secret.set_default_version_id(new_version_id) + else: + secret.reset_default_version( + secret.versions[new_version_id], new_version_id + ) + secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"] return secret.to_short_dict() @@ -592,6 +657,54 @@ class SecretsManagerBackend(BaseBackend): return secret_id + def update_secret_version_stage( + self, secret_id, version_stage, remove_from_version_id, move_to_version_id + ): + if secret_id not in self.secrets.keys(): + raise SecretNotFoundException() + + secret = self.secrets[secret_id] + + if remove_from_version_id: + if remove_from_version_id not in secret.versions: + raise InvalidParameterException( + "Not a valid version: %s" % remove_from_version_id + ) + + stages = secret.versions[remove_from_version_id]["version_stages"] + if version_stage not in stages: + raise InvalidParameterException( + "Version stage %s not found in version %s" + % (version_stage, remove_from_version_id) + ) + + stages.remove(version_stage) + + if move_to_version_id: + if move_to_version_id not in secret.versions: + raise InvalidParameterException( + "Not a valid version: %s" % move_to_version_id + ) + + stages = secret.versions[move_to_version_id]["version_stages"] + stages.append(version_stage) + + if version_stage == "AWSCURRENT": + if remove_from_version_id: + # Whenever you move AWSCURRENT, Secrets Manager automatically + # moves the label AWSPREVIOUS to the version that AWSCURRENT + # was removed from. + secret.versions[remove_from_version_id]["version_stages"].append( + "AWSPREVIOUS" + ) + + if move_to_version_id: + stages = secret.versions[move_to_version_id]["version_stages"] + if "AWSPREVIOUS" in stages: + stages.remove("AWSPREVIOUS") + + return secret_id + @staticmethod def get_resource_policy(secret_id): resource_policy = { diff --git a/moto/secretsmanager/responses.py b/moto/secretsmanager/responses.py index a469f5baa..0433a565e 100644 --- a/moto/secretsmanager/responses.py +++ b/moto/secretsmanager/responses.py @@ -106,16 +106,21 @@ class SecretsManagerResponse(BaseResponse): secret_id = self._get_param("SecretId", if_none="") secret_string = self._get_param("SecretString") secret_binary = self._get_param("SecretBinary") + client_request_token = self._get_param("ClientRequestToken") if not secret_binary and not secret_string: raise InvalidRequestException( "You must provide either SecretString or SecretBinary." ) version_stages = self._get_param("VersionStages", if_none=["AWSCURRENT"]) + if not isinstance(version_stages, list): + version_stages = [version_stages] + return secretsmanager_backends[self.region].put_secret_value( secret_id=secret_id, secret_binary=secret_binary, secret_string=secret_string, version_stages=version_stages, + client_request_token=client_request_token, ) def list_secret_version_ids(self): @@ -169,3 +174,15 @@ class SecretsManagerResponse(BaseResponse): return secretsmanager_backends[self.region].untag_resource( secret_id=secret_id, tag_keys=tag_keys ) + + def update_secret_version_stage(self): + secret_id = self._get_param("SecretId") + version_stage = self._get_param("VersionStage") + remove_from_version_id = self._get_param("RemoveFromVersionId") + move_to_version_id = self._get_param("MoveToVersionId") + return secretsmanager_backends[self.region].update_secret_version_stage( + secret_id=secret_id, + version_stage=version_stage, + remove_from_version_id=remove_from_version_id, + move_to_version_id=move_to_version_id, + ) diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index 1e3515709..a60742942 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -3,7 +3,7 @@ from __future__ import unicode_literals import boto3 -from moto import mock_secretsmanager +from moto import mock_secretsmanager, mock_lambda, settings from botocore.exceptions import ClientError import string import pytz @@ -628,6 +628,131 @@ def test_rotate_secret_rotation_period_too_long(): ) +def get_rotation_zip_file(): + from tests.test_awslambda.test_lambda import _process_lambda + + func_str = """ +import boto3 +import json + +def lambda_handler(event, context): + arn = event['SecretId'] + token = event['ClientRequestToken'] + step = event['Step'] + + client = boto3.client("secretsmanager", region_name="us-west-2", endpoint_url="http://motoserver:5000") + metadata = client.describe_secret(SecretId=arn) + value = client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING") + + if not metadata['RotationEnabled']: + print("Secret %s is not enabled for rotation." % arn) + raise ValueError("Secret %s is not enabled for rotation." % arn) + versions = metadata['VersionIdsToStages'] + if token not in versions: + print("Secret version %s has no stage for rotation of secret %s." % (token, arn)) + raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn)) + if "AWSCURRENT" in versions[token]: + print("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn)) + return + elif "AWSPENDING" not in versions[token]: + print("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) + raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) + + if step == 'createSecret': + try: + client.get_secret_value(SecretId=arn, VersionId=token, VersionStage='AWSPENDING') + except client.exceptions.ResourceNotFoundException: + client.put_secret_value( + SecretId=arn, + ClientRequestToken=token, + SecretString=json.dumps({'create': True}), + VersionStages=['AWSPENDING'] + ) + + if step == 'setSecret': + client.put_secret_value( + SecretId=arn, + ClientRequestToken=token, + SecretString='UpdatedValue', + VersionStages=["AWSPENDING"], + ) + + elif step == 'finishSecret': + current_version = next( + version + for version, stages in metadata['VersionIdsToStages'].items() + if 'AWSCURRENT' in stages + ) + print("current: %s new: %s" % (current_version, token)) + client.update_secret_version_stage( + SecretId=arn, + VersionStage='AWSCURRENT', + MoveToVersionId=token, + RemoveFromVersionId=current_version, + ) + client.update_secret_version_stage( + SecretId=arn, + VersionStage='AWSPENDING', + RemoveFromVersionId=token, + ) + """ + return _process_lambda(func_str) + + +if settings.TEST_SERVER_MODE: + + @mock_lambda + @mock_secretsmanager + def test_rotate_secret_using_lambda(): + from tests.test_awslambda.test_lambda import get_role_name + + # Passing a `RotationLambdaARN` value to `rotate_secret` should invoke lambda + lambda_conn = boto3.client( + "lambda", region_name="us-west-2", endpoint_url="http://localhost:5000", + ) + func = lambda_conn.create_function( + FunctionName="testFunction", + Runtime="python3.8", + Role=get_role_name(), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": get_rotation_zip_file()}, + Description="Secret rotator", + Timeout=3, + MemorySize=128, + Publish=True, + ) + + secrets_conn = boto3.client( + "secretsmanager", + region_name="us-west-2", + endpoint_url="http://localhost:5000", + ) + secret = secrets_conn.create_secret( + Name=DEFAULT_SECRET_NAME, SecretString="InitialValue", + ) + initial_version = secret["VersionId"] + + rotated_secret = secrets_conn.rotate_secret( + SecretId=DEFAULT_SECRET_NAME, + RotationLambdaARN=func["FunctionArn"], + RotationRules=dict(AutomaticallyAfterDays=30,), + ) + + # Ensure we received an updated VersionId from `rotate_secret` + assert rotated_secret["VersionId"] != initial_version + + updated_secret = secrets_conn.get_secret_value( + SecretId=DEFAULT_SECRET_NAME, VersionStage="AWSCURRENT", + ) + rotated_version = updated_secret["VersionId"] + + assert initial_version != rotated_version + metadata = secrets_conn.describe_secret(SecretId=DEFAULT_SECRET_NAME) + assert metadata["VersionIdsToStages"][initial_version] == ["AWSPREVIOUS"] + assert metadata["VersionIdsToStages"][rotated_version] == ["AWSCURRENT"] + assert updated_secret["SecretString"] == "UpdatedValue" + + @mock_secretsmanager def test_put_secret_value_on_non_existing_secret(): conn = boto3.client("secretsmanager", region_name="us-west-2") diff --git a/tests/test_secretsmanager/test_server.py b/tests/test_secretsmanager/test_server.py index da41eb5fb..d5d9223ed 100644 --- a/tests/test_secretsmanager/test_server.py +++ b/tests/test_secretsmanager/test_server.py @@ -2,10 +2,14 @@ from __future__ import unicode_literals import json + +import boto3 +import pytest import sure # noqa import moto.server as server -from moto import mock_secretsmanager +from moto import mock_secretsmanager, mock_lambda, mock_iam, mock_logs, settings +from tests.test_awslambda.test_lambda import get_test_zip_file1 """ Test the different server responses for secretsmanager @@ -324,6 +328,54 @@ def test_rotate_secret_that_does_not_match(): assert json_data["__type"] == "ResourceNotFoundException" +@mock_secretsmanager +def test_rotate_secret_that_is_still_rotating(): + backend = server.create_backend_app("secretsmanager") + test_client = backend.test_client() + + create_secret = test_client.post( + "/", + data={ + "Name": DEFAULT_SECRET_NAME, + "SecretString": "foosecret", + # "VersionStages": ["AWSPENDING"], + }, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + create_secret = json.loads(create_secret.data.decode("utf-8")) + + # Get the secret into a broken state. + version_id = create_secret["VersionId"] + test_client.post( + "/", + data={ + "SecretId": "test-secret", + "VersionStage": "AWSPENDING", + "MoveToVersionId": version_id, + }, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + describe_secret = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + metadata = json.loads(describe_secret.data.decode("utf-8")) + assert metadata["SecretVersionsToStages"][version_id] == [ + "AWSCURRENT", + "AWSPENDING", + ] + + # Then attempt to rotate it + rotate_secret = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME}, + headers={"X-Amz-Target": "secretsmanager.RotateSecret"}, + ) + assert rotate_secret.status_code == 400 + + @mock_secretsmanager def test_rotate_secret_client_request_token_too_short(): backend = server.create_backend_app("secretsmanager") @@ -404,6 +456,79 @@ def test_rotate_secret_rotation_lambda_arn_too_long(): assert json_data["__type"] == "InvalidParameterException" +if not settings.TEST_SERVER_MODE: + + @mock_iam + @mock_lambda + @mock_logs + @mock_secretsmanager + def test_rotate_secret_lambda_invocations(): + conn = boto3.client("iam", region_name="us-east-1") + logs_conn = boto3.client("logs", region_name="us-east-1") + role = conn.create_role( + RoleName="role", AssumeRolePolicyDocument="some policy", Path="/my-path/", + ) + + conn = boto3.client("lambda", region_name="us-east-1") + func = conn.create_function( + FunctionName="testFunction", + Code=dict(ZipFile=get_test_zip_file1()), + Handler="lambda_function.lambda_handler", + Runtime="python2.7", + Role=role["Role"]["Arn"], + ) + + secretsmanager_backend = server.create_backend_app("secretsmanager") + secretsmanager_client = secretsmanager_backend.test_client() + + secretsmanager_client.post( + "/", + data={"Name": DEFAULT_SECRET_NAME, "SecretString": "foosecret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + + with pytest.raises(logs_conn.exceptions.ResourceNotFoundException): + # The log group doesn't exist yet + logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction") + + secretsmanager_client.post( + "/", + data={ + "SecretId": DEFAULT_SECRET_NAME, + "RotationLambdaARN": func["FunctionArn"], + }, + headers={"X-Amz-Target": "secretsmanager.RotateSecret"}, + ) + + # The log group now exists and has been logged to 4 times (for each invocation) + logs = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction") + assert len(logs["logStreams"]) == 4 + + @mock_iam + @mock_lambda + @mock_logs + @mock_secretsmanager + def test_rotate_secret_with_incorrect_lambda_arn(): + secretsmanager_backend = server.create_backend_app("secretsmanager") + secretsmanager_client = secretsmanager_backend.test_client() + + secretsmanager_client.post( + "/", + data={"Name": DEFAULT_SECRET_NAME, "SecretString": "foosecret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + + resp = secretsmanager_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME, "RotationLambdaARN": "notarealarn",}, + headers={"X-Amz-Target": "secretsmanager.RotateSecret"}, + ) + json_data = json.loads(resp.data.decode("utf-8")) + assert json_data["message"] == "Resource not found for ARN 'notarealarn'." + assert json_data["__type"] == "ResourceNotFoundException" + assert resp.status_code == 404 + + @mock_secretsmanager def test_put_secret_value_puts_new_secret(): backend = server.create_backend_app("secretsmanager") @@ -629,6 +754,175 @@ def test_get_resource_policy_secret(): assert json_data["Name"] == "test-secret" +@mock_secretsmanager +def test_update_secret_version_stage(): + custom_stage = "CUSTOM_STAGE" + backend = server.create_backend_app("secretsmanager") + test_client = backend.test_client() + create_secret = test_client.post( + "/", + data={"Name": "test-secret", "SecretString": "secret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + create_secret = json.loads(create_secret.data.decode("utf-8")) + initial_version = create_secret["VersionId"] + + # Create a new version + put_secret = test_client.post( + "/", + data={ + "SecretId": DEFAULT_SECRET_NAME, + "SecretString": "secret", + "VersionStages": [custom_stage], + }, + headers={"X-Amz-Target": "secretsmanager.PutSecretValue"}, + ) + put_secret = json.loads(put_secret.data.decode("utf-8")) + new_version = put_secret["VersionId"] + + describe_secret = test_client.post( + "/", + data={"SecretId": "test-secret"}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + json_data = json.loads(describe_secret.data.decode("utf-8")) + stages = json_data["SecretVersionsToStages"] + assert len(stages) == 2 + assert stages[initial_version] == ["AWSPREVIOUS"] + assert stages[new_version] == [custom_stage] + + test_client.post( + "/", + data={ + "SecretId": "test-secret", + "VersionStage": custom_stage, + "RemoveFromVersionId": new_version, + "MoveToVersionId": initial_version, + }, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + + describe_secret = test_client.post( + "/", + data={"SecretId": "test-secret"}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + json_data = json.loads(describe_secret.data.decode("utf-8")) + stages = json_data["SecretVersionsToStages"] + assert len(stages) == 2 + assert stages[initial_version] == ["AWSPREVIOUS", custom_stage] + assert stages[new_version] == [] + + +@mock_secretsmanager +def test_update_secret_version_stage_currentversion_handling(): + backend = server.create_backend_app("secretsmanager") + test_client = backend.test_client() + create_secret = test_client.post( + "/", + data={"Name": "test-secret", "SecretString": "secret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + create_secret = json.loads(create_secret.data.decode("utf-8")) + initial_version = create_secret["VersionId"] + + # Create a new version + put_secret = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME, "SecretString": "secret",}, + headers={"X-Amz-Target": "secretsmanager.PutSecretValue"}, + ) + put_secret = json.loads(put_secret.data.decode("utf-8")) + new_version = put_secret["VersionId"] + + describe_secret = test_client.post( + "/", + data={"SecretId": "test-secret"}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + json_data = json.loads(describe_secret.data.decode("utf-8")) + stages = json_data["SecretVersionsToStages"] + assert len(stages) == 2 + assert stages[initial_version] == ["AWSPREVIOUS"] + assert stages[new_version] == ["AWSCURRENT"] + + test_client.post( + "/", + data={ + "SecretId": "test-secret", + "VersionStage": "AWSCURRENT", + "RemoveFromVersionId": new_version, + "MoveToVersionId": initial_version, + }, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + + describe_secret = test_client.post( + "/", + data={"SecretId": "test-secret"}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + json_data = json.loads(describe_secret.data.decode("utf-8")) + stages = json_data["SecretVersionsToStages"] + assert len(stages) == 2 + assert stages[initial_version] == ["AWSCURRENT"] + assert stages[new_version] == ["AWSPREVIOUS"] + + +@mock_secretsmanager +def test_update_secret_version_stage_validation(): + backend = server.create_backend_app("secretsmanager") + test_client = backend.test_client() + + # Secret ID that doesn't exist + resp = test_client.post( + "/", + data={"SecretId": "nonexistent"}, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + assert resp.status_code == 404 + + # Add a secret so we can run further checks + secret = test_client.post( + "/", + data={"Name": DEFAULT_SECRET_NAME, "SecretString": "secret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + secret = json.loads(secret.data.decode("utf-8")) + + # "Remove from" version ID that doesn't exist + resp = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME, "RemoveFromVersionId": "nonexistent"}, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + assert resp.status_code == 400 + + # "Remove from" stage name which isn't attached to the given version + resp = test_client.post( + "/", + data={ + "SecretId": DEFAULT_SECRET_NAME, + "RemoveFromVersionId": secret["VersionId"], + "VersionStage": "nonexistent", + }, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + assert resp.status_code == 400 + + # "Move to" version ID that doesn't exist + resp = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME, "MoveToVersionId": "nonexistent",}, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + assert resp.status_code == 400 + + # # The following tests should work, but fail on the embedded dict in # RotationRules. The error message suggests a problem deeper in the code, which