SecretsManager: Allow creation/update of secrets without values (#6720)

This commit is contained in:
Bert Blommers 2023-08-24 13:48:53 +00:00 committed by GitHub
parent 8ff53ff417
commit 3cf4f6315b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 199 additions and 81 deletions

View File

@ -61,12 +61,13 @@ class FakeSecret:
account_id: str,
region_name: str,
secret_id: str,
secret_version: Dict[str, Any],
version_id: str,
secret_string: Optional[str] = None,
secret_binary: Optional[str] = None,
description: Optional[str] = None,
tags: Optional[List[Dict[str, str]]] = None,
kms_key_id: Optional[str] = None,
version_id: Optional[str] = None,
version_stages: Optional[List[str]] = None,
last_changed_date: Optional[int] = None,
created_date: Optional[int] = None,
@ -79,10 +80,11 @@ class FakeSecret:
self.description = description
self.tags = tags or []
self.kms_key_id = kms_key_id
self.version_id = version_id
self.version_stages = version_stages
self.last_changed_date = last_changed_date
self.created_date = created_date
# We should only return Rotation details after it's been requested
self.rotation_requested = False
self.rotation_enabled = False
self.rotation_lambda_arn = ""
self.auto_rotate_after_days = 0
@ -91,6 +93,13 @@ class FakeSecret:
self.next_rotation_date: Optional[int] = None
self.last_rotation_date: Optional[int] = None
self.versions: Dict[str, Dict[str, Any]] = {}
if secret_string or secret_binary:
self.versions = {version_id: secret_version}
self.set_default_version_id(version_id)
else:
self.set_default_version_id(None)
def update(
self,
description: Optional[str] = None,
@ -106,10 +115,7 @@ class FakeSecret:
if kms_key_id is not None:
self.kms_key_id = kms_key_id
def set_versions(self, versions: Dict[str, Dict[str, Any]]) -> None:
self.versions = versions
def set_default_version_id(self, version_id: str) -> None:
def set_default_version_id(self, version_id: Optional[str]) -> None:
self.default_version_id = version_id
def reset_default_version(
@ -122,7 +128,7 @@ class FakeSecret:
# set old AWSCURRENT secret to AWSPREVIOUS
previous_current_version_id = self.default_version_id
self.versions[previous_current_version_id]["version_stages"] = ["AWSPREVIOUS"]
self.versions[previous_current_version_id]["version_stages"] = ["AWSPREVIOUS"] # type: ignore
self.versions[version_id] = secret_version
self.default_version_id = version_id
@ -145,40 +151,61 @@ class FakeSecret:
return self.deleted_date is not None
def to_short_dict(
self, include_version_stages: bool = False, version_id: Optional[str] = None
self,
include_version_stages: bool = False,
version_id: Optional[str] = None,
include_version_id: bool = True,
) -> str:
if not version_id:
version_id = self.default_version_id
dct = {
"ARN": self.arn,
"Name": self.name,
"VersionId": version_id,
}
if include_version_stages:
if include_version_id and version_id:
dct["VersionId"] = version_id
if version_id and include_version_stages:
dct["VersionStages"] = self.versions[version_id]["version_stages"]
return json.dumps(dct)
def to_dict(self) -> Dict[str, Any]:
version_id_to_stages = self._form_version_ids_to_stages()
return {
dct: Dict[str, Any] = {
"ARN": self.arn,
"Name": self.name,
"Description": self.description or "",
"KmsKeyId": self.kms_key_id,
"RotationEnabled": self.rotation_enabled,
"RotationLambdaARN": self.rotation_lambda_arn,
"RotationRules": {"AutomaticallyAfterDays": self.auto_rotate_after_days},
"LastRotatedDate": self.last_rotation_date,
"LastChangedDate": self.last_changed_date,
"LastAccessedDate": None,
"NextRotationDate": self.next_rotation_date,
"DeletedDate": self.deleted_date,
"Tags": self.tags,
"VersionIdsToStages": version_id_to_stages,
"SecretVersionsToStages": version_id_to_stages,
"CreatedDate": self.created_date,
}
if self.tags:
dct["Tags"] = self.tags
if self.description:
dct["Description"] = self.description
if self.versions:
dct.update(
{
# Key used by describe_secret
"VersionIdsToStages": version_id_to_stages,
# Key used by list_secrets
"SecretVersionsToStages": version_id_to_stages,
}
)
if self.rotation_requested:
dct.update(
{
"RotationEnabled": self.rotation_enabled,
"RotationLambdaARN": self.rotation_lambda_arn,
"RotationRules": {
"AutomaticallyAfterDays": self.auto_rotate_after_days
},
"LastRotatedDate": self.last_rotation_date,
}
)
return dct
def _form_version_ids_to_stages(self) -> Dict[str, str]:
version_id_to_stages = {}
@ -296,6 +323,7 @@ class SecretsManagerBackend(BaseBackend):
):
raise SecretStageVersionMismatchException()
version_id_provided = version_id is not None
if not version_id and version_stage:
# set version_id to match version_stage
versions_dict = self.secrets[secret_id].versions
@ -314,13 +342,13 @@ class SecretsManagerBackend(BaseBackend):
)
secret = self.secrets[secret_id]
version_id = version_id or secret.default_version_id
version_id = version_id or secret.default_version_id or "AWSCURRENT"
secret_version = secret.versions.get(version_id)
if not secret_version:
_type = "staging label" if not version_id_provided else "VersionId"
raise ResourceNotFoundException(
"An error occurred (ResourceNotFoundException) when calling the GetSecretValue operation: Secrets "
f"Manager can't find the specified secret value for VersionId: {version_id}"
f"Secrets Manager can't find the specified secret value for {_type}: {version_id}"
)
response_data = {
@ -369,7 +397,7 @@ class SecretsManagerBackend(BaseBackend):
tags = secret.tags
description = description or secret.description
secret = self._add_secret(
secret, new_version = self._add_secret(
secret_id,
secret_string=secret_string,
secret_binary=secret_binary,
@ -379,7 +407,7 @@ class SecretsManagerBackend(BaseBackend):
kms_key_id=kms_key_id,
)
return secret.to_short_dict()
return secret.to_short_dict(include_version_id=new_version)
def create_secret(
self,
@ -398,7 +426,7 @@ class SecretsManagerBackend(BaseBackend):
"A resource with the ID you requested already exists."
)
secret = self._add_secret(
secret, new_version = self._add_secret(
name,
secret_string=secret_string,
secret_binary=secret_binary,
@ -408,7 +436,7 @@ class SecretsManagerBackend(BaseBackend):
version_id=client_request_token,
)
return secret.to_short_dict()
return secret.to_short_dict(include_version_id=new_version)
def _add_secret(
self,
@ -420,7 +448,7 @@ class SecretsManagerBackend(BaseBackend):
kms_key_id: Optional[str] = None,
version_id: Optional[str] = None,
version_stages: Optional[List[str]] = None,
) -> FakeSecret:
) -> Tuple[FakeSecret, bool]:
if version_stages is None:
version_stages = ["AWSCURRENT"]
@ -438,12 +466,15 @@ class SecretsManagerBackend(BaseBackend):
if secret_binary is not None:
secret_version["secret_binary"] = secret_binary
new_version = secret_string is not None or secret_binary is not None
update_time = int(time.time())
if secret_id in self.secrets:
secret = self.secrets[secret_id]
secret.update(description, tags, kms_key_id, last_changed_date=update_time)
if new_version:
if "AWSCURRENT" in version_stages:
secret.reset_default_version(secret_version, version_id)
else:
@ -461,12 +492,12 @@ class SecretsManagerBackend(BaseBackend):
kms_key_id=kms_key_id,
last_changed_date=update_time,
created_date=update_time,
version_id=version_id,
secret_version=secret_version,
)
secret.set_versions({version_id: secret_version})
secret.set_default_version_id(version_id)
self.secrets[secret_id] = secret
return secret
return secret, new_version
def put_secret_value(
self,
@ -486,7 +517,7 @@ class SecretsManagerBackend(BaseBackend):
version_id = self._from_client_request_token(client_request_token)
secret = self._add_secret(
secret, _ = self._add_secret(
secret_id,
secret_string,
secret_binary,
@ -513,7 +544,6 @@ class SecretsManagerBackend(BaseBackend):
rotation_lambda_arn: Optional[str] = None,
rotation_rules: Optional[Dict[str, Any]] = None,
) -> str:
rotation_days = "AutomaticallyAfterDays"
if not self._is_valid_identifier(secret_id):
@ -569,7 +599,8 @@ class SecretsManagerBackend(BaseBackend):
# Pending is not present in any version
pass
old_secret_version = secret.versions[secret.default_version_id]
if secret.versions:
old_secret_version = secret.versions[secret.default_version_id] # type: ignore
if client_request_token:
self._client_request_token_validator(client_request_token)
@ -593,6 +624,8 @@ class SecretsManagerBackend(BaseBackend):
version_id=new_version_id,
version_stages=["AWSPENDING"],
)
secret.rotation_requested = True
secret.rotation_lambda_arn = rotation_lambda_arn or ""
if rotation_rules:
secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
@ -628,11 +661,15 @@ class SecretsManagerBackend(BaseBackend):
)
secret.set_default_version_id(new_version_id)
else:
elif secret.versions:
# AWS will always require a Lambda ARN
# without that, Moto can still apply the 'AWSCURRENT'-label
# This only makes sense if we have a version
secret.reset_default_version(
secret.versions[new_version_id], new_version_id
)
secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"]
self.secrets[secret_id].last_rotation_date = int(time.time())
return secret.to_short_dict()
@ -741,11 +778,8 @@ class SecretsManagerBackend(BaseBackend):
if not force_delete_without_recovery:
raise SecretNotFoundException()
else:
unknown_secret = FakeSecret(
self.account_id, self.region_name, secret_id
)
arn = unknown_secret.arn
name = unknown_secret.name
arn = secret_arn(self.account_id, self.region_name, secret_id=secret_id)
name = secret_id
deletion_date = datetime.datetime.utcnow()
return arn, name, self._unix_time_secs(deletion_date)
else:

View File

@ -14,7 +14,7 @@ import pytest
from moto import mock_secretsmanager, mock_lambda, settings
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
DEFAULT_SECRET_NAME = "test-secret"
DEFAULT_SECRET_NAME = "test-secret7"
@mock_secretsmanager
@ -124,10 +124,10 @@ def test_get_secret_value_that_is_marked_deleted():
def test_get_secret_that_has_no_value():
conn = boto3.client("secretsmanager", region_name="us-west-2")
conn.create_secret(Name="java-util-test-password")
conn.create_secret(Name="secret-no-value")
with pytest.raises(ClientError) as cm:
conn.get_secret_value(SecretId="java-util-test-password")
conn.get_secret_value(SecretId="secret-no-value")
assert (
"Secrets Manager can't find the specified secret value for staging label: AWSCURRENT"
@ -139,7 +139,7 @@ def test_get_secret_that_has_no_value():
def test_get_secret_version_that_does_not_exist():
conn = boto3.client("secretsmanager", region_name="us-west-2")
result = conn.create_secret(Name="java-util-test-password")
result = conn.create_secret(Name="java-util-test-password", SecretString="v")
secret_arn = result["ARN"]
missing_version_id = "00000000-0000-0000-0000-000000000000"
@ -147,8 +147,7 @@ def test_get_secret_version_that_does_not_exist():
conn.get_secret_value(SecretId=secret_arn, VersionId=missing_version_id)
assert (
"An error occurred (ResourceNotFoundException) when calling the "
"GetSecretValue operation: Secrets Manager can't find the specified "
"Secrets Manager can't find the specified "
"secret value for VersionId: 00000000-0000-0000-0000-000000000000"
) == cm.value.response["Error"]["Message"]
@ -251,6 +250,92 @@ def test_create_secret_with_tags_and_description():
assert secret_details["Description"] == "desc"
@mock_secretsmanager
def test_create_secret_without_value():
conn = boto3.client("secretsmanager", region_name="us-east-2")
secret_name = f"secret-{str(uuid4())[0:6]}"
create = conn.create_secret(Name=secret_name)
assert set(create.keys()) == {"ARN", "Name", "ResponseMetadata"}
describe = conn.describe_secret(SecretId=secret_name)
assert set(describe.keys()) == {
"ARN",
"Name",
"LastChangedDate",
"CreatedDate",
"ResponseMetadata",
}
with pytest.raises(ClientError) as exc:
conn.get_secret_value(SecretId=secret_name)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
updated = conn.update_secret(
SecretId=secret_name,
Description="new desc",
)
assert set(updated.keys()) == {"ARN", "Name", "ResponseMetadata"}
describe = conn.describe_secret(SecretId=secret_name)
assert set(describe.keys()) == {
"ARN",
"Name",
"Description",
"LastChangedDate",
"CreatedDate",
"ResponseMetadata",
}
deleted = conn.delete_secret(SecretId=secret_name)
assert set(deleted.keys()) == {"ARN", "Name", "DeletionDate", "ResponseMetadata"}
@mock_secretsmanager
def test_update_secret_without_value():
conn = boto3.client("secretsmanager", region_name="us-east-2")
secret_name = f"secret-{str(uuid4())[0:6]}"
create = conn.create_secret(Name=secret_name, SecretString="foosecret")
assert set(create.keys()) == {"ARN", "Name", "VersionId", "ResponseMetadata"}
version_id = create["VersionId"]
describe1 = conn.describe_secret(SecretId=secret_name)
assert set(describe1.keys()) == {
"ARN",
"Name",
"LastChangedDate",
"VersionIdsToStages",
"CreatedDate",
"ResponseMetadata",
}
conn.get_secret_value(SecretId=secret_name)
updated = conn.update_secret(SecretId=secret_name, Description="desc")
assert set(updated.keys()) == {"ARN", "Name", "ResponseMetadata"}
describe2 = conn.describe_secret(SecretId=secret_name)
# AWS also includes 'LastAccessedDate'
assert set(describe2.keys()) == {
"ARN",
"Name",
"Description",
"LastChangedDate",
"VersionIdsToStages",
"CreatedDate",
"ResponseMetadata",
}
assert describe1["VersionIdsToStages"] == describe2["VersionIdsToStages"]
value = conn.get_secret_value(SecretId=secret_name)
assert value["SecretString"] == "foosecret"
assert value["VersionId"] == version_id
conn.delete_secret(SecretId=secret_name)
@mock_secretsmanager
def test_delete_secret():
conn = boto3.client("secretsmanager", region_name="us-west-2")
@ -302,7 +387,7 @@ def test_delete_secret_force():
assert result["Name"] == "test-secret"
with pytest.raises(ClientError):
result = conn.get_secret_value(SecretId="test-secret")
conn.get_secret_value(SecretId="test-secret")
@mock_secretsmanager
@ -331,7 +416,7 @@ def test_delete_secret_force_with_arn():
assert result["Name"] == "test-secret"
with pytest.raises(ClientError):
result = conn.get_secret_value(SecretId="test-secret")
conn.get_secret_value(SecretId="test-secret")
@mock_secretsmanager
@ -756,16 +841,17 @@ def test_rotate_secret():
@mock_secretsmanager
def test_rotate_secret_without_secretstring():
conn = boto3.client("secretsmanager", region_name="us-west-2")
# This test just verifies that Moto does not fail
conn = boto3.client("secretsmanager", region_name="us-east-2")
conn.create_secret(Name=DEFAULT_SECRET_NAME, Description="foodescription")
# AWS will always require a Lambda ARN to do the actual rotating
rotated_secret = conn.rotate_secret(SecretId=DEFAULT_SECRET_NAME)
assert rotated_secret
assert rotated_secret["ARN"] == rotated_secret["ARN"]
assert rotated_secret["Name"] == DEFAULT_SECRET_NAME
assert rotated_secret["VersionId"] == rotated_secret["VersionId"]
# Without secret-value, and without actual rotating, we can't verify much
# Just that the secret exists/can be described
# We cannot verify any versions info (as that is not created without a secret-value)
describe_secret = conn.describe_secret(SecretId=DEFAULT_SECRET_NAME)
assert describe_secret["Description"] == "foodescription"
@ -776,9 +862,7 @@ def test_rotate_secret_enable_rotation():
conn.create_secret(Name=DEFAULT_SECRET_NAME, SecretString="foosecret")
initial_description = conn.describe_secret(SecretId=DEFAULT_SECRET_NAME)
assert initial_description
assert initial_description["RotationEnabled"] is False
assert initial_description["RotationRules"]["AutomaticallyAfterDays"] == 0
assert "RotationEnabled" not in initial_description
conn.rotate_secret(
SecretId=DEFAULT_SECRET_NAME, RotationRules={"AutomaticallyAfterDays": 42}