parent
242de5bc6f
commit
0ec99fae8b
@ -61,6 +61,7 @@ class FakeSecret:
|
||||
secret_binary=None,
|
||||
description=None,
|
||||
tags=[],
|
||||
kms_key_id=None,
|
||||
version_id=None,
|
||||
version_stages=None,
|
||||
):
|
||||
@ -71,6 +72,7 @@ class FakeSecret:
|
||||
self.secret_binary = secret_binary
|
||||
self.description = description
|
||||
self.tags = tags
|
||||
self.kms_key_id = kms_key_id
|
||||
self.version_id = version_id
|
||||
self.version_stages = version_stages
|
||||
self.rotation_enabled = False
|
||||
@ -78,10 +80,13 @@ class FakeSecret:
|
||||
self.auto_rotate_after_days = 0
|
||||
self.deleted_date = None
|
||||
|
||||
def update(self, description=None, tags=[]):
|
||||
def update(self, description=None, tags=[], kms_key_id=None):
|
||||
self.description = description
|
||||
self.tags = tags
|
||||
|
||||
if kms_key_id is not None:
|
||||
self.kms_key_id = kms_key_id
|
||||
|
||||
def set_versions(self, versions):
|
||||
self.versions = versions
|
||||
|
||||
@ -127,7 +132,7 @@ class FakeSecret:
|
||||
"ARN": self.arn,
|
||||
"Name": self.name,
|
||||
"Description": self.description or "",
|
||||
"KmsKeyId": "",
|
||||
"KmsKeyId": self.kms_key_id,
|
||||
"RotationEnabled": self.rotation_enabled,
|
||||
"RotationLambdaARN": self.rotation_lambda_arn,
|
||||
"RotationRules": {"AutomaticallyAfterDays": self.auto_rotate_after_days},
|
||||
@ -242,7 +247,12 @@ class SecretsManagerBackend(BaseBackend):
|
||||
return response
|
||||
|
||||
def update_secret(
|
||||
self, secret_id, secret_string=None, secret_binary=None, **kwargs
|
||||
self,
|
||||
secret_id,
|
||||
secret_string=None,
|
||||
secret_binary=None,
|
||||
kms_key_id=None,
|
||||
**kwargs
|
||||
):
|
||||
|
||||
# error if secret does not exist
|
||||
@ -265,12 +275,19 @@ class SecretsManagerBackend(BaseBackend):
|
||||
secret_binary=secret_binary,
|
||||
description=description,
|
||||
tags=tags,
|
||||
kms_key_id=kms_key_id,
|
||||
)
|
||||
|
||||
return secret.to_short_dict()
|
||||
|
||||
def create_secret(
|
||||
self, name, secret_string=None, secret_binary=None, description=None, tags=[]
|
||||
self,
|
||||
name,
|
||||
secret_string=None,
|
||||
secret_binary=None,
|
||||
description=None,
|
||||
tags=[],
|
||||
kms_key_id=None,
|
||||
):
|
||||
|
||||
# error if secret exists
|
||||
@ -285,6 +302,7 @@ class SecretsManagerBackend(BaseBackend):
|
||||
secret_binary=secret_binary,
|
||||
description=description,
|
||||
tags=tags,
|
||||
kms_key_id=kms_key_id,
|
||||
)
|
||||
|
||||
return secret.to_short_dict()
|
||||
@ -296,6 +314,7 @@ class SecretsManagerBackend(BaseBackend):
|
||||
secret_binary=None,
|
||||
description=None,
|
||||
tags=[],
|
||||
kms_key_id=None,
|
||||
version_id=None,
|
||||
version_stages=None,
|
||||
):
|
||||
@ -319,7 +338,8 @@ class SecretsManagerBackend(BaseBackend):
|
||||
|
||||
if secret_id in self.secrets:
|
||||
secret = self.secrets[secret_id]
|
||||
secret.update(description, tags)
|
||||
|
||||
secret.update(description, tags, kms_key_id)
|
||||
|
||||
if "AWSPENDING" in version_stages:
|
||||
secret.versions[version_id] = secret_version
|
||||
@ -333,6 +353,7 @@ class SecretsManagerBackend(BaseBackend):
|
||||
secret_binary=secret_binary,
|
||||
description=description,
|
||||
tags=tags,
|
||||
kms_key_id=kms_key_id,
|
||||
)
|
||||
secret.set_versions({version_id: secret_version})
|
||||
secret.set_default_version_id(version_id)
|
||||
|
@ -46,22 +46,26 @@ class SecretsManagerResponse(BaseResponse):
|
||||
secret_binary = self._get_param("SecretBinary")
|
||||
description = self._get_param("Description", if_none="")
|
||||
tags = self._get_param("Tags", if_none=[])
|
||||
kms_key_id = self._get_param("KmsKeyId", if_none=None)
|
||||
return secretsmanager_backends[self.region].create_secret(
|
||||
name=name,
|
||||
secret_string=secret_string,
|
||||
secret_binary=secret_binary,
|
||||
description=description,
|
||||
tags=tags,
|
||||
kms_key_id=kms_key_id,
|
||||
)
|
||||
|
||||
def update_secret(self):
|
||||
secret_id = self._get_param("SecretId")
|
||||
secret_string = self._get_param("SecretString")
|
||||
secret_binary = self._get_param("SecretBinary")
|
||||
kms_key_id = self._get_param("KmsKeyId", if_none=None)
|
||||
return secretsmanager_backends[self.region].update_secret(
|
||||
secret_id=secret_id,
|
||||
secret_string=secret_string,
|
||||
secret_binary=secret_binary,
|
||||
kms_key_id=kms_key_id,
|
||||
)
|
||||
|
||||
def get_random_password(self):
|
||||
|
@ -455,6 +455,21 @@ def test_describe_secret_with_arn():
|
||||
conn.list_secrets()["SecretList"][0]["ARN"].should.equal(results["ARN"])
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_describe_secret_with_KmsKeyId():
|
||||
conn = boto3.client("secretsmanager", region_name="us-west-2")
|
||||
results = conn.create_secret(
|
||||
Name="test-secret", SecretString="foosecret", KmsKeyId="dummy_arn"
|
||||
)
|
||||
|
||||
secret_description = conn.describe_secret(SecretId=results["ARN"])
|
||||
|
||||
secret_description["KmsKeyId"].should.equal("dummy_arn")
|
||||
conn.list_secrets()["SecretList"][0]["KmsKeyId"].should.equal(
|
||||
secret_description["KmsKeyId"]
|
||||
)
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_describe_secret_that_does_not_exist():
|
||||
conn = boto3.client("secretsmanager", region_name="us-west-2")
|
||||
@ -643,7 +658,7 @@ def lambda_handler(event, context):
|
||||
arn = event['SecretId']
|
||||
token = event['ClientRequestToken']
|
||||
step = event['Step']
|
||||
|
||||
|
||||
client = boto3.client("secretsmanager", region_name="us-west-2", endpoint_url="http://motoserver:5000")
|
||||
metadata = client.describe_secret(SecretId=arn)
|
||||
value = client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING")
|
||||
@ -661,31 +676,31 @@ def lambda_handler(event, context):
|
||||
elif "AWSPENDING" not in versions[token]:
|
||||
print("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
|
||||
raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
|
||||
|
||||
|
||||
if step == 'createSecret':
|
||||
try:
|
||||
client.get_secret_value(SecretId=arn, VersionId=token, VersionStage='AWSPENDING')
|
||||
except client.exceptions.ResourceNotFoundException:
|
||||
client.put_secret_value(
|
||||
SecretId=arn,
|
||||
ClientRequestToken=token,
|
||||
SecretString=json.dumps({'create': True}),
|
||||
SecretId=arn,
|
||||
ClientRequestToken=token,
|
||||
SecretString=json.dumps({'create': True}),
|
||||
VersionStages=['AWSPENDING']
|
||||
)
|
||||
|
||||
if step == 'setSecret':
|
||||
client.put_secret_value(
|
||||
SecretId=arn,
|
||||
ClientRequestToken=token,
|
||||
SecretString='UpdatedValue',
|
||||
SecretId=arn,
|
||||
ClientRequestToken=token,
|
||||
SecretString='UpdatedValue',
|
||||
VersionStages=["AWSPENDING"],
|
||||
)
|
||||
|
||||
|
||||
elif step == 'finishSecret':
|
||||
current_version = next(
|
||||
version
|
||||
for version, stages in metadata['VersionIdsToStages'].items()
|
||||
if 'AWSCURRENT' in stages
|
||||
if 'AWSCURRENT' in stages
|
||||
)
|
||||
print("current: %s new: %s" % (current_version, token))
|
||||
client.update_secret_version_stage(
|
||||
@ -1000,6 +1015,40 @@ def test_update_secret_with_tags_and_description():
|
||||
assert secret_details["Description"] == "desc"
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_update_secret_with_KmsKeyId():
|
||||
conn = boto3.client("secretsmanager", region_name="us-west-2")
|
||||
|
||||
created_secret = conn.create_secret(
|
||||
Name="test-secret", SecretString="foosecret", KmsKeyId="foo_arn"
|
||||
)
|
||||
|
||||
assert created_secret["ARN"]
|
||||
assert created_secret["Name"] == "test-secret"
|
||||
assert created_secret["VersionId"] != ""
|
||||
|
||||
secret = conn.get_secret_value(SecretId="test-secret")
|
||||
assert secret["SecretString"] == "foosecret"
|
||||
|
||||
secret_details = conn.describe_secret(SecretId="test-secret")
|
||||
secret_details["KmsKeyId"].should.equal("foo_arn")
|
||||
|
||||
updated_secret = conn.update_secret(
|
||||
SecretId="test-secret", SecretString="barsecret", KmsKeyId="bar_arn"
|
||||
)
|
||||
|
||||
assert updated_secret["ARN"]
|
||||
assert updated_secret["Name"] == "test-secret"
|
||||
assert updated_secret["VersionId"] != ""
|
||||
|
||||
secret = conn.get_secret_value(SecretId="test-secret")
|
||||
assert secret["SecretString"] == "barsecret"
|
||||
assert created_secret["VersionId"] != updated_secret["VersionId"]
|
||||
|
||||
secret_details = conn.describe_secret(SecretId="test-secret")
|
||||
secret_details["KmsKeyId"].should.equal("bar_arn")
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_update_secret_which_does_not_exit():
|
||||
conn = boto3.client("secretsmanager", region_name="us-west-2")
|
||||
|
Loading…
Reference in New Issue
Block a user