From 369f6bbfc94782e656a418241e981bd5390ce14c Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 28 Sep 2020 14:49:14 +0100 Subject: [PATCH] 3302 - Make Secret ARN persistent --- moto/secretsmanager/list_secrets/filters.py | 8 +- moto/secretsmanager/models.py | 304 +++++++++--------- .../test_secretsmanager.py | 5 +- 3 files changed, 154 insertions(+), 163 deletions(-) diff --git a/moto/secretsmanager/list_secrets/filters.py b/moto/secretsmanager/list_secrets/filters.py index 813b1f544..c888ebe64 100644 --- a/moto/secretsmanager/list_secrets/filters.py +++ b/moto/secretsmanager/list_secrets/filters.py @@ -7,21 +7,21 @@ def _matcher(pattern, str): def name(secret, names): for n in names: - if _matcher(n, secret["name"]): + if _matcher(n, secret.name): return True return False def description(secret, descriptions): for d in descriptions: - if _matcher(d, secret["description"]): + if _matcher(d, secret.description): return True return False def tag_key(secret, tag_keys): for k in tag_keys: - for tag in secret["tags"]: + for tag in secret.tags: if _matcher(k, tag["Key"]): return True return False @@ -29,7 +29,7 @@ def tag_key(secret, tag_keys): def tag_value(secret, tag_values): for v in tag_values: - for tag in secret["tags"]: + for tag in secret.tags: if _matcher(v, tag["Value"]): return True return False diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 0339dc575..41b70bc1f 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -50,6 +50,101 @@ class SecretsManager(BaseModel): self.region = region_name +class FakeSecret: + def __init__( + self, + region_name, + secret_id, + secret_string=None, + secret_binary=None, + description=None, + tags=[], + version_id=None, + version_stages=None, + ): + self.secret_id = secret_id + self.name = secret_id + self.arn = secret_arn(region_name, secret_id) + self.secret_string = secret_string + self.secret_binary = secret_binary + self.description = description + self.tags = tags + self.version_id = version_id + self.version_stages = version_stages + self.rotation_enabled = False + self.rotation_lambda_arn = "" + self.auto_rotate_after_days = 0 + self.deleted_date = None + + def update(self, description=None, tags=[]): + self.description = description + self.tags = tags + + def set_versions(self, versions): + self.versions = versions + + def set_default_version_id(self, version_id): + self.default_version_id = version_id + + def reset_default_version(self, secret_version, version_id): + # remove all old AWSPREVIOUS stages + for old_version in self.versions.values(): + if "AWSPREVIOUS" in old_version["version_stages"]: + old_version["version_stages"].remove("AWSPREVIOUS") + + # set old AWSCURRENT secret to AWSPREVIOUS + previous_current_version_id = self.default_version_id + self.versions[previous_current_version_id]["version_stages"] = ["AWSPREVIOUS"] + + self.versions[version_id] = secret_version + self.default_version_id = version_id + + def delete(self, deleted_date): + self.deleted_date = deleted_date + + def restore(self): + self.deleted_date = None + + def is_deleted(self): + return self.deleted_date is not None + + def to_short_dict(self, include_version_stages=False): + dct = { + "ARN": self.arn, + "Name": self.name, + "VersionId": self.default_version_id, + } + if include_version_stages: + dct["VersionStages"] = self.version_stages + return json.dumps(dct) + + def to_dict(self): + version_id_to_stages = self._form_version_ids_to_stages() + + return { + "ARN": self.arn, + "Name": self.name, + "Description": self.description or "", + "KmsKeyId": "", + "RotationEnabled": self.rotation_enabled, + "RotationLambdaARN": self.rotation_lambda_arn, + "RotationRules": {"AutomaticallyAfterDays": self.auto_rotate_after_days}, + "LastRotatedDate": None, + "LastChangedDate": None, + "LastAccessedDate": None, + "DeletedDate": self.deleted_date, + "Tags": self.tags, + "VersionIdsToStages": version_id_to_stages, + } + + def _form_version_ids_to_stages(self): + version_id_to_stages = {} + for key, value in self.versions.items(): + version_id_to_stages[key] = value["version_stages"] + + return version_id_to_stages + + class SecretsStore(dict): def __setitem__(self, key, value): new_key = get_secret_name_from_arn(key) @@ -92,7 +187,7 @@ class SecretsManagerBackend(BaseBackend): if not version_id and version_stage: # set version_id to match version_stage - versions_dict = self.secrets[secret_id]["versions"] + versions_dict = self.secrets[secret_id].versions for ver_id, ver_val in versions_dict.items(): if version_stage in ver_val["version_stages"]: version_id = ver_id @@ -101,20 +196,20 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() # TODO check this part - if "deleted_date" in self.secrets[secret_id]: + if self.secrets[secret_id].is_deleted(): raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to \ perform the operation on a secret that's currently marked deleted." ) secret = self.secrets[secret_id] - version_id = version_id or secret["default_version_id"] + version_id = version_id or secret.default_version_id - secret_version = secret["versions"][version_id] + secret_version = secret.versions[version_id] response_data = { - "ARN": secret_arn(self.region, secret["secret_id"]), - "Name": secret["name"], + "ARN": secret.arn, + "Name": secret.name, "VersionId": secret_version["version_id"], "VersionStages": secret_version["version_stages"], "CreatedDate": secret_version["createdate"], @@ -144,17 +239,17 @@ class SecretsManagerBackend(BaseBackend): if secret_id not in self.secrets.keys(): raise SecretNotFoundException() - if "deleted_date" in self.secrets[secret_id]: + if self.secrets[secret_id].is_deleted(): raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: " "You can't perform this operation on the secret because it was marked for deletion." ) secret = self.secrets[secret_id] - tags = secret["tags"] - description = secret["description"] + tags = secret.tags + description = secret.description - version_id = self._add_secret( + secret = self._add_secret( secret_id, secret_string=secret_string, secret_binary=secret_binary, @@ -162,15 +257,7 @@ class SecretsManagerBackend(BaseBackend): tags=tags, ) - response = json.dumps( - { - "ARN": secret_arn(self.region, secret_id), - "Name": secret_id, - "VersionId": version_id, - } - ) - - return response + return secret.to_short_dict() def create_secret( self, @@ -188,7 +275,7 @@ class SecretsManagerBackend(BaseBackend): "A resource with the ID you requested already exists." ) - version_id = self._add_secret( + secret = self._add_secret( name, secret_string=secret_string, secret_binary=secret_binary, @@ -196,15 +283,7 @@ class SecretsManagerBackend(BaseBackend): tags=tags, ) - response = json.dumps( - { - "ARN": secret_arn(self.region, name), - "Name": name, - "VersionId": version_id, - } - ) - - return response + return secret.to_short_dict() def _add_secret( self, @@ -228,7 +307,6 @@ class SecretsManagerBackend(BaseBackend): "version_id": version_id, "version_stages": version_stages, } - if secret_string is not None: secret_version["secret_string"] = secret_string @@ -236,49 +314,35 @@ class SecretsManagerBackend(BaseBackend): secret_version["secret_binary"] = secret_binary if secret_id in self.secrets: - # remove all old AWSPREVIOUS stages - for secret_verion_to_look_at in self.secrets[secret_id][ - "versions" - ].values(): - if "AWSPREVIOUS" in secret_verion_to_look_at["version_stages"]: - secret_verion_to_look_at["version_stages"].remove("AWSPREVIOUS") - - # set old AWSCURRENT secret to AWSPREVIOUS - previous_current_version_id = self.secrets[secret_id]["default_version_id"] - self.secrets[secret_id]["versions"][previous_current_version_id][ - "version_stages" - ] = ["AWSPREVIOUS"] - - self.secrets[secret_id]["versions"][version_id] = secret_version - self.secrets[secret_id]["default_version_id"] = version_id + secret = self.secrets[secret_id] + secret.update(description, tags) + secret.reset_default_version(secret_version, version_id) else: - self.secrets[secret_id] = { - "versions": {version_id: secret_version}, - "default_version_id": version_id, - } + secret = FakeSecret( + region_name=self.region, + secret_id=secret_id, + secret_string=secret_string, + secret_binary=secret_binary, + description=description, + tags=tags, + ) + secret.set_versions({version_id: secret_version}) + secret.set_default_version_id(version_id) + self.secrets[secret_id] = secret - secret = self.secrets[secret_id] - secret["secret_id"] = secret_id - secret["name"] = secret_id - secret["rotation_enabled"] = False - secret["rotation_lambda_arn"] = "" - secret["auto_rotate_after_days"] = 0 - secret["tags"] = tags - secret["description"] = description - - return version_id + return secret def put_secret_value(self, secret_id, secret_string, secret_binary, version_stages): if secret_id in self.secrets.keys(): secret = self.secrets[secret_id] - tags = secret["tags"] - description = secret["description"] + tags = secret.tags + description = secret.description else: tags = [] description = "" - version_id = self._add_secret( + secret = self._add_secret( secret_id, secret_string, secret_binary, @@ -287,45 +351,15 @@ class SecretsManagerBackend(BaseBackend): version_stages=version_stages, ) - response = json.dumps( - { - "ARN": secret_arn(self.region, secret_id), - "Name": secret_id, - "VersionId": version_id, - "VersionStages": version_stages, - } - ) - - return response + return secret.to_short_dict(include_version_stages=True) def describe_secret(self, secret_id): if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() secret = self.secrets[secret_id] - version_id_to_stages = self.form_version_ids_to_stages(secret["versions"]) - response = json.dumps( - { - "ARN": secret_arn(self.region, secret["secret_id"]), - "Name": secret["name"], - "Description": secret.get("description", ""), - "KmsKeyId": "", - "RotationEnabled": secret["rotation_enabled"], - "RotationLambdaARN": secret["rotation_lambda_arn"], - "RotationRules": { - "AutomaticallyAfterDays": secret["auto_rotate_after_days"] - }, - "LastRotatedDate": None, - "LastChangedDate": None, - "LastAccessedDate": None, - "DeletedDate": secret.get("deleted_date", None), - "Tags": secret["tags"], - "VersionIdsToStages": version_id_to_stages, - } - ) - - return response + return json.dumps(secret.to_dict()) def rotate_secret( self, @@ -340,7 +374,7 @@ class SecretsManagerBackend(BaseBackend): if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() - if "deleted_date" in self.secrets[secret_id]: + if self.secrets[secret_id].is_deleted(): raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \ perform the operation on a secret that's currently marked deleted." @@ -368,36 +402,28 @@ class SecretsManagerBackend(BaseBackend): secret = self.secrets[secret_id] - old_secret_version = secret["versions"][secret["default_version_id"]] + old_secret_version = secret.versions[secret.default_version_id] new_version_id = client_request_token or str(uuid.uuid4()) self._add_secret( secret_id, old_secret_version["secret_string"], - secret["description"], - secret["tags"], + secret.description, + secret.tags, version_id=new_version_id, version_stages=["AWSCURRENT"], ) - secret["rotation_lambda_arn"] = rotation_lambda_arn or "" + secret.rotation_lambda_arn = rotation_lambda_arn or "" if rotation_rules: - secret["auto_rotate_after_days"] = rotation_rules.get(rotation_days, 0) - if secret["auto_rotate_after_days"] > 0: - secret["rotation_enabled"] = True + secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0) + if secret.auto_rotate_after_days > 0: + secret.rotation_enabled = True if "AWSCURRENT" in old_secret_version["version_stages"]: old_secret_version["version_stages"].remove("AWSCURRENT") - response = json.dumps( - { - "ARN": secret_arn(self.region, secret["secret_id"]), - "Name": secret["name"], - "VersionId": new_version_id, - } - ) - - return response + return secret.to_short_dict() def get_random_password( self, @@ -446,7 +472,7 @@ class SecretsManagerBackend(BaseBackend): secret = self.secrets[secret_id] version_list = [] - for version_id, version in secret["versions"].items(): + for version_id, version in secret.versions.items(): version_list.append( { "CreatedDate": int(time.time()), @@ -458,8 +484,8 @@ class SecretsManagerBackend(BaseBackend): response = json.dumps( { - "ARN": secret["secret_id"], - "Name": secret["name"], + "ARN": secret.secret_id, + "Name": secret.name, "NextToken": "", "Versions": version_list, } @@ -473,29 +499,7 @@ class SecretsManagerBackend(BaseBackend): secret_list = [] for secret in self.secrets.values(): if _matches(secret, filters): - versions_to_stages = {} - for version_id, version in secret["versions"].items(): - versions_to_stages[version_id] = version["version_stages"] - - secret_list.append( - { - "ARN": secret_arn(self.region, secret["secret_id"]), - "DeletedDate": secret.get("deleted_date", None), - "Description": secret.get("description", ""), - "KmsKeyId": "", - "LastAccessedDate": None, - "LastChangedDate": None, - "LastRotatedDate": None, - "Name": secret["name"], - "RotationEnabled": secret["rotation_enabled"], - "RotationLambdaARN": secret["rotation_lambda_arn"], - "RotationRules": { - "AutomaticallyAfterDays": secret["auto_rotate_after_days"] - }, - "SecretVersionsToStages": versions_to_stages, - "Tags": secret["tags"], - } - ) + secret_list.append(secret.to_dict()) return secret_list, None @@ -506,7 +510,7 @@ class SecretsManagerBackend(BaseBackend): if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() - if "deleted_date" in self.secrets[secret_id]: + if self.secrets[secret_id].is_deleted(): raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \ perform the operation on a secret that's currently marked deleted." @@ -532,16 +536,14 @@ class SecretsManagerBackend(BaseBackend): secret = self.secrets.pop(secret_id, None) else: deletion_date += datetime.timedelta(days=recovery_window_in_days or 30) - self.secrets[secret_id]["deleted_date"] = self._unix_time_secs( - deletion_date - ) + self.secrets[secret_id].delete(self._unix_time_secs(deletion_date)) secret = self.secrets.get(secret_id, None) if not secret: raise SecretNotFoundException() - arn = secret_arn(self.region, secret["secret_id"]) - name = secret["name"] + arn = secret.arn + name = secret.name return arn, name, self._unix_time_secs(deletion_date) @@ -550,14 +552,10 @@ class SecretsManagerBackend(BaseBackend): if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() - self.secrets[secret_id].pop("deleted_date", None) - secret = self.secrets[secret_id] + secret.restore() - arn = secret_arn(self.region, secret["secret_id"]) - name = secret["name"] - - return arn, name + return secret.arn, secret.name @staticmethod def get_resource_policy(secret_id): @@ -583,14 +581,6 @@ class SecretsManagerBackend(BaseBackend): } ) - @staticmethod - def form_version_ids_to_stages(secret): - version_id_to_stages = {} - for key, value in secret.items(): - version_id_to_stages[key] = value["version_stages"] - - return version_id_to_stages - secretsmanager_backends = {} for region in Session().get_available_regions("secretsmanager"): diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index 69e055bb2..94e745659 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -439,8 +439,9 @@ def test_describe_secret_with_arn(): secret_description = conn.describe_secret(SecretId=results["ARN"]) assert secret_description # Returned dict is not empty - assert secret_description["Name"] == ("test-secret") - assert secret_description["ARN"] != results["ARN"] + secret_description["Name"].should.equal("test-secret") + secret_description["ARN"].should.equal(results["ARN"]) + conn.list_secrets()["SecretList"][0]["ARN"].should.equal(results["ARN"]) @mock_secretsmanager