[LocalStack] Fixes to secretsmanager's PutSecretValue, CreateSecret, DeleteSecret (#4851)

This commit is contained in:
MEP 2022-02-11 13:49:14 +01:00 committed by GitHub
parent dce8cc0c04
commit 7194456d0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 211 additions and 41 deletions

View File

@ -120,14 +120,16 @@ class FakeSecret:
def is_deleted(self):
return self.deleted_date is not None
def to_short_dict(self, include_version_stages=False):
def to_short_dict(self, include_version_stages=False, version_id=None):
if not version_id:
version_id = self.default_version_id
dct = {
"ARN": self.arn,
"Name": self.name,
"VersionId": self.default_version_id,
"VersionId": version_id,
}
if include_version_stages:
dct["VersionStages"] = self.version_stages
dct["VersionStages"] = self.versions[version_id]["version_stages"]
return json.dumps(dct)
def to_dict(self):
@ -208,6 +210,14 @@ class SecretsManagerBackend(BaseBackend):
msg = "ClientRequestToken must be 32-64 characters long."
raise InvalidParameterException(msg)
def _from_client_request_token(self, client_request_token):
version_id = client_request_token
if version_id:
self._client_request_token_validator(version_id)
else:
version_id = str(uuid.uuid4())
return version_id
def get_secret_value(self, secret_id, version_id, version_stage):
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
@ -309,6 +319,7 @@ class SecretsManagerBackend(BaseBackend):
description=None,
tags=None,
kms_key_id=None,
client_request_token=None,
):
# error if secret exists
@ -324,6 +335,7 @@ class SecretsManagerBackend(BaseBackend):
description=description,
tags=tags,
kms_key_id=kms_key_id,
version_id=client_request_token,
)
return secret.to_short_dict()
@ -343,10 +355,7 @@ class SecretsManagerBackend(BaseBackend):
if version_stages is None:
version_stages = ["AWSCURRENT"]
if version_id:
self._client_request_token_validator(version_id)
else:
version_id = str(uuid.uuid4())
version_id = self._from_client_request_token(version_id)
secret_version = {
"createdate": int(time.time()),
@ -365,10 +374,10 @@ class SecretsManagerBackend(BaseBackend):
secret.update(description, tags, kms_key_id, last_changed_date=update_time)
if "AWSPENDING" in version_stages:
secret.versions[version_id] = secret_version
else:
if "AWSCURRENT" in version_stages:
secret.reset_default_version(secret_version, version_id)
else:
secret.versions[version_id] = secret_version
else:
secret = FakeSecret(
region_name=self.region,
@ -403,17 +412,19 @@ class SecretsManagerBackend(BaseBackend):
tags = secret.tags
description = secret.description
version_id = self._from_client_request_token(client_request_token)
secret = self._add_secret(
secret_id,
secret_string,
secret_binary,
version_id=client_request_token,
version_id=version_id,
description=description,
tags=tags,
version_stages=version_stages,
)
return secret.to_short_dict(include_version_stages=True)
return secret.to_short_dict(include_version_stages=True, version_id=version_id)
def describe_secret(self, secret_id):
if not self._is_valid_identifier(secret_id):
@ -658,21 +669,6 @@ class SecretsManagerBackend(BaseBackend):
self, secret_id, recovery_window_in_days, force_delete_without_recovery
):
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
if self.secrets[secret_id].is_deleted():
raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \
perform the operation on a secret that's currently marked deleted."
)
if recovery_window_in_days and force_delete_without_recovery:
raise InvalidParameterException(
"An error occurred (InvalidParameterException) when calling the DeleteSecret operation: You can't \
use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays."
)
if recovery_window_in_days and (
recovery_window_in_days < 7 or recovery_window_in_days > 30
):
@ -681,22 +677,44 @@ class SecretsManagerBackend(BaseBackend):
RecoveryWindowInDays value must be between 7 and 30 days (inclusive)."
)
deletion_date = datetime.datetime.utcnow()
if recovery_window_in_days and force_delete_without_recovery:
raise InvalidParameterException(
"An error occurred (InvalidParameterException) when calling the DeleteSecret operation: You can't \
use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays."
)
if force_delete_without_recovery:
secret = self.secrets.pop(secret_id, None)
if not self._is_valid_identifier(secret_id):
if not force_delete_without_recovery:
raise SecretNotFoundException()
else:
secret = FakeSecret(self.region, secret_id)
arn = secret.arn
name = secret.name
deletion_date = datetime.datetime.utcnow()
return arn, name, self._unix_time_secs(deletion_date)
else:
deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
self.secrets[secret_id].delete(self._unix_time_secs(deletion_date))
secret = self.secrets.get(secret_id, None)
if self.secrets[secret_id].is_deleted():
raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \
perform the operation on a secret that's currently marked deleted."
)
if not secret:
raise SecretNotFoundException()
deletion_date = datetime.datetime.utcnow()
arn = secret.arn
name = secret.name
if force_delete_without_recovery:
secret = self.secrets.pop(secret_id, None)
else:
deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
self.secrets[secret_id].delete(self._unix_time_secs(deletion_date))
secret = self.secrets.get(secret_id, None)
return arn, name, self._unix_time_secs(deletion_date)
if not secret:
raise SecretNotFoundException()
arn = secret.arn
name = secret.name
return arn, name, self._unix_time_secs(deletion_date)
def restore_secret(self, secret_id):

