From 3cf4f6315b9c8cf77a1f8e1dc9f4613ca131f2db Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Thu, 24 Aug 2023 13:48:53 +0000 Subject: [PATCH] SecretsManager: Allow creation/update of secrets without values (#6720) --- moto/secretsmanager/models.py | 164 +++++++++++------- .../test_secretsmanager.py | 116 +++++++++++-- 2 files changed, 199 insertions(+), 81 deletions(-) diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index a011f8b59..567b2ca56 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -61,12 +61,13 @@ class FakeSecret: account_id: str, region_name: str, secret_id: str, + secret_version: Dict[str, Any], + version_id: str, secret_string: Optional[str] = None, secret_binary: Optional[str] = None, description: Optional[str] = None, tags: Optional[List[Dict[str, str]]] = None, kms_key_id: Optional[str] = None, - version_id: Optional[str] = None, version_stages: Optional[List[str]] = None, last_changed_date: Optional[int] = None, created_date: Optional[int] = None, @@ -79,10 +80,11 @@ class FakeSecret: self.description = description self.tags = tags or [] self.kms_key_id = kms_key_id - self.version_id = version_id self.version_stages = version_stages self.last_changed_date = last_changed_date self.created_date = created_date + # We should only return Rotation details after it's been requested + self.rotation_requested = False self.rotation_enabled = False self.rotation_lambda_arn = "" self.auto_rotate_after_days = 0 @@ -91,6 +93,13 @@ class FakeSecret: self.next_rotation_date: Optional[int] = None self.last_rotation_date: Optional[int] = None + self.versions: Dict[str, Dict[str, Any]] = {} + if secret_string or secret_binary: + self.versions = {version_id: secret_version} + self.set_default_version_id(version_id) + else: + self.set_default_version_id(None) + def update( self, description: Optional[str] = None, @@ -106,10 +115,7 @@ class FakeSecret: if kms_key_id is not None: self.kms_key_id = kms_key_id - def set_versions(self, versions: Dict[str, Dict[str, Any]]) -> None: - self.versions = versions - - def set_default_version_id(self, version_id: str) -> None: + def set_default_version_id(self, version_id: Optional[str]) -> None: self.default_version_id = version_id def reset_default_version( @@ -122,7 +128,7 @@ class FakeSecret: # set old AWSCURRENT secret to AWSPREVIOUS previous_current_version_id = self.default_version_id - self.versions[previous_current_version_id]["version_stages"] = ["AWSPREVIOUS"] + self.versions[previous_current_version_id]["version_stages"] = ["AWSPREVIOUS"] # type: ignore self.versions[version_id] = secret_version self.default_version_id = version_id @@ -145,40 +151,61 @@ class FakeSecret: return self.deleted_date is not None def to_short_dict( - self, include_version_stages: bool = False, version_id: Optional[str] = None + self, + include_version_stages: bool = False, + version_id: Optional[str] = None, + include_version_id: bool = True, ) -> str: if not version_id: version_id = self.default_version_id dct = { "ARN": self.arn, "Name": self.name, - "VersionId": version_id, } - if include_version_stages: + if include_version_id and version_id: + dct["VersionId"] = version_id + if version_id and include_version_stages: dct["VersionStages"] = self.versions[version_id]["version_stages"] return json.dumps(dct) def to_dict(self) -> Dict[str, Any]: version_id_to_stages = self._form_version_ids_to_stages() - return { + dct: Dict[str, Any] = { "ARN": self.arn, "Name": self.name, - "Description": self.description or "", "KmsKeyId": self.kms_key_id, - "RotationEnabled": self.rotation_enabled, - "RotationLambdaARN": self.rotation_lambda_arn, - "RotationRules": {"AutomaticallyAfterDays": self.auto_rotate_after_days}, - "LastRotatedDate": self.last_rotation_date, "LastChangedDate": self.last_changed_date, "LastAccessedDate": None, "NextRotationDate": self.next_rotation_date, "DeletedDate": self.deleted_date, - "Tags": self.tags, - "VersionIdsToStages": version_id_to_stages, - "SecretVersionsToStages": version_id_to_stages, "CreatedDate": self.created_date, } + if self.tags: + dct["Tags"] = self.tags + if self.description: + dct["Description"] = self.description + if self.versions: + dct.update( + { + # Key used by describe_secret + "VersionIdsToStages": version_id_to_stages, + # Key used by list_secrets + "SecretVersionsToStages": version_id_to_stages, + } + ) + if self.rotation_requested: + dct.update( + { + "RotationEnabled": self.rotation_enabled, + "RotationLambdaARN": self.rotation_lambda_arn, + "RotationRules": { + "AutomaticallyAfterDays": self.auto_rotate_after_days + }, + "LastRotatedDate": self.last_rotation_date, + } + ) + return dct def _form_version_ids_to_stages(self) -> Dict[str, str]: version_id_to_stages = {} @@ -296,6 +323,7 @@ class SecretsManagerBackend(BaseBackend): ): raise SecretStageVersionMismatchException() + version_id_provided = version_id is not None if not version_id and version_stage: # set version_id to match version_stage versions_dict = self.secrets[secret_id].versions @@ -314,13 +342,13 @@ class SecretsManagerBackend(BaseBackend): ) secret = self.secrets[secret_id] - version_id = version_id or secret.default_version_id + version_id = version_id or secret.default_version_id or "AWSCURRENT" secret_version = secret.versions.get(version_id) if not secret_version: + _type = "staging label" if not version_id_provided else "VersionId" raise ResourceNotFoundException( - "An error occurred (ResourceNotFoundException) when calling the GetSecretValue operation: Secrets " - f"Manager can't find the specified secret value for VersionId: {version_id}" + f"Secrets Manager can't find the specified secret value for {_type}: {version_id}" ) response_data = { @@ -369,7 +397,7 @@ class SecretsManagerBackend(BaseBackend): tags = secret.tags description = description or secret.description - secret = self._add_secret( + secret, new_version = self._add_secret( secret_id, secret_string=secret_string, secret_binary=secret_binary, @@ -379,7 +407,7 @@ class SecretsManagerBackend(BaseBackend): kms_key_id=kms_key_id, ) - return secret.to_short_dict() + return secret.to_short_dict(include_version_id=new_version) def create_secret( self, @@ -398,7 +426,7 @@ class SecretsManagerBackend(BaseBackend): "A resource with the ID you requested already exists." ) - secret = self._add_secret( + secret, new_version = self._add_secret( name, secret_string=secret_string, secret_binary=secret_binary, @@ -408,7 +436,7 @@ class SecretsManagerBackend(BaseBackend): version_id=client_request_token, ) - return secret.to_short_dict() + return secret.to_short_dict(include_version_id=new_version) def _add_secret( self, @@ -420,7 +448,7 @@ class SecretsManagerBackend(BaseBackend): kms_key_id: Optional[str] = None, version_id: Optional[str] = None, version_stages: Optional[List[str]] = None, - ) -> FakeSecret: + ) -> Tuple[FakeSecret, bool]: if version_stages is None: version_stages = ["AWSCURRENT"] @@ -438,17 +466,20 @@ class SecretsManagerBackend(BaseBackend): if secret_binary is not None: secret_version["secret_binary"] = secret_binary + new_version = secret_string is not None or secret_binary is not None + update_time = int(time.time()) if secret_id in self.secrets: secret = self.secrets[secret_id] secret.update(description, tags, kms_key_id, last_changed_date=update_time) - if "AWSCURRENT" in version_stages: - secret.reset_default_version(secret_version, version_id) - else: - secret.remove_version_stages_from_old_versions(version_stages) - secret.versions[version_id] = secret_version + if new_version: + if "AWSCURRENT" in version_stages: + secret.reset_default_version(secret_version, version_id) + else: + secret.remove_version_stages_from_old_versions(version_stages) + secret.versions[version_id] = secret_version else: secret = FakeSecret( account_id=self.account_id, @@ -461,12 +492,12 @@ class SecretsManagerBackend(BaseBackend): kms_key_id=kms_key_id, last_changed_date=update_time, created_date=update_time, + version_id=version_id, + secret_version=secret_version, ) - secret.set_versions({version_id: secret_version}) - secret.set_default_version_id(version_id) self.secrets[secret_id] = secret - return secret + return secret, new_version def put_secret_value( self, @@ -486,7 +517,7 @@ class SecretsManagerBackend(BaseBackend): version_id = self._from_client_request_token(client_request_token) - secret = self._add_secret( + secret, _ = self._add_secret( secret_id, secret_string, secret_binary, @@ -513,7 +544,6 @@ class SecretsManagerBackend(BaseBackend): rotation_lambda_arn: Optional[str] = None, rotation_rules: Optional[Dict[str, Any]] = None, ) -> str: - rotation_days = "AutomaticallyAfterDays" if not self._is_valid_identifier(secret_id): @@ -569,30 +599,33 @@ class SecretsManagerBackend(BaseBackend): # Pending is not present in any version pass - old_secret_version = secret.versions[secret.default_version_id] + if secret.versions: + old_secret_version = secret.versions[secret.default_version_id] # type: ignore - if client_request_token: - self._client_request_token_validator(client_request_token) - new_version_id = client_request_token - else: - new_version_id = str(mock_random.uuid4()) + if client_request_token: + self._client_request_token_validator(client_request_token) + new_version_id = client_request_token + else: + new_version_id = str(mock_random.uuid4()) - # We add the new secret version as "pending". The previous version remains - # as "current" for now. Once we've passed the new secret through the lambda - # rotation function (if provided) we can then update the status to "current". - old_secret_version_secret_string = ( - old_secret_version["secret_string"] - if "secret_string" in old_secret_version - else None - ) - self._add_secret( - secret_id, - old_secret_version_secret_string, - description=secret.description, - tags=secret.tags, - version_id=new_version_id, - version_stages=["AWSPENDING"], - ) + # We add the new secret version as "pending". The previous version remains + # as "current" for now. Once we've passed the new secret through the lambda + # rotation function (if provided) we can then update the status to "current". + old_secret_version_secret_string = ( + old_secret_version["secret_string"] + if "secret_string" in old_secret_version + else None + ) + self._add_secret( + secret_id, + old_secret_version_secret_string, + description=secret.description, + tags=secret.tags, + version_id=new_version_id, + version_stages=["AWSPENDING"], + ) + + secret.rotation_requested = True secret.rotation_lambda_arn = rotation_lambda_arn or "" if rotation_rules: secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0) @@ -628,11 +661,15 @@ class SecretsManagerBackend(BaseBackend): ) secret.set_default_version_id(new_version_id) - else: + elif secret.versions: + # AWS will always require a Lambda ARN + # without that, Moto can still apply the 'AWSCURRENT'-label + # This only makes sense if we have a version secret.reset_default_version( secret.versions[new_version_id], new_version_id ) secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"] + self.secrets[secret_id].last_rotation_date = int(time.time()) return secret.to_short_dict() @@ -741,11 +778,8 @@ class SecretsManagerBackend(BaseBackend): if not force_delete_without_recovery: raise SecretNotFoundException() else: - unknown_secret = FakeSecret( - self.account_id, self.region_name, secret_id - ) - arn = unknown_secret.arn - name = unknown_secret.name + arn = secret_arn(self.account_id, self.region_name, secret_id=secret_id) + name = secret_id deletion_date = datetime.datetime.utcnow() return arn, name, self._unix_time_secs(deletion_date) else: diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index b57937103..8a495bb9f 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -14,7 +14,7 @@ import pytest from moto import mock_secretsmanager, mock_lambda, settings from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID -DEFAULT_SECRET_NAME = "test-secret" +DEFAULT_SECRET_NAME = "test-secret7" @mock_secretsmanager @@ -124,10 +124,10 @@ def test_get_secret_value_that_is_marked_deleted(): def test_get_secret_that_has_no_value(): conn = boto3.client("secretsmanager", region_name="us-west-2") - conn.create_secret(Name="java-util-test-password") + conn.create_secret(Name="secret-no-value") with pytest.raises(ClientError) as cm: - conn.get_secret_value(SecretId="java-util-test-password") + conn.get_secret_value(SecretId="secret-no-value") assert ( "Secrets Manager can't find the specified secret value for staging label: AWSCURRENT" @@ -139,7 +139,7 @@ def test_get_secret_that_has_no_value(): def test_get_secret_version_that_does_not_exist(): conn = boto3.client("secretsmanager", region_name="us-west-2") - result = conn.create_secret(Name="java-util-test-password") + result = conn.create_secret(Name="java-util-test-password", SecretString="v") secret_arn = result["ARN"] missing_version_id = "00000000-0000-0000-0000-000000000000" @@ -147,8 +147,7 @@ def test_get_secret_version_that_does_not_exist(): conn.get_secret_value(SecretId=secret_arn, VersionId=missing_version_id) assert ( - "An error occurred (ResourceNotFoundException) when calling the " - "GetSecretValue operation: Secrets Manager can't find the specified " + "Secrets Manager can't find the specified " "secret value for VersionId: 00000000-0000-0000-0000-000000000000" ) == cm.value.response["Error"]["Message"] @@ -251,6 +250,92 @@ def test_create_secret_with_tags_and_description(): assert secret_details["Description"] == "desc" +@mock_secretsmanager +def test_create_secret_without_value(): + conn = boto3.client("secretsmanager", region_name="us-east-2") + secret_name = f"secret-{str(uuid4())[0:6]}" + + create = conn.create_secret(Name=secret_name) + assert set(create.keys()) == {"ARN", "Name", "ResponseMetadata"} + + describe = conn.describe_secret(SecretId=secret_name) + assert set(describe.keys()) == { + "ARN", + "Name", + "LastChangedDate", + "CreatedDate", + "ResponseMetadata", + } + + with pytest.raises(ClientError) as exc: + conn.get_secret_value(SecretId=secret_name) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + + updated = conn.update_secret( + SecretId=secret_name, + Description="new desc", + ) + assert set(updated.keys()) == {"ARN", "Name", "ResponseMetadata"} + + describe = conn.describe_secret(SecretId=secret_name) + assert set(describe.keys()) == { + "ARN", + "Name", + "Description", + "LastChangedDate", + "CreatedDate", + "ResponseMetadata", + } + + deleted = conn.delete_secret(SecretId=secret_name) + assert set(deleted.keys()) == {"ARN", "Name", "DeletionDate", "ResponseMetadata"} + + +@mock_secretsmanager +def test_update_secret_without_value(): + conn = boto3.client("secretsmanager", region_name="us-east-2") + secret_name = f"secret-{str(uuid4())[0:6]}" + + create = conn.create_secret(Name=secret_name, SecretString="foosecret") + assert set(create.keys()) == {"ARN", "Name", "VersionId", "ResponseMetadata"} + version_id = create["VersionId"] + + describe1 = conn.describe_secret(SecretId=secret_name) + assert set(describe1.keys()) == { + "ARN", + "Name", + "LastChangedDate", + "VersionIdsToStages", + "CreatedDate", + "ResponseMetadata", + } + + conn.get_secret_value(SecretId=secret_name) + + updated = conn.update_secret(SecretId=secret_name, Description="desc") + assert set(updated.keys()) == {"ARN", "Name", "ResponseMetadata"} + + describe2 = conn.describe_secret(SecretId=secret_name) + # AWS also includes 'LastAccessedDate' + assert set(describe2.keys()) == { + "ARN", + "Name", + "Description", + "LastChangedDate", + "VersionIdsToStages", + "CreatedDate", + "ResponseMetadata", + } + assert describe1["VersionIdsToStages"] == describe2["VersionIdsToStages"] + + value = conn.get_secret_value(SecretId=secret_name) + assert value["SecretString"] == "foosecret" + assert value["VersionId"] == version_id + + conn.delete_secret(SecretId=secret_name) + + @mock_secretsmanager def test_delete_secret(): conn = boto3.client("secretsmanager", region_name="us-west-2") @@ -302,7 +387,7 @@ def test_delete_secret_force(): assert result["Name"] == "test-secret" with pytest.raises(ClientError): - result = conn.get_secret_value(SecretId="test-secret") + conn.get_secret_value(SecretId="test-secret") @mock_secretsmanager @@ -331,7 +416,7 @@ def test_delete_secret_force_with_arn(): assert result["Name"] == "test-secret" with pytest.raises(ClientError): - result = conn.get_secret_value(SecretId="test-secret") + conn.get_secret_value(SecretId="test-secret") @mock_secretsmanager @@ -756,16 +841,17 @@ def test_rotate_secret(): @mock_secretsmanager def test_rotate_secret_without_secretstring(): - conn = boto3.client("secretsmanager", region_name="us-west-2") + # This test just verifies that Moto does not fail + conn = boto3.client("secretsmanager", region_name="us-east-2") conn.create_secret(Name=DEFAULT_SECRET_NAME, Description="foodescription") + # AWS will always require a Lambda ARN to do the actual rotating rotated_secret = conn.rotate_secret(SecretId=DEFAULT_SECRET_NAME) - - assert rotated_secret - assert rotated_secret["ARN"] == rotated_secret["ARN"] assert rotated_secret["Name"] == DEFAULT_SECRET_NAME - assert rotated_secret["VersionId"] == rotated_secret["VersionId"] + # Without secret-value, and without actual rotating, we can't verify much + # Just that the secret exists/can be described + # We cannot verify any versions info (as that is not created without a secret-value) describe_secret = conn.describe_secret(SecretId=DEFAULT_SECRET_NAME) assert describe_secret["Description"] == "foodescription" @@ -776,9 +862,7 @@ def test_rotate_secret_enable_rotation(): conn.create_secret(Name=DEFAULT_SECRET_NAME, SecretString="foosecret") initial_description = conn.describe_secret(SecretId=DEFAULT_SECRET_NAME) - assert initial_description - assert initial_description["RotationEnabled"] is False - assert initial_description["RotationRules"]["AutomaticallyAfterDays"] == 0 + assert "RotationEnabled" not in initial_description conn.rotate_secret( SecretId=DEFAULT_SECRET_NAME, RotationRules={"AutomaticallyAfterDays": 42}