From 7194456d0d82bd04f936857afaf51085e8225380 Mon Sep 17 00:00:00 2001 From: MEP <64580864+MEPalma@users.noreply.github.com> Date: Fri, 11 Feb 2022 13:49:14 +0100 Subject: [PATCH] [LocalStack] Fixes to secretsmanager's PutSecretValue, CreateSecret, DeleteSecret (#4851) --- moto/secretsmanager/models.py | 94 ++++++----- moto/secretsmanager/responses.py | 2 + .../test_secretsmanager.py | 151 +++++++++++++++++- tests/test_secretsmanager/test_server.py | 5 +- 4 files changed, 211 insertions(+), 41 deletions(-) diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 233666983..783f9542d 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -120,14 +120,16 @@ class FakeSecret: def is_deleted(self): return self.deleted_date is not None - def to_short_dict(self, include_version_stages=False): + def to_short_dict(self, include_version_stages=False, version_id=None): + if not version_id: + version_id = self.default_version_id dct = { "ARN": self.arn, "Name": self.name, - "VersionId": self.default_version_id, + "VersionId": version_id, } if include_version_stages: - dct["VersionStages"] = self.version_stages + dct["VersionStages"] = self.versions[version_id]["version_stages"] return json.dumps(dct) def to_dict(self): @@ -208,6 +210,14 @@ class SecretsManagerBackend(BaseBackend): msg = "ClientRequestToken must be 32-64 characters long." raise InvalidParameterException(msg) + def _from_client_request_token(self, client_request_token): + version_id = client_request_token + if version_id: + self._client_request_token_validator(version_id) + else: + version_id = str(uuid.uuid4()) + return version_id + def get_secret_value(self, secret_id, version_id, version_stage): if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() @@ -309,6 +319,7 @@ class SecretsManagerBackend(BaseBackend): description=None, tags=None, kms_key_id=None, + client_request_token=None, ): # error if secret exists @@ -324,6 +335,7 @@ class SecretsManagerBackend(BaseBackend): description=description, tags=tags, kms_key_id=kms_key_id, + version_id=client_request_token, ) return secret.to_short_dict() @@ -343,10 +355,7 @@ class SecretsManagerBackend(BaseBackend): if version_stages is None: version_stages = ["AWSCURRENT"] - if version_id: - self._client_request_token_validator(version_id) - else: - version_id = str(uuid.uuid4()) + version_id = self._from_client_request_token(version_id) secret_version = { "createdate": int(time.time()), @@ -365,10 +374,10 @@ class SecretsManagerBackend(BaseBackend): secret.update(description, tags, kms_key_id, last_changed_date=update_time) - if "AWSPENDING" in version_stages: - secret.versions[version_id] = secret_version - else: + if "AWSCURRENT" in version_stages: secret.reset_default_version(secret_version, version_id) + else: + secret.versions[version_id] = secret_version else: secret = FakeSecret( region_name=self.region, @@ -403,17 +412,19 @@ class SecretsManagerBackend(BaseBackend): tags = secret.tags description = secret.description + version_id = self._from_client_request_token(client_request_token) + secret = self._add_secret( secret_id, secret_string, secret_binary, - version_id=client_request_token, + version_id=version_id, description=description, tags=tags, version_stages=version_stages, ) - return secret.to_short_dict(include_version_stages=True) + return secret.to_short_dict(include_version_stages=True, version_id=version_id) def describe_secret(self, secret_id): if not self._is_valid_identifier(secret_id): @@ -658,21 +669,6 @@ class SecretsManagerBackend(BaseBackend): self, secret_id, recovery_window_in_days, force_delete_without_recovery ): - if not self._is_valid_identifier(secret_id): - raise SecretNotFoundException() - - if self.secrets[secret_id].is_deleted(): - raise InvalidRequestException( - "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \ - perform the operation on a secret that's currently marked deleted." - ) - - if recovery_window_in_days and force_delete_without_recovery: - raise InvalidParameterException( - "An error occurred (InvalidParameterException) when calling the DeleteSecret operation: You can't \ - use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays." - ) - if recovery_window_in_days and ( recovery_window_in_days < 7 or recovery_window_in_days > 30 ): @@ -681,22 +677,44 @@ class SecretsManagerBackend(BaseBackend): RecoveryWindowInDays value must be between 7 and 30 days (inclusive)." ) - deletion_date = datetime.datetime.utcnow() + if recovery_window_in_days and force_delete_without_recovery: + raise InvalidParameterException( + "An error occurred (InvalidParameterException) when calling the DeleteSecret operation: You can't \ + use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays." + ) - if force_delete_without_recovery: - secret = self.secrets.pop(secret_id, None) + if not self._is_valid_identifier(secret_id): + if not force_delete_without_recovery: + raise SecretNotFoundException() + else: + secret = FakeSecret(self.region, secret_id) + arn = secret.arn + name = secret.name + deletion_date = datetime.datetime.utcnow() + return arn, name, self._unix_time_secs(deletion_date) else: - deletion_date += datetime.timedelta(days=recovery_window_in_days or 30) - self.secrets[secret_id].delete(self._unix_time_secs(deletion_date)) - secret = self.secrets.get(secret_id, None) + if self.secrets[secret_id].is_deleted(): + raise InvalidRequestException( + "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \ + perform the operation on a secret that's currently marked deleted." + ) - if not secret: - raise SecretNotFoundException() + deletion_date = datetime.datetime.utcnow() - arn = secret.arn - name = secret.name + if force_delete_without_recovery: + secret = self.secrets.pop(secret_id, None) + else: + deletion_date += datetime.timedelta(days=recovery_window_in_days or 30) + self.secrets[secret_id].delete(self._unix_time_secs(deletion_date)) + secret = self.secrets.get(secret_id, None) - return arn, name, self._unix_time_secs(deletion_date) + if not secret: + raise SecretNotFoundException() + + arn = secret.arn + name = secret.name + + return arn, name, self._unix_time_secs(deletion_date) def restore_secret(self, secret_id): diff --git a/moto/secretsmanager/responses.py b/moto/secretsmanager/responses.py index a96f5fbfa..0aada95d6 100644 --- a/moto/secretsmanager/responses.py +++ b/moto/secretsmanager/responses.py @@ -45,6 +45,7 @@ class SecretsManagerResponse(BaseResponse): description = self._get_param("Description", if_none="") tags = self._get_param("Tags", if_none=[]) kms_key_id = self._get_param("KmsKeyId", if_none=None) + client_request_token = self._get_param("ClientRequestToken", if_none=None) return secretsmanager_backends[self.region].create_secret( name=name, secret_string=secret_string, @@ -52,6 +53,7 @@ class SecretsManagerResponse(BaseResponse): description=description, tags=tags, kms_key_id=kms_key_id, + client_request_token=client_request_token, ) def update_secret(self): diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index a0f7718c7..62ebc721b 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -26,6 +26,20 @@ def test_get_secret_value(): assert result["SecretString"] == "foosecret" +@mock_secretsmanager +def test_create_secret_with_client_request_token(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71" + create_dict = conn.create_secret( + Name=DEFAULT_SECRET_NAME, + SecretString="secret_string", + ClientRequestToken=version_id, + ) + assert create_dict + assert create_dict["VersionId"] == version_id + + @mock_secretsmanager def test_get_secret_value_by_arn(): conn = boto3.client("secretsmanager", region_name="us-west-2") @@ -228,6 +242,17 @@ def test_delete_secret_force(): result = conn.get_secret_value(SecretId="test-secret") +@mock_secretsmanager +def test_delete_secret_force_no_such_secret(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + deleted_secret = conn.delete_secret( + SecretId=DEFAULT_SECRET_NAME, ForceDeleteWithoutRecovery=True + ) + assert deleted_secret + assert deleted_secret["Name"] == DEFAULT_SECRET_NAME + + @mock_secretsmanager def test_delete_secret_force_with_arn(): conn = boto3.client("secretsmanager", region_name="us-west-2") @@ -251,7 +276,7 @@ def test_delete_secret_that_does_not_exist(): conn = boto3.client("secretsmanager", region_name="us-west-2") with pytest.raises(ClientError): - conn.delete_secret(SecretId="i-dont-exist", ForceDeleteWithoutRecovery=True) + conn.delete_secret(SecretId="i-dont-exist") @mock_secretsmanager @@ -288,6 +313,18 @@ def test_delete_secret_recovery_window_too_long(): conn.delete_secret(SecretId="test-secret", RecoveryWindowInDays=31) +@mock_secretsmanager +def test_delete_secret_force_no_such_secret_with_invalid_recovery_window(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + with pytest.raises(ClientError): + conn.delete_secret( + SecretId=DEFAULT_SECRET_NAME, + ForceDeleteWithoutRecovery=True, + RecoveryWindowInDays=4, + ) + + @mock_secretsmanager def test_delete_secret_that_is_marked_deleted(): conn = boto3.client("secretsmanager", region_name="us-west-2") @@ -952,6 +989,118 @@ def test_can_list_secret_version_ids(): assert [first_version_id, second_version_id].sort() == returned_version_ids.sort() +@mock_secretsmanager +def test_put_secret_value_version_stages_response(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + # Creation. + first_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71" + conn.create_secret( + Name=DEFAULT_SECRET_NAME, + SecretString="first_secret_string", + ClientRequestToken=first_version_id, + ) + + # Use PutSecretValue to push a new version with new version stages. + second_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce72" + second_version_stages = ["SAMPLESTAGE1", "SAMPLESTAGE0"] + second_put_res_dict = conn.put_secret_value( + SecretId=DEFAULT_SECRET_NAME, + SecretString="second_secret_string", + VersionStages=second_version_stages, + ClientRequestToken=second_version_id, + ) + assert second_put_res_dict + assert second_put_res_dict["VersionId"] == second_version_id + assert second_put_res_dict["VersionStages"] == second_version_stages + + +@mock_secretsmanager +def test_put_secret_value_version_stages_pending_response(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + # Creation. + first_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71" + conn.create_secret( + Name=DEFAULT_SECRET_NAME, + SecretString="first_secret_string", + ClientRequestToken=first_version_id, + ) + + # Use PutSecretValue to push a new version with new version stages. + second_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce72" + second_version_stages = ["AWSPENDING"] + second_put_res_dict = conn.put_secret_value( + SecretId=DEFAULT_SECRET_NAME, + SecretString="second_secret_string", + VersionStages=second_version_stages, + ClientRequestToken=second_version_id, + ) + assert second_put_res_dict + assert second_put_res_dict["VersionId"] == second_version_id + assert second_put_res_dict["VersionStages"] == second_version_stages + + +@mock_secretsmanager +def test_after_put_secret_value_version_stages_can_get_current(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + # Creation. + first_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71" + first_secret_string = "first_secret_string" + conn.create_secret( + Name=DEFAULT_SECRET_NAME, + SecretString=first_secret_string, + ClientRequestToken=first_version_id, + ) + + # Use PutSecretValue to push a new version with new version stages. + second_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce72" + conn.put_secret_value( + SecretId=DEFAULT_SECRET_NAME, + SecretString="second_secret_string", + VersionStages=["SAMPLESTAGE1", "SAMPLESTAGE0"], + ClientRequestToken=second_version_id, + ) + + # Get current. + get_dict = conn.get_secret_value(SecretId=DEFAULT_SECRET_NAME) + assert get_dict + assert get_dict["VersionId"] == first_version_id + assert get_dict["SecretString"] == first_secret_string + assert get_dict["VersionStages"] == ["AWSCURRENT"] + + +@mock_secretsmanager +def test_after_put_secret_value_version_stages_pending_can_get_current(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + # Creation. + first_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71" + first_secret_string = "first_secret_string" + conn.create_secret( + Name=DEFAULT_SECRET_NAME, + SecretString=first_secret_string, + ClientRequestToken=first_version_id, + ) + + # Use PutSecretValue to push a new version with new version stages. + pending_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce72" + conn.put_secret_value( + SecretId=DEFAULT_SECRET_NAME, + SecretString="second_secret_string", + VersionStages=["AWSPENDING"], + ClientRequestToken=pending_version_id, + ) + + # Get current. + get_dict = conn.get_secret_value(SecretId=DEFAULT_SECRET_NAME) + assert get_dict + assert get_dict["VersionId"] == first_version_id + assert get_dict["SecretString"] == first_secret_string + assert get_dict["VersionStages"] == ["AWSCURRENT"] + + @mock_secretsmanager @pytest.mark.parametrize("pass_arn", [True, False]) def test_update_secret(pass_arn): diff --git a/tests/test_secretsmanager/test_server.py b/tests/test_secretsmanager/test_server.py index bbe95f36f..f213ebf4a 100644 --- a/tests/test_secretsmanager/test_server.py +++ b/tests/test_secretsmanager/test_server.py @@ -774,6 +774,7 @@ def test_update_secret_version_stage(pass_arn): headers={"X-Amz-Target": "secretsmanager.PutSecretValue"}, ) put_secret = json.loads(put_secret.data.decode("utf-8")) + assert put_secret["VersionStages"] == [custom_stage] new_version = put_secret["VersionId"] describe_secret = test_client.post( @@ -785,7 +786,7 @@ def test_update_secret_version_stage(pass_arn): json_data = json.loads(describe_secret.data.decode("utf-8")) stages = json_data["SecretVersionsToStages"] assert len(stages) == 2 - assert stages[initial_version] == ["AWSPREVIOUS"] + assert stages[initial_version] == ["AWSCURRENT"] assert stages[new_version] == [custom_stage] test_client.post( @@ -808,7 +809,7 @@ def test_update_secret_version_stage(pass_arn): json_data = json.loads(describe_secret.data.decode("utf-8")) stages = json_data["SecretVersionsToStages"] assert len(stages) == 2 - assert stages[initial_version] == ["AWSPREVIOUS", custom_stage] + assert stages[initial_version] == ["AWSCURRENT", custom_stage] assert stages[new_version] == []