3302 - Make Secret ARN persistent

This commit is contained in:
Bert Blommers 2020-09-28 14:49:14 +01:00
parent ce60e9e3b8
commit 369f6bbfc9
3 changed files with 154 additions and 163 deletions

View File

@ -7,21 +7,21 @@ def _matcher(pattern, str):
def name(secret, names): def name(secret, names):
for n in names: for n in names:
if _matcher(n, secret["name"]): if _matcher(n, secret.name):
return True return True
return False return False
def description(secret, descriptions): def description(secret, descriptions):
for d in descriptions: for d in descriptions:
if _matcher(d, secret["description"]): if _matcher(d, secret.description):
return True return True
return False return False
def tag_key(secret, tag_keys): def tag_key(secret, tag_keys):
for k in tag_keys: for k in tag_keys:
for tag in secret["tags"]: for tag in secret.tags:
if _matcher(k, tag["Key"]): if _matcher(k, tag["Key"]):
return True return True
return False return False
@ -29,7 +29,7 @@ def tag_key(secret, tag_keys):
def tag_value(secret, tag_values): def tag_value(secret, tag_values):
for v in tag_values: for v in tag_values:
for tag in secret["tags"]: for tag in secret.tags:
if _matcher(v, tag["Value"]): if _matcher(v, tag["Value"]):
return True return True
return False return False

View File

