Update get_secret_value to properly check versions and stages match (#5238)
This commit is contained in:
parent
58edb5b840
commit
836a2e6538
@ -30,6 +30,15 @@ class SecretHasNoValueException(SecretsManagerClientError):
|
||||
)
|
||||
|
||||
|
||||
class SecretStageVersionMismatchException(SecretsManagerClientError):
|
||||
def __init__(self):
|
||||
self.code = 404
|
||||
super().__init__(
|
||||
"InvalidRequestException",
|
||||
message="You provided a VersionStage that is not associated to the provided VersionId.",
|
||||
)
|
||||
|
||||
|
||||
class ClientError(SecretsManagerClientError):
|
||||
def __init__(self, message):
|
||||
super().__init__("InvalidParameterValue", message)
|
||||
|
@ -13,6 +13,7 @@ from .exceptions import (
|
||||
InvalidParameterException,
|
||||
ResourceExistsException,
|
||||
ResourceNotFoundException,
|
||||
SecretStageVersionMismatchException,
|
||||
InvalidRequestException,
|
||||
ClientError,
|
||||
)
|
||||
@ -226,6 +227,14 @@ class SecretsManagerBackend(BaseBackend):
|
||||
if not self._is_valid_identifier(secret_id):
|
||||
raise SecretNotFoundException()
|
||||
|
||||
if version_id and version_stage:
|
||||
versions_dict = self.secrets[secret_id].versions
|
||||
if (
|
||||
version_id in versions_dict
|
||||
and version_stage not in versions_dict[version_id]["version_stages"]
|
||||
):
|
||||
raise SecretStageVersionMismatchException()
|
||||
|
||||
if not version_id and version_stage:
|
||||
# set version_id to match version_stage
|
||||
versions_dict = self.secrets[secret_id].versions
|
||||
|
@ -153,6 +153,33 @@ def test_get_secret_version_that_does_not_exist():
|
||||
) == cm.value.response["Error"]["Message"]
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_get_secret_version_stage_mismatch():
|
||||
conn = boto3.client("secretsmanager", region_name="us-west-2")
|
||||
|
||||
result = conn.create_secret(Name="test-secret", SecretString="secret")
|
||||
secret_arn = result["ARN"]
|
||||
|
||||
rotated_secret = conn.rotate_secret(
|
||||
SecretId=secret_arn, RotationRules={"AutomaticallyAfterDays": 42}
|
||||
)
|
||||
|
||||
desc_secret = conn.describe_secret(SecretId=secret_arn)
|
||||
versions_to_stages = desc_secret["VersionIdsToStages"]
|
||||
version_for_test = rotated_secret["VersionId"]
|
||||
stages_for_version = versions_to_stages[version_for_test]
|
||||
|
||||
assert "AWSPENDING" not in stages_for_version
|
||||
with pytest.raises(ClientError) as cm:
|
||||
conn.get_secret_value(
|
||||
SecretId=secret_arn, VersionId=version_for_test, VersionStage="AWSPENDING"
|
||||
)
|
||||
|
||||
assert (
|
||||
"You provided a VersionStage that is not associated to the provided VersionId."
|
||||
) == cm.value.response["Error"]["Message"]
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_create_secret():
|
||||
conn = boto3.client("secretsmanager", region_name="us-east-1")
|
||||
|
@ -618,7 +618,7 @@ def test_put_secret_value_can_get_first_version_if_put_twice():
|
||||
data={
|
||||
"SecretId": DEFAULT_SECRET_NAME,
|
||||
"VersionId": first_secret_version_id,
|
||||
"VersionStage": "AWSCURRENT",
|
||||
"VersionStage": "AWSPREVIOUS",
|
||||
},
|
||||
headers={"X-Amz-Target": "secretsmanager.GetSecretValue"},
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user