View File

@ -45,6 +45,7 @@ class SecretsManagerResponse(BaseResponse):
description = self._get_param("Description", if_none="")
tags = self._get_param("Tags", if_none=[])
kms_key_id = self._get_param("KmsKeyId", if_none=None)
client_request_token = self._get_param("ClientRequestToken", if_none=None)
return secretsmanager_backends[self.region].create_secret(
name=name,
secret_string=secret_string,
@ -52,6 +53,7 @@ class SecretsManagerResponse(BaseResponse):
description=description,
tags=tags,
kms_key_id=kms_key_id,
client_request_token=client_request_token,
)
def update_secret(self):

View File

@ -26,6 +26,20 @@ def test_get_secret_value():
assert result["SecretString"] == "foosecret"
@mock_secretsmanager
def test_create_secret_with_client_request_token():
conn = boto3.client("secretsmanager", region_name="us-west-2")
version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71"
create_dict = conn.create_secret(
Name=DEFAULT_SECRET_NAME,
SecretString="secret_string",
ClientRequestToken=version_id,
)
assert create_dict
assert create_dict["VersionId"] == version_id
@mock_secretsmanager
def test_get_secret_value_by_arn():
conn = boto3.client("secretsmanager", region_name="us-west-2")
@ -228,6 +242,17 @@ def test_delete_secret_force():
result = conn.get_secret_value(SecretId="test-secret")
@mock_secretsmanager
def test_delete_secret_force_no_such_secret():
conn = boto3.client("secretsmanager", region_name="us-west-2")
deleted_secret = conn.delete_secret(
SecretId=DEFAULT_SECRET_NAME, ForceDeleteWithoutRecovery=True
)
assert deleted_secret
assert deleted_secret["Name"] == DEFAULT_SECRET_NAME
@mock_secretsmanager
def test_delete_secret_force_with_arn():
conn = boto3.client("secretsmanager", region_name="us-west-2")
@ -251,7 +276,7 @@ def test_delete_secret_that_does_not_exist():
conn = boto3.client("secretsmanager", region_name="us-west-2")
with pytest.raises(ClientError):
conn.delete_secret(SecretId="i-dont-exist", ForceDeleteWithoutRecovery=True)
conn.delete_secret(SecretId="i-dont-exist")
@mock_secretsmanager
@ -288,6 +313,18 @@ def test_delete_secret_recovery_window_too_long():
conn.delete_secret(SecretId="test-secret", RecoveryWindowInDays=31)
@mock_secretsmanager
def test_delete_secret_force_no_such_secret_with_invalid_recovery_window():
conn = boto3.client("secretsmanager", region_name="us-west-2")
with pytest.raises(ClientError):
conn.delete_secret(
SecretId=DEFAULT_SECRET_NAME,
ForceDeleteWithoutRecovery=True,
RecoveryWindowInDays=4,
)
@mock_secretsmanager
def test_delete_secret_that_is_marked_deleted():
conn = boto3.client("secretsmanager", region_name="us-west-2")
@ -952,6 +989,118 @@ def test_can_list_secret_version_ids():
assert [first_version_id, second_version_id].sort() == returned_version_ids.sort()
@mock_secretsmanager
def test_put_secret_value_version_stages_response():
conn = boto3.client("secretsmanager", region_name="us-west-2")
# Creation.
first_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71"
conn.create_secret(
Name=DEFAULT_SECRET_NAME,
SecretString="first_secret_string",
ClientRequestToken=first_version_id,
)
# Use PutSecretValue to push a new version with new version stages.
second_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce72"
second_version_stages = ["SAMPLESTAGE1", "SAMPLESTAGE0"]
second_put_res_dict = conn.put_secret_value(
SecretId=DEFAULT_SECRET_NAME,
SecretString="second_secret_string",
VersionStages=second_version_stages,
ClientRequestToken=second_version_id,
)
assert second_put_res_dict
assert second_put_res_dict["VersionId"] == second_version_id
assert second_put_res_dict["VersionStages"] == second_version_stages
@mock_secretsmanager
def test_put_secret_value_version_stages_pending_response():
conn = boto3.client("secretsmanager", region_name="us-west-2")
# Creation.
first_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71"
conn.create_secret(
Name=DEFAULT_SECRET_NAME,
SecretString="first_secret_string",
ClientRequestToken=first_version_id,
)
# Use PutSecretValue to push a new version with new version stages.
second_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce72"
second_version_stages = ["AWSPENDING"]
second_put_res_dict = conn.put_secret_value(
SecretId=DEFAULT_SECRET_NAME,
SecretString="second_secret_string",
VersionStages=second_version_stages,
ClientRequestToken=second_version_id,
)
assert second_put_res_dict
assert second_put_res_dict["VersionId"] == second_version_id
assert second_put_res_dict["VersionStages"] == second_version_stages
@mock_secretsmanager
def test_after_put_secret_value_version_stages_can_get_current():
conn = boto3.client("secretsmanager", region_name="us-west-2")
# Creation.
first_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71"
first_secret_string = "first_secret_string"
conn.create_secret(
Name=DEFAULT_SECRET_NAME,
SecretString=first_secret_string,
ClientRequestToken=first_version_id,
)
# Use PutSecretValue to push a new version with new version stages.
second_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce72"
conn.put_secret_value(
SecretId=DEFAULT_SECRET_NAME,
SecretString="second_secret_string",
VersionStages=["SAMPLESTAGE1", "SAMPLESTAGE0"],
ClientRequestToken=second_version_id,
)
# Get current.
get_dict = conn.get_secret_value(SecretId=DEFAULT_SECRET_NAME)
assert get_dict
assert get_dict["VersionId"] == first_version_id
assert get_dict["SecretString"] == first_secret_string
assert get_dict["VersionStages"] == ["AWSCURRENT"]
@mock_secretsmanager
def test_after_put_secret_value_version_stages_pending_can_get_current():
conn = boto3.client("secretsmanager", region_name="us-west-2")
# Creation.
first_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce71"
first_secret_string = "first_secret_string"
conn.create_secret(
Name=DEFAULT_SECRET_NAME,
SecretString=first_secret_string,
ClientRequestToken=first_version_id,
)
# Use PutSecretValue to push a new version with new version stages.
pending_version_id = "eb41453f-25bb-4025-b7f4-850cfca0ce72"
conn.put_secret_value(
SecretId=DEFAULT_SECRET_NAME,
SecretString="second_secret_string",
VersionStages=["AWSPENDING"],
ClientRequestToken=pending_version_id,
)
# Get current.
get_dict = conn.get_secret_value(SecretId=DEFAULT_SECRET_NAME)
assert get_dict
assert get_dict["VersionId"] == first_version_id
assert get_dict["SecretString"] == first_secret_string
assert get_dict["VersionStages"] == ["AWSCURRENT"]
@mock_secretsmanager
@pytest.mark.parametrize("pass_arn", [True, False])
def test_update_secret(pass_arn):

View File

@ -774,6 +774,7 @@ def test_update_secret_version_stage(pass_arn):
headers={"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
put_secret = json.loads(put_secret.data.decode("utf-8"))
assert put_secret["VersionStages"] == [custom_stage]
new_version = put_secret["VersionId"]
describe_secret = test_client.post(
@ -785,7 +786,7 @@ def test_update_secret_version_stage(pass_arn):
json_data = json.loads(describe_secret.data.decode("utf-8"))
stages = json_data["SecretVersionsToStages"]
assert len(stages) == 2
assert stages[initial_version] == ["AWSPREVIOUS"]
assert stages[initial_version] == ["AWSCURRENT"]
assert stages[new_version] == [custom_stage]
test_client.post(
@ -808,7 +809,7 @@ def test_update_secret_version_stage(pass_arn):
json_data = json.loads(describe_secret.data.decode("utf-8"))
stages = json_data["SecretVersionsToStages"]
assert len(stages) == 2
assert stages[initial_version] == ["AWSPREVIOUS", custom_stage]
assert stages[initial_version] == ["AWSCURRENT", custom_stage]
assert stages[new_version] == []