Fix: Adding ClientRequestToken for SecretsManager update_secret method (#4314)
This commit is contained in:
parent
82158096d6
commit
c65d4ddc3b
@ -189,6 +189,12 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
epoch = datetime.datetime.utcfromtimestamp(0)
|
epoch = datetime.datetime.utcfromtimestamp(0)
|
||||||
return (dt - epoch).total_seconds()
|
return (dt - epoch).total_seconds()
|
||||||
|
|
||||||
|
def _client_request_token_validator(self, client_request_token):
|
||||||
|
token_length = len(client_request_token)
|
||||||
|
if token_length < 32 or token_length > 64:
|
||||||
|
msg = "ClientRequestToken must be 32-64 characters long."
|
||||||
|
raise InvalidParameterException(msg)
|
||||||
|
|
||||||
def get_secret_value(self, secret_id, version_id, version_stage):
|
def get_secret_value(self, secret_id, version_id, version_stage):
|
||||||
if not self._is_valid_identifier(secret_id):
|
if not self._is_valid_identifier(secret_id):
|
||||||
raise SecretNotFoundException()
|
raise SecretNotFoundException()
|
||||||
@ -251,6 +257,7 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
secret_id,
|
secret_id,
|
||||||
secret_string=None,
|
secret_string=None,
|
||||||
secret_binary=None,
|
secret_binary=None,
|
||||||
|
client_request_token=None,
|
||||||
kms_key_id=None,
|
kms_key_id=None,
|
||||||
**kwargs
|
**kwargs
|
||||||
):
|
):
|
||||||
@ -274,6 +281,7 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
secret_string=secret_string,
|
secret_string=secret_string,
|
||||||
secret_binary=secret_binary,
|
secret_binary=secret_binary,
|
||||||
description=description,
|
description=description,
|
||||||
|
version_id=client_request_token,
|
||||||
tags=tags,
|
tags=tags,
|
||||||
kms_key_id=kms_key_id,
|
kms_key_id=kms_key_id,
|
||||||
)
|
)
|
||||||
@ -322,7 +330,9 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
if version_stages is None:
|
if version_stages is None:
|
||||||
version_stages = ["AWSCURRENT"]
|
version_stages = ["AWSCURRENT"]
|
||||||
|
|
||||||
if not version_id:
|
if version_id:
|
||||||
|
self._client_request_token_validator(version_id)
|
||||||
|
else:
|
||||||
version_id = str(uuid.uuid4())
|
version_id = str(uuid.uuid4())
|
||||||
|
|
||||||
secret_version = {
|
secret_version = {
|
||||||
@ -416,12 +426,6 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
perform the operation on a secret that's currently marked deleted."
|
perform the operation on a secret that's currently marked deleted."
|
||||||
)
|
)
|
||||||
|
|
||||||
if client_request_token:
|
|
||||||
token_length = len(client_request_token)
|
|
||||||
if token_length < 32 or token_length > 64:
|
|
||||||
msg = "ClientRequestToken " "must be 32-64 characters long."
|
|
||||||
raise InvalidParameterException(msg)
|
|
||||||
|
|
||||||
if rotation_lambda_arn:
|
if rotation_lambda_arn:
|
||||||
if len(rotation_lambda_arn) > 2048:
|
if len(rotation_lambda_arn) > 2048:
|
||||||
msg = "RotationLambdaARN " "must <= 2048 characters long."
|
msg = "RotationLambdaARN " "must <= 2048 characters long."
|
||||||
@ -463,7 +467,12 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
old_secret_version = secret.versions[secret.default_version_id]
|
old_secret_version = secret.versions[secret.default_version_id]
|
||||||
new_version_id = client_request_token or str(uuid.uuid4())
|
|
||||||
|
if client_request_token:
|
||||||
|
self._client_request_token_validator(client_request_token)
|
||||||
|
new_version_id = client_request_token
|
||||||
|
else:
|
||||||
|
new_version_id = str(uuid.uuid4())
|
||||||
|
|
||||||
# We add the new secret version as "pending". The previous version remains
|
# We add the new secret version as "pending". The previous version remains
|
||||||
# as "current" for now. Once we've passed the new secret through the lambda
|
# as "current" for now. Once we've passed the new secret through the lambda
|
||||||
|
@ -60,11 +60,13 @@ class SecretsManagerResponse(BaseResponse):
|
|||||||
secret_id = self._get_param("SecretId")
|
secret_id = self._get_param("SecretId")
|
||||||
secret_string = self._get_param("SecretString")
|
secret_string = self._get_param("SecretString")
|
||||||
secret_binary = self._get_param("SecretBinary")
|
secret_binary = self._get_param("SecretBinary")
|
||||||
|
client_request_token = self._get_param("ClientRequestToken")
|
||||||
kms_key_id = self._get_param("KmsKeyId", if_none=None)
|
kms_key_id = self._get_param("KmsKeyId", if_none=None)
|
||||||
return secretsmanager_backends[self.region].update_secret(
|
return secretsmanager_backends[self.region].update_secret(
|
||||||
secret_id=secret_id,
|
secret_id=secret_id,
|
||||||
secret_string=secret_string,
|
secret_string=secret_string,
|
||||||
secret_binary=secret_binary,
|
secret_binary=secret_binary,
|
||||||
|
client_request_token=client_request_token,
|
||||||
kms_key_id=kms_key_id,
|
kms_key_id=kms_key_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -5,10 +5,11 @@ import boto3
|
|||||||
|
|
||||||
from moto import mock_secretsmanager, mock_lambda, settings
|
from moto import mock_secretsmanager, mock_lambda, settings
|
||||||
from moto.core import ACCOUNT_ID
|
from moto.core import ACCOUNT_ID
|
||||||
from botocore.exceptions import ClientError
|
from botocore.exceptions import ClientError, ParamValidationError
|
||||||
import string
|
import string
|
||||||
import pytz
|
import pytz
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from uuid import uuid4
|
||||||
import sure # noqa
|
import sure # noqa
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@ -1184,3 +1185,33 @@ def test_secret_versions_to_stages_attribute_discrepancy():
|
|||||||
assert list_vtos[previous_version_id] == ["AWSPREVIOUS"]
|
assert list_vtos[previous_version_id] == ["AWSPREVIOUS"]
|
||||||
|
|
||||||
assert describe_vtos == list_vtos
|
assert describe_vtos == list_vtos
|
||||||
|
|
||||||
|
|
||||||
|
@mock_secretsmanager
|
||||||
|
def test_update_secret_with_client_request_token():
|
||||||
|
client = boto3.client("secretsmanager", region_name="us-west-2")
|
||||||
|
secret_name = "test-secret"
|
||||||
|
client_request_token = str(uuid4())
|
||||||
|
|
||||||
|
client.create_secret(Name=secret_name, SecretString="first-secret")
|
||||||
|
updated_secret = client.update_secret(
|
||||||
|
SecretId=secret_name,
|
||||||
|
SecretString="second-secret",
|
||||||
|
ClientRequestToken=client_request_token,
|
||||||
|
)
|
||||||
|
assert client_request_token == updated_secret["VersionId"]
|
||||||
|
updated_secret = client.update_secret(
|
||||||
|
SecretId=secret_name, SecretString="third-secret",
|
||||||
|
)
|
||||||
|
assert client_request_token != updated_secret["VersionId"]
|
||||||
|
invalid_request_token = "test-token"
|
||||||
|
with pytest.raises(ParamValidationError) as pve:
|
||||||
|
client.update_secret(
|
||||||
|
SecretId=secret_name,
|
||||||
|
SecretString="fourth-secret",
|
||||||
|
ClientRequestToken=invalid_request_token,
|
||||||
|
)
|
||||||
|
pve.value.response["Error"]["Code"].should.equal("InvalidParameterException")
|
||||||
|
pve.value.response["Error"]["Message"].should.equal(
|
||||||
|
"ClientRequestToken must be 32-64 characters long."
|
||||||
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user