@ -50,6 +50,101 @@ class SecretsManager(BaseModel):
self.region = region_name self.region = region_name
class FakeSecret:
def __init__(
self,
region_name,
secret_id,
secret_string=None,
secret_binary=None,
description=None,
tags=[],
version_id=None,
version_stages=None,
):
self.secret_id = secret_id
self.name = secret_id
self.arn = secret_arn(region_name, secret_id)
self.secret_string = secret_string
self.secret_binary = secret_binary
self.description = description
self.tags = tags
self.version_id = version_id
self.version_stages = version_stages
self.rotation_enabled = False
self.rotation_lambda_arn = ""
self.auto_rotate_after_days = 0
self.deleted_date = None
def update(self, description=None, tags=[]):
self.description = description
self.tags = tags
def set_versions(self, versions):
self.versions = versions
def set_default_version_id(self, version_id):
self.default_version_id = version_id
def reset_default_version(self, secret_version, version_id):
# remove all old AWSPREVIOUS stages
for old_version in self.versions.values():
if "AWSPREVIOUS" in old_version["version_stages"]:
old_version["version_stages"].remove("AWSPREVIOUS")
# set old AWSCURRENT secret to AWSPREVIOUS
previous_current_version_id = self.default_version_id
self.versions[previous_current_version_id]["version_stages"] = ["AWSPREVIOUS"]
self.versions[version_id] = secret_version
self.default_version_id = version_id
def delete(self, deleted_date):
self.deleted_date = deleted_date
def restore(self):
self.deleted_date = None
def is_deleted(self):
return self.deleted_date is not None
def to_short_dict(self, include_version_stages=False):
dct = {
"ARN": self.arn,
"Name": self.name,
"VersionId": self.default_version_id,
}
if include_version_stages:
dct["VersionStages"] = self.version_stages
return json.dumps(dct)
def to_dict(self):
version_id_to_stages = self._form_version_ids_to_stages()
return {
"ARN": self.arn,
"Name": self.name,
"Description": self.description or "",
"KmsKeyId": "",
"RotationEnabled": self.rotation_enabled,
"RotationLambdaARN": self.rotation_lambda_arn,
"RotationRules": {"AutomaticallyAfterDays": self.auto_rotate_after_days},
"LastRotatedDate": None,
"LastChangedDate": None,
"LastAccessedDate": None,
"DeletedDate": self.deleted_date,
"Tags": self.tags,
"VersionIdsToStages": version_id_to_stages,
}
def _form_version_ids_to_stages(self):
version_id_to_stages = {}
for key, value in self.versions.items():
version_id_to_stages[key] = value["version_stages"]
return version_id_to_stages
class SecretsStore(dict): class SecretsStore(dict):
def __setitem__(self, key, value): def __setitem__(self, key, value):
new_key = get_secret_name_from_arn(key) new_key = get_secret_name_from_arn(key)
@ -92,7 +187,7 @@ class SecretsManagerBackend(BaseBackend):
if not version_id and version_stage: if not version_id and version_stage:
# set version_id to match version_stage # set version_id to match version_stage
versions_dict = self.secrets[secret_id]["versions"] versions_dict = self.secrets[secret_id].versions
for ver_id, ver_val in versions_dict.items(): for ver_id, ver_val in versions_dict.items():
if version_stage in ver_val["version_stages"]: if version_stage in ver_val["version_stages"]:
version_id = ver_id version_id = ver_id
@ -101,20 +196,20 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException() raise SecretNotFoundException()
# TODO check this part # TODO check this part
if "deleted_date" in self.secrets[secret_id]: if self.secrets[secret_id].is_deleted():
raise InvalidRequestException( raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to \ "An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to \
perform the operation on a secret that's currently marked deleted." perform the operation on a secret that's currently marked deleted."
) )
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
version_id = version_id or secret["default_version_id"] version_id = version_id or secret.default_version_id
secret_version = secret["versions"][version_id] secret_version = secret.versions[version_id]
response_data = { response_data = {
"ARN": secret_arn(self.region, secret["secret_id"]), "ARN": secret.arn,
"Name": secret["name"], "Name": secret.name,
"VersionId": secret_version["version_id"], "VersionId": secret_version["version_id"],
"VersionStages": secret_version["version_stages"], "VersionStages": secret_version["version_stages"],
"CreatedDate": secret_version["createdate"], "CreatedDate": secret_version["createdate"],
@ -144,17 +239,17 @@ class SecretsManagerBackend(BaseBackend):
if secret_id not in self.secrets.keys(): if secret_id not in self.secrets.keys():
raise SecretNotFoundException() raise SecretNotFoundException()
if "deleted_date" in self.secrets[secret_id]: if self.secrets[secret_id].is_deleted():
raise InvalidRequestException( raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the UpdateSecret operation: " "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
"You can't perform this operation on the secret because it was marked for deletion." "You can't perform this operation on the secret because it was marked for deletion."
) )
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
tags = secret["tags"] tags = secret.tags
description = secret["description"] description = secret.description
version_id = self._add_secret( secret = self._add_secret(
secret_id, secret_id,
secret_string=secret_string, secret_string=secret_string,
secret_binary=secret_binary, secret_binary=secret_binary,
@ -162,15 +257,7 @@ class SecretsManagerBackend(BaseBackend):
tags=tags, tags=tags,
) )
response = json.dumps( return secret.to_short_dict()
{
"ARN": secret_arn(self.region, secret_id),
"Name": secret_id,
"VersionId": version_id,
}
)
return response
def create_secret( def create_secret(
self, self,
@ -188,7 +275,7 @@ class SecretsManagerBackend(BaseBackend):
"A resource with the ID you requested already exists." "A resource with the ID you requested already exists."
) )
version_id = self._add_secret( secret = self._add_secret(
name, name,
secret_string=secret_string, secret_string=secret_string,
secret_binary=secret_binary, secret_binary=secret_binary,
@ -196,15 +283,7 @@ class SecretsManagerBackend(BaseBackend):
tags=tags, tags=tags,
) )
response = json.dumps( return secret.to_short_dict()
{
"ARN": secret_arn(self.region, name),
"Name": name,
"VersionId": version_id,
}
)
return response
def _add_secret( def _add_secret(
self, self,
@ -228,7 +307,6 @@ class SecretsManagerBackend(BaseBackend):
"version_id": version_id, "version_id": version_id,
"version_stages": version_stages, "version_stages": version_stages,
} }
if secret_string is not None: if secret_string is not None:
secret_version["secret_string"] = secret_string secret_version["secret_string"] = secret_string
@ -236,49 +314,35 @@ class SecretsManagerBackend(BaseBackend):
secret_version["secret_binary"] = secret_binary secret_version["secret_binary"] = secret_binary
if secret_id in self.secrets: if secret_id in self.secrets:
# remove all old AWSPREVIOUS stages
for secret_verion_to_look_at in self.secrets[secret_id][
"versions"
].values():
if "AWSPREVIOUS" in secret_verion_to_look_at["version_stages"]:
secret_verion_to_look_at["version_stages"].remove("AWSPREVIOUS")
# set old AWSCURRENT secret to AWSPREVIOUS
previous_current_version_id = self.secrets[secret_id]["default_version_id"]
self.secrets[secret_id]["versions"][previous_current_version_id][
"version_stages"
] = ["AWSPREVIOUS"]
self.secrets[secret_id]["versions"][version_id] = secret_version
self.secrets[secret_id]["default_version_id"] = version_id
else:
self.secrets[secret_id] = {
"versions": {version_id: secret_version},
"default_version_id": version_id,
}
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
secret["secret_id"] = secret_id secret.update(description, tags)
secret["name"] = secret_id secret.reset_default_version(secret_version, version_id)
secret["rotation_enabled"] = False else:
secret["rotation_lambda_arn"] = "" secret = FakeSecret(
secret["auto_rotate_after_days"] = 0 region_name=self.region,
secret["tags"] = tags secret_id=secret_id,
secret["description"] = description secret_string=secret_string,
secret_binary=secret_binary,
description=description,
tags=tags,
)
secret.set_versions({version_id: secret_version})
secret.set_default_version_id(version_id)
self.secrets[secret_id] = secret
return version_id return secret
def put_secret_value(self, secret_id, secret_string, secret_binary, version_stages): def put_secret_value(self, secret_id, secret_string, secret_binary, version_stages):
if secret_id in self.secrets.keys(): if secret_id in self.secrets.keys():
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
tags = secret["tags"] tags = secret.tags
description = secret["description"] description = secret.description
else: else:
tags = [] tags = []
description = "" description = ""
version_id = self._add_secret( secret = self._add_secret(
secret_id, secret_id,
secret_string, secret_string,
secret_binary, secret_binary,
@ -287,45 +351,15 @@ class SecretsManagerBackend(BaseBackend):
version_stages=version_stages, version_stages=version_stages,
) )
response = json.dumps( return secret.to_short_dict(include_version_stages=True)
{
"ARN": secret_arn(self.region, secret_id),
"Name": secret_id,
"VersionId": version_id,
"VersionStages": version_stages,
}
)
return response
def describe_secret(self, secret_id): def describe_secret(self, secret_id):
if not self._is_valid_identifier(secret_id): if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException() raise SecretNotFoundException()
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
version_id_to_stages = self.form_version_ids_to_stages(secret["versions"])
response = json.dumps( return json.dumps(secret.to_dict())
{
"ARN": secret_arn(self.region, secret["secret_id"]),
"Name": secret["name"],
"Description": secret.get("description", ""),
"KmsKeyId": "",
"RotationEnabled": secret["rotation_enabled"],
"RotationLambdaARN": secret["rotation_lambda_arn"],
"RotationRules": {
"AutomaticallyAfterDays": secret["auto_rotate_after_days"]
},
"LastRotatedDate": None,
"LastChangedDate": None,
"LastAccessedDate": None,
"DeletedDate": secret.get("deleted_date", None),
"Tags": secret["tags"],
"VersionIdsToStages": version_id_to_stages,
}
)
return response
def rotate_secret( def rotate_secret(
self, self,
@ -340,7 +374,7 @@ class SecretsManagerBackend(BaseBackend):
if not self._is_valid_identifier(secret_id): if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException() raise SecretNotFoundException()
if "deleted_date" in self.secrets[secret_id]: if self.secrets[secret_id].is_deleted():
raise InvalidRequestException( raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \ "An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \
perform the operation on a secret that's currently marked deleted." perform the operation on a secret that's currently marked deleted."
@ -368,36 +402,28 @@ class SecretsManagerBackend(BaseBackend):
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
old_secret_version = secret["versions"][secret["default_version_id"]] old_secret_version = secret.versions[secret.default_version_id]
new_version_id = client_request_token or str(uuid.uuid4()) new_version_id = client_request_token or str(uuid.uuid4())
self._add_secret( self._add_secret(
secret_id, secret_id,
old_secret_version["secret_string"], old_secret_version["secret_string"],
secret["description"], secret.description,
secret["tags"], secret.tags,
version_id=new_version_id, version_id=new_version_id,
version_stages=["AWSCURRENT"], version_stages=["AWSCURRENT"],
) )
secret["rotation_lambda_arn"] = rotation_lambda_arn or "" secret.rotation_lambda_arn = rotation_lambda_arn or ""
if rotation_rules: if rotation_rules:
secret["auto_rotate_after_days"] = rotation_rules.get(rotation_days, 0) secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
if secret["auto_rotate_after_days"] > 0: if secret.auto_rotate_after_days > 0:
secret["rotation_enabled"] = True secret.rotation_enabled = True
if "AWSCURRENT" in old_secret_version["version_stages"]: if "AWSCURRENT" in old_secret_version["version_stages"]:
old_secret_version["version_stages"].remove("AWSCURRENT") old_secret_version["version_stages"].remove("AWSCURRENT")
response = json.dumps( return secret.to_short_dict()
{
"ARN": secret_arn(self.region, secret["secret_id"]),
"Name": secret["name"],
"VersionId": new_version_id,
}
)
return response
def get_random_password( def get_random_password(
self, self,
@ -446,7 +472,7 @@ class SecretsManagerBackend(BaseBackend):
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
version_list = [] version_list = []
for version_id, version in secret["versions"].items(): for version_id, version in secret.versions.items():
version_list.append( version_list.append(
{ {
"CreatedDate": int(time.time()), "CreatedDate": int(time.time()),
@ -458,8 +484,8 @@ class SecretsManagerBackend(BaseBackend):
response = json.dumps( response = json.dumps(
{ {
"ARN": secret["secret_id"], "ARN": secret.secret_id,
"Name": secret["name"], "Name": secret.name,
"NextToken": "", "NextToken": "",
"Versions": version_list, "Versions": version_list,
} }
@ -473,29 +499,7 @@ class SecretsManagerBackend(BaseBackend):
secret_list = [] secret_list = []
for secret in self.secrets.values(): for secret in self.secrets.values():
if _matches(secret, filters): if _matches(secret, filters):
versions_to_stages = {} secret_list.append(secret.to_dict())
for version_id, version in secret["versions"].items():
versions_to_stages[version_id] = version["version_stages"]
secret_list.append(
{
"ARN": secret_arn(self.region, secret["secret_id"]),
"DeletedDate": secret.get("deleted_date", None),
"Description": secret.get("description", ""),
"KmsKeyId": "",
"LastAccessedDate": None,
"LastChangedDate": None,
"LastRotatedDate": None,
"Name": secret["name"],
"RotationEnabled": secret["rotation_enabled"],
"RotationLambdaARN": secret["rotation_lambda_arn"],
"RotationRules": {
"AutomaticallyAfterDays": secret["auto_rotate_after_days"]
},
"SecretVersionsToStages": versions_to_stages,
"Tags": secret["tags"],
}
)
return secret_list, None return secret_list, None
@ -506,7 +510,7 @@ class SecretsManagerBackend(BaseBackend):
if not self._is_valid_identifier(secret_id): if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException() raise SecretNotFoundException()
if "deleted_date" in self.secrets[secret_id]: if self.secrets[secret_id].is_deleted():
raise InvalidRequestException( raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \ "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \
perform the operation on a secret that's currently marked deleted." perform the operation on a secret that's currently marked deleted."
@ -532,16 +536,14 @@ class SecretsManagerBackend(BaseBackend):
secret = self.secrets.pop(secret_id, None) secret = self.secrets.pop(secret_id, None)
else: else:
deletion_date += datetime.timedelta(days=recovery_window_in_days or 30) deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
self.secrets[secret_id]["deleted_date"] = self._unix_time_secs( self.secrets[secret_id].delete(self._unix_time_secs(deletion_date))
deletion_date
)
secret = self.secrets.get(secret_id, None) secret = self.secrets.get(secret_id, None)
if not secret: if not secret:
raise SecretNotFoundException() raise SecretNotFoundException()
arn = secret_arn(self.region, secret["secret_id"]) arn = secret.arn
name = secret["name"] name = secret.name
return arn, name, self._unix_time_secs(deletion_date) return arn, name, self._unix_time_secs(deletion_date)
@ -550,14 +552,10 @@ class SecretsManagerBackend(BaseBackend):
if not self._is_valid_identifier(secret_id): if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException() raise SecretNotFoundException()
self.secrets[secret_id].pop("deleted_date", None)
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
secret.restore()
arn = secret_arn(self.region, secret["secret_id"]) return secret.arn, secret.name
name = secret["name"]
return arn, name
@staticmethod @staticmethod
def get_resource_policy(secret_id): def get_resource_policy(secret_id):
@ -583,14 +581,6 @@ class SecretsManagerBackend(BaseBackend):
} }
) )
@staticmethod
def form_version_ids_to_stages(secret):
version_id_to_stages = {}
for key, value in secret.items():
version_id_to_stages[key] = value["version_stages"]
return version_id_to_stages
secretsmanager_backends = {} secretsmanager_backends = {}
for region in Session().get_available_regions("secretsmanager"): for region in Session().get_available_regions("secretsmanager"):

View File

@ -439,8 +439,9 @@ def test_describe_secret_with_arn():
secret_description = conn.describe_secret(SecretId=results["ARN"]) secret_description = conn.describe_secret(SecretId=results["ARN"])
assert secret_description # Returned dict is not empty assert secret_description # Returned dict is not empty
assert secret_description["Name"] == ("test-secret") secret_description["Name"].should.equal("test-secret")
assert secret_description["ARN"] != results["ARN"] secret_description["ARN"].should.equal(results["ARN"])
conn.list_secrets()["SecretList"][0]["ARN"].should.equal(results["ARN"])
@mock_secretsmanager @mock_secretsmanager