From f54f4a666fba7367440b57fb95f7c1a0e0b7fba0 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 22 Apr 2023 15:39:48 +0000 Subject: [PATCH] Techdebt: MyPy SecretsManager (#6244) --- moto/secretsmanager/exceptions.py | 18 +- moto/secretsmanager/list_secrets/filters.py | 24 +- moto/secretsmanager/models.py | 277 +++++++++++--------- moto/secretsmanager/responses.py | 54 ++-- moto/secretsmanager/utils.py | 26 +- setup.cfg | 2 +- tests/test_secretsmanager/test_server.py | 5 +- 7 files changed, 219 insertions(+), 187 deletions(-) diff --git a/moto/secretsmanager/exceptions.py b/moto/secretsmanager/exceptions.py index c28b542fc..9abd0775c 100644 --- a/moto/secretsmanager/exceptions.py +++ b/moto/secretsmanager/exceptions.py @@ -6,13 +6,13 @@ class SecretsManagerClientError(JsonRESTError): class ResourceNotFoundException(SecretsManagerClientError): - def __init__(self, message): + def __init__(self, message: str): self.code = 404 super().__init__("ResourceNotFoundException", message) class SecretNotFoundException(SecretsManagerClientError): - def __init__(self): + def __init__(self) -> None: self.code = 404 super().__init__( "ResourceNotFoundException", @@ -21,7 +21,7 @@ class SecretNotFoundException(SecretsManagerClientError): class SecretHasNoValueException(SecretsManagerClientError): - def __init__(self, version_stage): + def __init__(self, version_stage: str): self.code = 404 super().__init__( "ResourceNotFoundException", @@ -31,7 +31,7 @@ class SecretHasNoValueException(SecretsManagerClientError): class SecretStageVersionMismatchException(SecretsManagerClientError): - def __init__(self): + def __init__(self) -> None: self.code = 404 super().__init__( "InvalidRequestException", @@ -40,25 +40,25 @@ class SecretStageVersionMismatchException(SecretsManagerClientError): class ClientError(SecretsManagerClientError): - def __init__(self, message): + def __init__(self, message: str): super().__init__("InvalidParameterValue", message) class InvalidParameterException(SecretsManagerClientError): - def __init__(self, message): + def __init__(self, message: str): super().__init__("InvalidParameterException", message) class ResourceExistsException(SecretsManagerClientError): - def __init__(self, message): + def __init__(self, message: str): super().__init__("ResourceExistsException", message) class InvalidRequestException(SecretsManagerClientError): - def __init__(self, message): + def __init__(self, message: str): super().__init__("InvalidRequestException", message) class ValidationException(SecretsManagerClientError): - def __init__(self, message): + def __init__(self, message: str): super().__init__("ValidationException", message) diff --git a/moto/secretsmanager/list_secrets/filters.py b/moto/secretsmanager/list_secrets/filters.py index 57d442562..62e1d4669 100644 --- a/moto/secretsmanager/list_secrets/filters.py +++ b/moto/secretsmanager/list_secrets/filters.py @@ -1,30 +1,36 @@ -def name_filter(secret, names): +from typing import List, TYPE_CHECKING + +if TYPE_CHECKING: + from ..models import FakeSecret + + +def name_filter(secret: "FakeSecret", names: List[str]) -> bool: return _matcher(names, [secret.name]) -def description_filter(secret, descriptions): - return _matcher(descriptions, [secret.description]) +def description_filter(secret: "FakeSecret", descriptions: List[str]) -> bool: + return _matcher(descriptions, [secret.description]) # type: ignore -def tag_key(secret, tag_keys): +def tag_key(secret: "FakeSecret", tag_keys: List[str]) -> bool: return _matcher(tag_keys, [tag["Key"] for tag in secret.tags]) -def tag_value(secret, tag_values): +def tag_value(secret: "FakeSecret", tag_values: List[str]) -> bool: return _matcher(tag_values, [tag["Value"] for tag in secret.tags]) -def filter_all(secret, values): +def filter_all(secret: "FakeSecret", values: List[str]) -> bool: attributes = ( [secret.name, secret.description] + [tag["Key"] for tag in secret.tags] + [tag["Value"] for tag in secret.tags] ) - return _matcher(values, attributes) + return _matcher(values, attributes) # type: ignore -def _matcher(patterns, strings): +def _matcher(patterns: List[str], strings: List[str]) -> bool: for pattern in [p for p in patterns if p.startswith("!")]: for string in strings: if _match_pattern(pattern[1:], string): @@ -37,7 +43,7 @@ def _matcher(patterns, strings): return False -def _match_pattern(pattern, value): +def _match_pattern(pattern: str, value: str) -> bool: for word in pattern.split(" "): if word not in value: return False diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 0fde0faaf..bb492c4ab 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -2,7 +2,7 @@ import time import json import datetime -from typing import List, Tuple +from typing import Any, Dict, List, Tuple, Optional from moto.core import BaseBackend, BackendDict, BaseModel from moto.moto_api._internal import mock_random @@ -35,41 +35,41 @@ _filter_functions = { } -def filter_keys(): +def filter_keys() -> List[str]: return list(_filter_functions.keys()) -def _matches(secret, filters): +def _matches(secret: "FakeSecret", filters: List[Dict[str, Any]]) -> bool: is_match = True for f in filters: # Filter names are pre-validated in the resource layer filter_function = _filter_functions.get(f["Key"]) - is_match = is_match and filter_function(secret, f["Values"]) + is_match = is_match and filter_function(secret, f["Values"]) # type: ignore return is_match class SecretsManager(BaseModel): - def __init__(self, region_name): + def __init__(self, region_name: str): self.region = region_name class FakeSecret: def __init__( self, - account_id, - region_name, - secret_id, - secret_string=None, - secret_binary=None, - description=None, - tags=None, - kms_key_id=None, - version_id=None, - version_stages=None, - last_changed_date=None, - created_date=None, + account_id: str, + region_name: str, + secret_id: str, + secret_string: Optional[str] = None, + secret_binary: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[List[Dict[str, str]]] = None, + kms_key_id: Optional[str] = None, + version_id: Optional[str] = None, + version_stages: Optional[List[str]] = None, + last_changed_date: Optional[int] = None, + created_date: Optional[int] = None, ): self.secret_id = secret_id self.name = secret_id @@ -86,12 +86,16 @@ class FakeSecret: self.rotation_enabled = False self.rotation_lambda_arn = "" self.auto_rotate_after_days = 0 - self.deleted_date = None - self.policy = None + self.deleted_date: Optional[float] = None + self.policy: Optional[str] = None def update( - self, description=None, tags=None, kms_key_id=None, last_changed_date=None - ): + self, + description: Optional[str] = None, + tags: Optional[List[Dict[str, str]]] = None, + kms_key_id: Optional[str] = None, + last_changed_date: Optional[int] = None, + ) -> None: self.description = description self.tags = tags or [] if last_changed_date is not None: @@ -100,13 +104,15 @@ class FakeSecret: if kms_key_id is not None: self.kms_key_id = kms_key_id - def set_versions(self, versions): + def set_versions(self, versions: Dict[str, Dict[str, Any]]) -> None: self.versions = versions - def set_default_version_id(self, version_id): + def set_default_version_id(self, version_id: str) -> None: self.default_version_id = version_id - def reset_default_version(self, secret_version, version_id): + def reset_default_version( + self, secret_version: Dict[str, Any], version_id: str + ) -> None: # remove all old AWSPREVIOUS stages for old_version in self.versions.values(): if "AWSPREVIOUS" in old_version["version_stages"]: @@ -119,22 +125,26 @@ class FakeSecret: self.versions[version_id] = secret_version self.default_version_id = version_id - def remove_version_stages_from_old_versions(self, version_stages): + def remove_version_stages_from_old_versions( + self, version_stages: List[str] + ) -> None: for version_stage in version_stages: for old_version in self.versions.values(): if version_stage in old_version["version_stages"]: old_version["version_stages"].remove(version_stage) - def delete(self, deleted_date): + def delete(self, deleted_date: float) -> None: self.deleted_date = deleted_date - def restore(self): + def restore(self) -> None: self.deleted_date = None - def is_deleted(self): + def is_deleted(self) -> bool: return self.deleted_date is not None - def to_short_dict(self, include_version_stages=False, version_id=None): + def to_short_dict( + self, include_version_stages: bool = False, version_id: Optional[str] = None + ) -> str: if not version_id: version_id = self.default_version_id dct = { @@ -146,7 +156,7 @@ class FakeSecret: dct["VersionStages"] = self.versions[version_id]["version_stages"] return json.dumps(dct) - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: version_id_to_stages = self._form_version_ids_to_stages() return { @@ -167,7 +177,7 @@ class FakeSecret: "CreatedDate": self.created_date, } - def _form_version_ids_to_stages(self): + def _form_version_ids_to_stages(self) -> Dict[str, str]: version_id_to_stages = {} for key, value in self.versions.items(): version_id_to_stages[key] = value["version_stages"] @@ -175,86 +185,87 @@ class FakeSecret: return version_id_to_stages -class SecretsStore(dict): +class SecretsStore(Dict[str, FakeSecret]): # Parameters to this dictionary can be three possible values: # names, full ARNs, and partial ARNs # Every retrieval method should check which type of input it receives - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: FakeSecret) -> None: super().__setitem__(key, value) - def __getitem__(self, key): + def __getitem__(self, key: str) -> FakeSecret: for secret in dict.values(self): if secret.arn == key or secret.name == key: return secret name = get_secret_name_from_partial_arn(key) return super().__getitem__(name) - def __contains__(self, key): + def __contains__(self, key: str) -> bool: # type: ignore for secret in dict.values(self): if secret.arn == key or secret.name == key: return True name = get_secret_name_from_partial_arn(key) - return dict.__contains__(self, name) + return dict.__contains__(self, name) # type: ignore - def get(self, key, *args, **kwargs): + def get(self, key: str) -> Optional[FakeSecret]: # type: ignore for secret in dict.values(self): if secret.arn == key or secret.name == key: return secret name = get_secret_name_from_partial_arn(key) - return super().get(name, *args, **kwargs) + return super().get(name) - def pop(self, key, *args, **kwargs): + def pop(self, key: str) -> Optional[FakeSecret]: # type: ignore for secret in dict.values(self): if secret.arn == key or secret.name == key: key = secret.name name = get_secret_name_from_partial_arn(key) - return super().pop(name, *args, **kwargs) + return super().pop(name, None) class SecretsManagerBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.secrets = SecretsStore() @staticmethod - def default_vpc_endpoint_service(service_region, zones): + def default_vpc_endpoint_service( + service_region: str, zones: List[str] + ) -> List[Dict[str, str]]: """Default VPC endpoint services.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "secretsmanager" ) - def _is_valid_identifier(self, identifier): + def _is_valid_identifier(self, identifier: str) -> bool: return identifier in self.secrets - def _unix_time_secs(self, dt): + def _unix_time_secs(self, dt: datetime.datetime) -> float: epoch = datetime.datetime.utcfromtimestamp(0) return (dt - epoch).total_seconds() - def _client_request_token_validator(self, client_request_token): + def _client_request_token_validator(self, client_request_token: str) -> None: token_length = len(client_request_token) if token_length < 32 or token_length > 64: msg = "ClientRequestToken must be 32-64 characters long." raise InvalidParameterException(msg) - def _from_client_request_token(self, client_request_token): - version_id = client_request_token - if version_id: - self._client_request_token_validator(version_id) + def _from_client_request_token(self, client_request_token: Optional[str]) -> str: + if client_request_token: + self._client_request_token_validator(client_request_token) + return client_request_token else: - version_id = str(mock_random.uuid4()) - return version_id + return str(mock_random.uuid4()) - def cancel_rotate_secret(self, secret_id: str): + def cancel_rotate_secret(self, secret_id: str) -> str: if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() - if self.secrets[secret_id].is_deleted(): + secret = self.secrets[secret_id] + if secret.is_deleted(): raise InvalidRequestException( "You tried to perform the operation on a secret that's currently marked deleted." ) - secret = self.secrets.get(key=secret_id) if not secret.rotation_lambda_arn: # This response doesn't make much sense for `CancelRotateSecret`, but this is what AWS has documented ... # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_CancelRotateSecret.html @@ -268,7 +279,9 @@ class SecretsManagerBackend(BaseBackend): secret.rotation_enabled = False return secret.to_short_dict() - def get_secret_value(self, secret_id, version_id, version_stage): + def get_secret_value( + self, secret_id: str, version_id: str, version_stage: str + ) -> Dict[str, Any]: if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() @@ -331,12 +344,12 @@ class SecretsManagerBackend(BaseBackend): def update_secret( self, - secret_id, - secret_string=None, - secret_binary=None, - client_request_token=None, - kms_key_id=None, - ): + secret_id: str, + secret_string: Optional[str] = None, + secret_binary: Optional[str] = None, + client_request_token: Optional[str] = None, + kms_key_id: Optional[str] = None, + ) -> str: # error if secret does not exist if secret_id not in self.secrets: @@ -366,14 +379,14 @@ class SecretsManagerBackend(BaseBackend): def create_secret( self, - name, - secret_string=None, - secret_binary=None, - description=None, - tags=None, - kms_key_id=None, - client_request_token=None, - ): + name: str, + secret_string: Optional[str] = None, + secret_binary: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[List[Dict[str, str]]] = None, + kms_key_id: Optional[str] = None, + client_request_token: Optional[str] = None, + ) -> str: # error if secret exists if name in self.secrets.keys(): @@ -395,15 +408,15 @@ class SecretsManagerBackend(BaseBackend): def _add_secret( self, - secret_id, - secret_string=None, - secret_binary=None, - description=None, - tags=None, - kms_key_id=None, - version_id=None, - version_stages=None, - ): + secret_id: str, + secret_string: Optional[str] = None, + secret_binary: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[List[Dict[str, str]]] = None, + kms_key_id: Optional[str] = None, + version_id: Optional[str] = None, + version_stages: Optional[List[str]] = None, + ) -> FakeSecret: if version_stages is None: version_stages = ["AWSCURRENT"] @@ -453,12 +466,12 @@ class SecretsManagerBackend(BaseBackend): def put_secret_value( self, - secret_id, - secret_string, - secret_binary, - client_request_token, - version_stages, - ): + secret_id: str, + secret_string: str, + secret_binary: str, + client_request_token: str, + version_stages: List[str], + ) -> str: if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() @@ -481,7 +494,7 @@ class SecretsManagerBackend(BaseBackend): return secret.to_short_dict(include_version_stages=True, version_id=version_id) - def describe_secret(self, secret_id): + def describe_secret(self, secret_id: str) -> Dict[str, Any]: if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() @@ -491,11 +504,11 @@ class SecretsManagerBackend(BaseBackend): def rotate_secret( self, - secret_id, - client_request_token=None, - rotation_lambda_arn=None, - rotation_rules=None, - ): + secret_id: str, + client_request_token: Optional[str] = None, + rotation_lambda_arn: Optional[str] = None, + rotation_rules: Optional[Dict[str, Any]] = None, + ) -> str: rotation_days = "AutomaticallyAfterDays" @@ -584,8 +597,8 @@ class SecretsManagerBackend(BaseBackend): lambda_backend = lambda_backends[self.account_id][self.region_name] - request_headers = {} - response_headers = {} + request_headers: Dict[str, Any] = {} + response_headers: Dict[str, Any] = {} try: func = lambda_backend.get_function(secret.rotation_lambda_arn) @@ -617,15 +630,15 @@ class SecretsManagerBackend(BaseBackend): def get_random_password( self, - password_length, - exclude_characters, - exclude_numbers, - exclude_punctuation, - exclude_uppercase, - exclude_lowercase, - include_space, - require_each_included_type, - ): + password_length: int, + exclude_characters: str, + exclude_numbers: bool, + exclude_punctuation: bool, + exclude_uppercase: bool, + exclude_lowercase: bool, + include_space: bool, + require_each_included_type: bool, + ) -> str: # password size must have value less than or equal to 4096 if password_length > 4096: raise ClientError( @@ -639,7 +652,7 @@ class SecretsManagerBackend(BaseBackend): when calling the GetRandomPassword operation: Password length is too short based on the required types." ) - response = json.dumps( + return json.dumps( { "RandomPassword": random_password( password_length, @@ -654,9 +667,7 @@ class SecretsManagerBackend(BaseBackend): } ) - return response - - def list_secret_version_ids(self, secret_id): + def list_secret_version_ids(self, secret_id: str) -> str: secret = self.secrets[secret_id] version_list = [] @@ -670,7 +681,7 @@ class SecretsManagerBackend(BaseBackend): } ) - response = json.dumps( + return json.dumps( { "ARN": secret.secret_id, "Name": secret.name, @@ -679,11 +690,12 @@ class SecretsManagerBackend(BaseBackend): } ) - return response - def list_secrets( - self, filters: List, max_results: int = 100, next_token: str = None - ) -> Tuple[List, str]: + self, + filters: List[Dict[str, Any]], + max_results: int = 100, + next_token: Optional[str] = None, + ) -> Tuple[List[Dict[str, Any]], Optional[str]]: secret_list = [] for secret in self.secrets.values(): if _matches(secret, filters): @@ -697,8 +709,11 @@ class SecretsManagerBackend(BaseBackend): return secret_page, new_next_token def delete_secret( - self, secret_id, recovery_window_in_days, force_delete_without_recovery - ): + self, + secret_id: str, + recovery_window_in_days: int, + force_delete_without_recovery: bool, + ) -> Tuple[str, str, float]: if recovery_window_in_days and ( recovery_window_in_days < 7 or recovery_window_in_days > 30 @@ -718,9 +733,11 @@ class SecretsManagerBackend(BaseBackend): if not force_delete_without_recovery: raise SecretNotFoundException() else: - secret = FakeSecret(self.account_id, self.region_name, secret_id) - arn = secret.arn - name = secret.name + unknown_secret = FakeSecret( + self.account_id, self.region_name, secret_id + ) + arn = unknown_secret.arn + name = unknown_secret.name deletion_date = datetime.datetime.utcnow() return arn, name, self._unix_time_secs(deletion_date) else: @@ -733,11 +750,11 @@ class SecretsManagerBackend(BaseBackend): deletion_date = datetime.datetime.utcnow() if force_delete_without_recovery: - secret = self.secrets.pop(secret_id, None) + secret = self.secrets.pop(secret_id) else: deletion_date += datetime.timedelta(days=recovery_window_in_days or 30) self.secrets[secret_id].delete(self._unix_time_secs(deletion_date)) - secret = self.secrets.get(secret_id, None) + secret = self.secrets.get(secret_id) if not secret: raise SecretNotFoundException() @@ -747,7 +764,7 @@ class SecretsManagerBackend(BaseBackend): return arn, name, self._unix_time_secs(deletion_date) - def restore_secret(self, secret_id): + def restore_secret(self, secret_id: str) -> Tuple[str, str]: if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() @@ -757,7 +774,7 @@ class SecretsManagerBackend(BaseBackend): return secret.arn, secret.name - def tag_resource(self, secret_id, tags): + def tag_resource(self, secret_id: str, tags: List[Dict[str, str]]) -> None: if secret_id not in self.secrets: raise SecretNotFoundException() @@ -778,9 +795,7 @@ class SecretsManagerBackend(BaseBackend): old_tags.remove(existing_key_name) old_tags.append(tag) - return secret_id - - def untag_resource(self, secret_id, tag_keys): + def untag_resource(self, secret_id: str, tag_keys: List[str]) -> None: if secret_id not in self.secrets: raise SecretNotFoundException() @@ -792,11 +807,13 @@ class SecretsManagerBackend(BaseBackend): if tag["Key"] in tag_keys: tags.remove(tag) - return secret_id - def update_secret_version_stage( - self, secret_id, version_stage, remove_from_version_id, move_to_version_id - ): + self, + secret_id: str, + version_stage: str, + remove_from_version_id: str, + move_to_version_id: str, + ) -> Tuple[str, str]: if secret_id not in self.secrets: raise SecretNotFoundException() @@ -839,9 +856,9 @@ class SecretsManagerBackend(BaseBackend): if "AWSPREVIOUS" in stages: stages.remove("AWSPREVIOUS") - return secret_id + return secret.arn, secret.name - def put_resource_policy(self, secret_id: str, policy: str): + def put_resource_policy(self, secret_id: str, policy: str) -> Tuple[str, str]: """ The BlockPublicPolicy-parameter is not yet implemented """ @@ -852,7 +869,7 @@ class SecretsManagerBackend(BaseBackend): secret.policy = policy return secret.arn, secret.name - def get_resource_policy(self, secret_id): + def get_resource_policy(self, secret_id: str) -> str: if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() @@ -865,7 +882,7 @@ class SecretsManagerBackend(BaseBackend): resp["ResourcePolicy"] = secret.policy return json.dumps(resp) - def delete_resource_policy(self, secret_id): + def delete_resource_policy(self, secret_id: str) -> Tuple[str, str]: if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() diff --git a/moto/secretsmanager/responses.py b/moto/secretsmanager/responses.py index 917579c41..3fa1e3983 100644 --- a/moto/secretsmanager/responses.py +++ b/moto/secretsmanager/responses.py @@ -1,3 +1,4 @@ +from typing import Any, Dict, List from moto.core.responses import BaseResponse from moto.secretsmanager.exceptions import ( InvalidRequestException, @@ -5,12 +6,12 @@ from moto.secretsmanager.exceptions import ( ValidationException, ) -from .models import secretsmanager_backends, filter_keys +from .models import secretsmanager_backends, filter_keys, SecretsManagerBackend import json -def _validate_filters(filters): +def _validate_filters(filters: List[Dict[str, Any]]) -> None: for idx, f in enumerate(filters): filter_key = f.get("Key", None) filter_values = f.get("Values", None) @@ -28,18 +29,18 @@ def _validate_filters(filters): class SecretsManagerResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="secretsmanager") @property - def backend(self): + def backend(self) -> SecretsManagerBackend: return secretsmanager_backends[self.current_account][self.region] - def cancel_rotate_secret(self): + def cancel_rotate_secret(self) -> str: secret_id = self._get_param("SecretId") return self.backend.cancel_rotate_secret(secret_id=secret_id) - def get_secret_value(self): + def get_secret_value(self) -> str: secret_id = self._get_param("SecretId") version_id = self._get_param("VersionId") version_stage = self._get_param("VersionStage") @@ -48,7 +49,7 @@ class SecretsManagerResponse(BaseResponse): ) return json.dumps(value) - def create_secret(self): + def create_secret(self) -> str: name = self._get_param("Name") secret_string = self._get_param("SecretString") secret_binary = self._get_param("SecretBinary") @@ -66,7 +67,7 @@ class SecretsManagerResponse(BaseResponse): client_request_token=client_request_token, ) - def update_secret(self): + def update_secret(self) -> str: secret_id = self._get_param("SecretId") secret_string = self._get_param("SecretString") secret_binary = self._get_param("SecretBinary") @@ -80,7 +81,7 @@ class SecretsManagerResponse(BaseResponse): kms_key_id=kms_key_id, ) - def get_random_password(self): + def get_random_password(self) -> str: password_length = self._get_param("PasswordLength", if_none=32) exclude_characters = self._get_param("ExcludeCharacters", if_none="") exclude_numbers = self._get_param("ExcludeNumbers", if_none=False) @@ -102,12 +103,12 @@ class SecretsManagerResponse(BaseResponse): require_each_included_type=require_each_included_type, ) - def describe_secret(self): + def describe_secret(self) -> str: secret_id = self._get_param("SecretId") secret = self.backend.describe_secret(secret_id=secret_id) return json.dumps(secret) - def rotate_secret(self): + def rotate_secret(self) -> str: client_request_token = self._get_param("ClientRequestToken") rotation_lambda_arn = self._get_param("RotationLambdaARN") rotation_rules = self._get_param("RotationRules") @@ -119,7 +120,7 @@ class SecretsManagerResponse(BaseResponse): rotation_rules=rotation_rules, ) - def put_secret_value(self): + def put_secret_value(self) -> str: secret_id = self._get_param("SecretId", if_none="") secret_string = self._get_param("SecretString") secret_binary = self._get_param("SecretBinary") @@ -140,11 +141,11 @@ class SecretsManagerResponse(BaseResponse): client_request_token=client_request_token, ) - def list_secret_version_ids(self): + def list_secret_version_ids(self) -> str: secret_id = self._get_param("SecretId", if_none="") return self.backend.list_secret_version_ids(secret_id=secret_id) - def list_secrets(self): + def list_secrets(self) -> str: filters = self._get_param("Filters", if_none=[]) _validate_filters(filters) max_results = self._get_int_param("MaxResults") @@ -154,7 +155,7 @@ class SecretsManagerResponse(BaseResponse): ) return json.dumps(dict(SecretList=secret_list, NextToken=next_token)) - def delete_secret(self): + def delete_secret(self) -> str: secret_id = self._get_param("SecretId") recovery_window_in_days = self._get_param("RecoveryWindowInDays") force_delete_without_recovery = self._get_param("ForceDeleteWithoutRecovery") @@ -165,44 +166,47 @@ class SecretsManagerResponse(BaseResponse): ) return json.dumps(dict(ARN=arn, Name=name, DeletionDate=deletion_date)) - def restore_secret(self): + def restore_secret(self) -> str: secret_id = self._get_param("SecretId") arn, name = self.backend.restore_secret(secret_id=secret_id) return json.dumps(dict(ARN=arn, Name=name)) - def get_resource_policy(self): + def get_resource_policy(self) -> str: secret_id = self._get_param("SecretId") return self.backend.get_resource_policy(secret_id=secret_id) - def put_resource_policy(self): + def put_resource_policy(self) -> str: secret_id = self._get_param("SecretId") policy = self._get_param("ResourcePolicy") arn, name = self.backend.put_resource_policy(secret_id, policy) return json.dumps(dict(ARN=arn, Name=name)) - def delete_resource_policy(self): + def delete_resource_policy(self) -> str: secret_id = self._get_param("SecretId") arn, name = self.backend.delete_resource_policy(secret_id) return json.dumps(dict(ARN=arn, Name=name)) - def tag_resource(self): + def tag_resource(self) -> str: secret_id = self._get_param("SecretId") tags = self._get_param("Tags", if_none=[]) - return self.backend.tag_resource(secret_id, tags) + self.backend.tag_resource(secret_id, tags) + return "{}" - def untag_resource(self): + def untag_resource(self) -> str: secret_id = self._get_param("SecretId") tag_keys = self._get_param("TagKeys", if_none=[]) - return self.backend.untag_resource(secret_id=secret_id, tag_keys=tag_keys) + self.backend.untag_resource(secret_id=secret_id, tag_keys=tag_keys) + return "{}" - def update_secret_version_stage(self): + def update_secret_version_stage(self) -> str: secret_id = self._get_param("SecretId") version_stage = self._get_param("VersionStage") remove_from_version_id = self._get_param("RemoveFromVersionId") move_to_version_id = self._get_param("MoveToVersionId") - return self.backend.update_secret_version_stage( + arn, name = self.backend.update_secret_version_stage( secret_id=secret_id, version_stage=version_stage, remove_from_version_id=remove_from_version_id, move_to_version_id=move_to_version_id, ) + return json.dumps({"ARN": arn, "Name": name}) diff --git a/moto/secretsmanager/utils.py b/moto/secretsmanager/utils.py index 215237110..c2052d3ab 100644 --- a/moto/secretsmanager/utils.py +++ b/moto/secretsmanager/utils.py @@ -4,15 +4,15 @@ from moto.moto_api._internal import mock_random as random def random_password( - password_length, - exclude_characters, - exclude_numbers, - exclude_punctuation, - exclude_uppercase, - exclude_lowercase, - include_space, - require_each_included_type, -): + password_length: int, + exclude_characters: str, + exclude_numbers: bool, + exclude_punctuation: bool, + exclude_uppercase: bool, + exclude_lowercase: bool, + include_space: bool, + require_each_included_type: bool, +) -> str: password = "" required_characters = "" @@ -61,7 +61,7 @@ def random_password( return password -def secret_arn(account_id, region, secret_id): +def secret_arn(account_id: str, region: str, secret_id: str) -> str: id_string = "".join(random.choice(string.ascii_letters) for _ in range(6)) return ( f"arn:aws:secretsmanager:{region}:{account_id}:secret:{secret_id}-{id_string}" @@ -84,7 +84,7 @@ def get_secret_name_from_partial_arn(partial_arn: str) -> str: return partial_arn -def _exclude_characters(password, exclude_characters): +def _exclude_characters(password: str, exclude_characters: str) -> str: for c in exclude_characters: if c in string.punctuation: # Escape punctuation regex usage @@ -93,7 +93,9 @@ def _exclude_characters(password, exclude_characters): return password -def _add_password_require_each_included_type(password, required_characters): +def _add_password_require_each_included_type( + password: str, required_characters: str +) -> str: password_with_required_char = password[: -len(required_characters)] password_with_required_char += required_characters diff --git a/setup.cfg b/setup.cfg index 50374d3aa..c3ef08ec2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -239,7 +239,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/m*,moto/n*,moto/o*,moto/p*,moto/q*,moto/r*,moto/s3*,moto/sagemaker,moto/scheduler +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/m*,moto/n*,moto/o*,moto/p*,moto/q*,moto/r*,moto/s3*,moto/sagemaker,moto/secretsmanager,moto/scheduler show_column_numbers=True show_error_codes = True disable_error_code=abstract diff --git a/tests/test_secretsmanager/test_server.py b/tests/test_secretsmanager/test_server.py index bf7136615..e8953339c 100644 --- a/tests/test_secretsmanager/test_server.py +++ b/tests/test_secretsmanager/test_server.py @@ -791,7 +791,7 @@ def test_update_secret_version_stage(pass_arn): assert stages[initial_version] == ["AWSCURRENT"] assert stages[new_version] == [custom_stage] - test_client.post( + resp = test_client.post( "/", data={ "SecretId": secret_id, @@ -801,6 +801,9 @@ def test_update_secret_version_stage(pass_arn): }, headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, ) + resp = json.loads(resp.data.decode("utf-8")) + assert resp.get("ARN") == create_secret["ARN"] + assert resp.get("Name") == DEFAULT_SECRET_NAME describe_secret = test_client.post( "/",