From 23d9430590dc982880f740c869a1c3fe55cd41a9 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Tue, 21 Feb 2023 10:16:44 -0100 Subject: [PATCH] Techdebt: MyPy EFS (#5954) --- moto/ec2/models/subnets.py | 2 + moto/efs/exceptions.py | 42 ++--- moto/efs/models.py | 308 ++++++++++++++++++++++--------------- moto/efs/responses.py | 52 ++++--- setup.cfg | 2 +- 5 files changed, 229 insertions(+), 177 deletions(-) diff --git a/moto/ec2/models/subnets.py b/moto/ec2/models/subnets.py index 96de49faa..0835e8cc8 100644 --- a/moto/ec2/models/subnets.py +++ b/moto/ec2/models/subnets.py @@ -197,6 +197,8 @@ class Subnet(TaggedEC2Resource, CloudFormationModel): return new_ip + # EFS calls this method as request_ip(str, MountTarget) + # So technically it's not just Instances that are stored def request_ip(self, ip: str, instance: "Instance") -> None: if ipaddress.ip_address(ip) not in self.cidr: raise Exception(f"IP does not fall in the subnet CIDR of {self.cidr}") diff --git a/moto/efs/exceptions.py b/moto/efs/exceptions.py index f46bd9350..92b8829cc 100644 --- a/moto/efs/exceptions.py +++ b/moto/efs/exceptions.py @@ -8,7 +8,7 @@ class EFSError(JsonRESTError): class AccessPointNotFound(EFSError): code = 404 - def __init__(self, access_point_id): + def __init__(self, access_point_id: str): super().__init__( "AccessPointNotFound", f"Access Point {access_point_id} does not exist." ) @@ -17,93 +17,83 @@ class AccessPointNotFound(EFSError): class FileSystemAlreadyExists(EFSError): code = 409 - def __init__(self, creation_token, *args, **kwargs): + def __init__(self, creation_token: str): super().__init__( "FileSystemAlreadyExists", f"File system with {creation_token} already exists.", - *args, - **kwargs, ) class FileSystemNotFound(EFSError): code = 404 - def __init__(self, file_system_id, *args, **kwargs): + def __init__(self, file_system_id: str): super().__init__( "FileSystemNotFound", f"File system {file_system_id} does not exist.", - *args, - **kwargs, ) class FileSystemInUse(EFSError): code = 409 - def __init__(self, msg, *args, **kwargs): - super().__init__("FileSystemInUse", msg, *args, **kwargs) + def __init__(self, msg: str): + super().__init__("FileSystemInUse", msg) class MountTargetConflict(EFSError): code = 409 - def __init__(self, msg, *args, **kwargs): - super().__init__("MountTargetConflict", msg, *args, **kwargs) + def __init__(self, msg: str): + super().__init__("MountTargetConflict", msg) class MountTargetNotFound(EFSError): code = 404 - def __init__(self, mount_target_id, *args, **kwargs): + def __init__(self, mount_target_id: str): super().__init__( "MountTargetNotFound", f"Mount target '{mount_target_id}' does not exist.", - *args, - **kwargs, ) class BadRequest(EFSError): code = 400 - def __init__(self, msg, *args, **kwargs): - super().__init__("BadRequest", msg, *args, **kwargs) + def __init__(self, msg: str): + super().__init__("BadRequest", msg) class PolicyNotFound(EFSError): code = 404 - def __init__(self, *args, **kwargs): - super().__init__("PolicyNotFound", *args, **kwargs) + def __init__(self, msg: str): + super().__init__("PolicyNotFound", msg) class SubnetNotFound(EFSError): code = 404 - def __init__(self, subnet_id, *args, **kwargs): + def __init__(self, subnet_id: str): super().__init__( "SubnetNotFound", f"The subnet ID '{subnet_id}' does not exist", - *args, - **kwargs, ) class SecurityGroupNotFound(EFSError): code = 404 - def __init__(self, security_group_id, *args, **kwargs): + def __init__(self, security_group_id: str): super().__init__( "SecurityGroupNotFound", f"The SecurityGroup ID '{security_group_id}' does not exist", - *args, - **kwargs, ) class SecurityGroupLimitExceeded(EFSError): code = 400 - def __init__(self, msg, *args, **kwargs): - super().__init__("SecurityGroupLimitExceeded", msg, *args, **kwargs) + def __init__(self, msg: str): + super().__init__("SecurityGroupLimitExceeded", msg) diff --git a/moto/efs/models.py b/moto/efs/models.py index 6fee71b66..90bae3e1e 100644 --- a/moto/efs/models.py +++ b/moto/efs/models.py @@ -7,10 +7,13 @@ https://docs.aws.amazon.com/efs/latest/ug/whatisefs.html import json import time from copy import deepcopy +from typing import Any, Dict, List, Optional, Tuple, Set, Iterator, Union from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel from moto.core.utils import camelcase_to_underscores, underscores_to_camelcase from moto.ec2 import ec2_backends +from moto.ec2.models.elastic_network_interfaces import NetworkInterface +from moto.ec2.models.subnets import Subnet from moto.ec2.exceptions import InvalidSubnetIdError from moto.efs.exceptions import ( AccessPointNotFound, @@ -30,25 +33,26 @@ from moto.utilities.tagging_service import TaggingService from moto.utilities.utils import md5_hash -def _lookup_az_id(account_id, az_name): +def _lookup_az_id(account_id: str, az_name: str) -> Optional[str]: """Find the Availability zone ID given the AZ name.""" ec2 = ec2_backends[account_id][az_name[:-1]] for zone in ec2.describe_availability_zones(): if zone.name == az_name: return zone.zone_id + return None class AccessPoint(BaseModel): def __init__( self, - account_id, - region_name, - client_token, - file_system_id, - name, - posix_user, - root_directory, - context, + account_id: str, + region_name: str, + client_token: str, + file_system_id: str, + name: Optional[str], + posix_user: Dict[str, Any], + root_directory: Dict[str, str], + context: "EFSBackend", ): self.access_point_id = mock_random.get_random_hex(8) self.access_point_arn = f"arn:aws:elasticfilesystem:{region_name}:{account_id}:access-point/fsap-{self.access_point_id}" @@ -64,7 +68,7 @@ class AccessPoint(BaseModel): self.root_directory = root_directory self.context = context - def info_json(self): + def info_json(self) -> Dict[str, Any]: tags = self.context.list_tags_for_resource(self.access_point_id) return { "ClientToken": self.client_token, @@ -85,20 +89,18 @@ class FileSystem(CloudFormationModel): def __init__( self, - account_id, - region_name, - creation_token, - file_system_id, - context, - performance_mode, - encrypted, - kms_key_id, - throughput_mode, - provisioned_throughput_in_mibps, - availability_zone_name, - backup, - lifecycle_policies=None, - file_system_policy=None, + account_id: str, + region_name: str, + creation_token: str, + file_system_id: str, + context: "EFSBackend", + performance_mode: str, + encrypted: bool, + kms_key_id: str, + throughput_mode: str, + provisioned_throughput_in_mibps: int, + availability_zone_name: str, + backup: bool, ): if availability_zone_name: backup = True @@ -119,8 +121,8 @@ class FileSystem(CloudFormationModel): account_id, self.availability_zone_name ) self._backup = backup - self.lifecycle_policies = lifecycle_policies or [] - self.file_system_policy = file_system_policy + self.lifecycle_policies: List[Dict[str, str]] = [] + self.file_system_policy: Optional[str] = None self._context = context @@ -132,11 +134,11 @@ class FileSystem(CloudFormationModel): # Initialize some state parameters self.life_cycle_state = "available" - self._mount_targets = {} + self._mount_targets: Dict[str, MountTarget] = {} self._size_value = 0 @property - def size_in_bytes(self): + def size_in_bytes(self) -> Dict[str, Any]: # type: ignore[misc] return { "Value": self._size_value, "ValueInIA": 0, @@ -145,21 +147,21 @@ class FileSystem(CloudFormationModel): } @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.file_system_id @property - def number_of_mount_targets(self): + def number_of_mount_targets(self) -> int: return len(self._mount_targets) @property - def backup_policy(self): + def backup_policy(self) -> Optional[Dict[str, str]]: if self._backup: return {"Status": "ENABLED"} else: - return + return None - def info_json(self): + def info_json(self) -> Dict[str, Any]: ret = { underscores_to_camelcase(k.capitalize()): v for k, v in self.__dict__.items() @@ -180,7 +182,7 @@ class FileSystem(CloudFormationModel): ) return ret - def add_mount_target(self, subnet, mount_target): + def add_mount_target(self, subnet: Subnet, mount_target: "MountTarget") -> None: # Check that the mount target doesn't violate constraints. for other_mount_target in self._mount_targets.values(): if other_mount_target.subnet_vpc_id != subnet.vpc_id: @@ -193,28 +195,33 @@ class FileSystem(CloudFormationModel): self._mount_targets[subnet.availability_zone] = mount_target - def has_mount_target(self, subnet): + def has_mount_target(self, subnet: Subnet) -> bool: return subnet.availability_zone in self._mount_targets - def iter_mount_targets(self): + def iter_mount_targets(self) -> Iterator["MountTarget"]: for mt in self._mount_targets.values(): yield mt - def remove_mount_target(self, subnet): + def remove_mount_target(self, subnet: Subnet) -> None: del self._mount_targets[subnet.availability_zone] @staticmethod - def cloudformation_name_type(): - return + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: return "AWS::EFS::FileSystem" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "FileSystem": # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-filesystem.html props = deepcopy(cloudformation_json["Properties"]) props = {camelcase_to_underscores(k): v for k, v in props.items()} @@ -238,29 +245,40 @@ class FileSystem(CloudFormationModel): ) @classmethod - def update_from_cloudformation_json( + def update_from_cloudformation_json( # type: ignore[misc] cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> None: raise NotImplementedError( "Update of EFS File System via cloudformation is not yet implemented." ) @classmethod - def delete_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name - ): - return efs_backends[account_id][region_name].delete_file_system(resource_name) + def delete_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> None: + efs_backends[account_id][region_name].delete_file_system(resource_name) class MountTarget(CloudFormationModel): """A model for an EFS Mount Target.""" - def __init__(self, account_id, file_system, subnet, ip_address, security_groups): + def __init__( + self, + account_id: str, + file_system: FileSystem, + subnet: Subnet, + ip_address: Optional[str], + security_groups: Optional[List[str]], + ): # Set the simple given parameters. self.file_system_id = file_system.file_system_id self._file_system = file_system @@ -278,10 +296,10 @@ class MountTarget(CloudFormationModel): # Get an IP address if needed, otherwise validate the one we're given. if ip_address is None: - ip_address = subnet.get_available_subnet_ip(self) + ip_address = subnet.get_available_subnet_ip(self) # type: ignore[arg-type] else: try: - subnet.request_ip(ip_address, self) + subnet.request_ip(ip_address, self) # type: ignore[arg-type] except Exception as e: if "IP" in str(e) and "CIDR" in str(e): raise BadRequest( @@ -295,68 +313,76 @@ class MountTarget(CloudFormationModel): self.owner_id = account_id self.mount_target_id = f"fsmt-{mock_random.get_random_hex()}" self.life_cycle_state = "available" - self.network_interface_id = None + self.network_interface_id: Optional[str] = None self.availability_zone_id = subnet.availability_zone_id self.availability_zone_name = subnet.availability_zone - def clean_up(self): + def clean_up(self) -> None: self._file_system.remove_mount_target(self._subnet) self._subnet.del_subnet_ip(self.ip_address) - def set_network_interface(self, network_interface): + def set_network_interface(self, network_interface: NetworkInterface) -> None: self.network_interface_id = network_interface.id - def info_json(self): - ret = { + def info_json(self) -> Dict[str, Any]: + return { underscores_to_camelcase(k.capitalize()): v for k, v in self.__dict__.items() if not k.startswith("_") } - return ret @property - def physical_resource_id(self): - return self.mounted_target_id + def physical_resource_id(self) -> str: + return self.mount_target_id @property - def subnet_vpc_id(self): + def subnet_vpc_id(self) -> str: return self._subnet.vpc_id @staticmethod - def cloudformation_name_type(): - pass + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: return "AWS::EFS::MountTarget" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "MountTarget": # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-mounttarget.html props = deepcopy(cloudformation_json["Properties"]) props = {camelcase_to_underscores(k): v for k, v in props.items()} return efs_backends[account_id][region_name].create_mount_target(**props) @classmethod - def update_from_cloudformation_json( + def update_from_cloudformation_json( # type: ignore[misc] cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> None: raise NotImplementedError( "Updates of EFS Mount Target via cloudformation are not yet implemented." ) @classmethod - def delete_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name - ): - return efs_backends[account_id][region_name].delete_mount_target(resource_name) + def delete_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> None: + efs_backends[account_id][region_name].delete_mount_target(resource_name) class EFSBackend(BaseBackend): @@ -367,16 +393,18 @@ class EFSBackend(BaseBackend): such resources should always go through this class. """ - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.creation_tokens = set() - self.access_points = dict() - self.file_systems_by_id = {} - self.mount_targets_by_id = {} - self.next_markers = {} + self.creation_tokens: Set[str] = set() + self.access_points: Dict[str, AccessPoint] = dict() + self.file_systems_by_id: Dict[str, FileSystem] = {} + self.mount_targets_by_id: Dict[str, MountTarget] = {} + self.next_markers: Dict[str, Union[List[MountTarget], List[FileSystem]]] = {} self.tagging_service = TaggingService() - def _mark_description(self, corpus, max_items): + def _mark_description( + self, corpus: Union[List[MountTarget], List[FileSystem]], max_items: int + ) -> Optional[str]: if max_items < len(corpus): new_corpus = corpus[max_items:] new_corpus_dict = [c.info_json() for c in new_corpus] @@ -388,21 +416,21 @@ class EFSBackend(BaseBackend): return next_marker @property - def ec2_backend(self): + def ec2_backend(self) -> Any: # type: ignore[misc] return ec2_backends[self.account_id][self.region_name] def create_file_system( self, - creation_token, - performance_mode, - encrypted, - kms_key_id, - throughput_mode, - provisioned_throughput_in_mibps, - availability_zone_name, - backup, - tags, - ): + creation_token: str, + performance_mode: str, + encrypted: bool, + kms_key_id: str, + throughput_mode: str, + provisioned_throughput_in_mibps: int, + availability_zone_name: str, + backup: bool, + tags: List[Dict[str, str]], + ) -> FileSystem: """Create a new EFS File System Volume. https://docs.aws.amazon.com/efs/latest/ug/API_CreateFileSystem.html @@ -413,7 +441,7 @@ class EFSBackend(BaseBackend): raise FileSystemAlreadyExists(creation_token) # Create a new file system ID: - def make_id(): + def make_id() -> str: return f"fs-{mock_random.get_random_hex()}" fsid = make_id() @@ -438,13 +466,17 @@ class EFSBackend(BaseBackend): return self.file_systems_by_id[fsid] def describe_file_systems( - self, marker=None, max_items=10, creation_token=None, file_system_id=None - ): + self, + marker: Optional[str] = None, + max_items: int = 10, + creation_token: Optional[str] = None, + file_system_id: Optional[str] = None, + ) -> Tuple[Optional[str], List[FileSystem]]: """Describe all the EFS File Systems, or specific File Systems. https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html """ - # Restrict the possible corpus of resules based on inputs. + # Restrict the possible corpus of results based on inputs. if creation_token and file_system_id: raise BadRequest( "Request cannot contain both a file system ID and a creation token." @@ -464,7 +496,7 @@ class EFSBackend(BaseBackend): # Handle the case that a marker is given. if marker not in self.next_markers: raise BadRequest("Invalid Marker") - corpus = self.next_markers[marker] + corpus = self.next_markers[marker] # type: ignore[assignment] else: # Handle the vanilla case. corpus = [fs for fs in self.file_systems_by_id.values()] @@ -475,8 +507,12 @@ class EFSBackend(BaseBackend): return next_marker, file_systems def create_mount_target( - self, file_system_id, subnet_id, ip_address=None, security_groups=None - ): + self, + file_system_id: str, + subnet_id: str, + ip_address: Optional[str] = None, + security_groups: Optional[List[str]] = None, + ) -> MountTarget: """Create a new EFS Mount Target for a given File System to a given subnet. Note that you can only create one mount target for each availability zone @@ -516,8 +552,13 @@ class EFSBackend(BaseBackend): return mount_target def describe_mount_targets( - self, max_items, file_system_id, mount_target_id, access_point_id, marker - ): + self, + max_items: int, + file_system_id: Optional[str], + mount_target_id: Optional[str], + access_point_id: Optional[str], + marker: Optional[str], + ) -> Tuple[Optional[str], List[MountTarget]]: """Describe the mount targets given an access point ID, mount target ID or a file system ID. https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html @@ -549,7 +590,7 @@ class EFSBackend(BaseBackend): if marker not in self.next_markers: raise BadRequest("Invalid Marker") corpus_mtids = {m.mount_target_id for m in corpus} - marked_mtids = {m.mount_target_id for m in self.next_markers[marker]} + marked_mtids = {m.mount_target_id for m in self.next_markers[marker]} # type: ignore[union-attr] mt_ids = corpus_mtids & marked_mtids corpus = [self.mount_targets_by_id[mt_id] for mt_id in mt_ids] @@ -558,7 +599,7 @@ class EFSBackend(BaseBackend): next_marker = self._mark_description(corpus, max_items) return next_marker, mount_targets - def delete_file_system(self, file_system_id): + def delete_file_system(self, file_system_id: str) -> None: """Delete the file system specified by the given file_system_id. Note that mount targets must be deleted first. @@ -576,9 +617,8 @@ class EFSBackend(BaseBackend): del self.file_systems_by_id[file_system_id] self.creation_tokens.remove(file_system.creation_token) - return - def delete_mount_target(self, mount_target_id): + def delete_mount_target(self, mount_target_id: str) -> None: """Delete a mount target specified by the given mount_target_id. Note that this will also delete a network interface. @@ -592,32 +632,39 @@ class EFSBackend(BaseBackend): self.ec2_backend.delete_network_interface(mount_target.network_interface_id) del self.mount_targets_by_id[mount_target_id] mount_target.clean_up() - return - def describe_backup_policy(self, file_system_id): + def describe_backup_policy(self, file_system_id: str) -> Dict[str, str]: backup_policy = self.file_systems_by_id[file_system_id].backup_policy if not backup_policy: raise PolicyNotFound("None") return backup_policy - def put_lifecycle_configuration(self, file_system_id, policies): + def put_lifecycle_configuration( + self, file_system_id: str, policies: List[Dict[str, str]] + ) -> None: _, fss = self.describe_file_systems(file_system_id=file_system_id) file_system = fss[0] file_system.lifecycle_policies = policies - def describe_lifecycle_configuration(self, file_system_id): + def describe_lifecycle_configuration( + self, file_system_id: str + ) -> List[Dict[str, str]]: _, fss = self.describe_file_systems(file_system_id=file_system_id) file_system = fss[0] return file_system.lifecycle_policies - def describe_mount_target_security_groups(self, mount_target_id): + def describe_mount_target_security_groups( + self, mount_target_id: str + ) -> Optional[List[str]]: if mount_target_id not in self.mount_targets_by_id: raise MountTargetNotFound(mount_target_id) mount_target = self.mount_targets_by_id[mount_target_id] return mount_target.security_groups - def modify_mount_target_security_groups(self, mount_target_id, security_groups): + def modify_mount_target_security_groups( + self, mount_target_id: str, security_groups: List[str] + ) -> None: if mount_target_id not in self.mount_targets_by_id: raise MountTargetNotFound(mount_target_id) @@ -629,8 +676,13 @@ class EFSBackend(BaseBackend): ) def create_access_point( - self, client_token, tags, file_system_id, posix_user, root_directory - ): + self, + client_token: str, + tags: List[Dict[str, str]], + file_system_id: str, + posix_user: Dict[str, Any], + root_directory: Dict[str, Any], + ) -> AccessPoint: name = next((tag["Value"] for tag in tags if tag["Key"] == "Name"), None) access_point = AccessPoint( self.account_id, @@ -646,7 +698,7 @@ class EFSBackend(BaseBackend): self.access_points[access_point.access_point_id] = access_point return access_point - def describe_access_points(self, access_point_id): + def describe_access_points(self, access_point_id: str) -> List[AccessPoint]: """ Pagination is not yet implemented """ @@ -654,18 +706,18 @@ class EFSBackend(BaseBackend): if access_point_id not in self.access_points: raise AccessPointNotFound(access_point_id) return [self.access_points[access_point_id]] - return self.access_points.values() + return list(self.access_points.values()) - def delete_access_point(self, access_point_id): + def delete_access_point(self, access_point_id: str) -> None: self.access_points.pop(access_point_id, None) - def list_tags_for_resource(self, resource_id): + def list_tags_for_resource(self, resource_id: str) -> List[Dict[str, str]]: return self.tagging_service.list_tags_for_resource(resource_id)["Tags"] - def tag_resource(self, resource_id, tags): + def tag_resource(self, resource_id: str, tags: List[Dict[str, str]]) -> None: self.tagging_service.tag_resource(resource_id, tags) - def untag_resource(self, resource_id, tag_keys): + def untag_resource(self, resource_id: str, tag_keys: List[str]) -> None: self.tagging_service.untag_resource_using_names(resource_id, tag_keys) diff --git a/moto/efs/responses.py b/moto/efs/responses.py index 156bfed35..c2b4f4412 100644 --- a/moto/efs/responses.py +++ b/moto/efs/responses.py @@ -1,19 +1,23 @@ import json +from typing import Any, Dict, Tuple, Union from moto.core.responses import BaseResponse -from .models import efs_backends +from .models import efs_backends, EFSBackend + + +TYPE_RESPONSE = Tuple[str, Dict[str, Union[str, int]]] class EFSResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="efs") @property - def efs_backend(self): + def efs_backend(self) -> EFSBackend: return efs_backends[self.current_account][self.region] - def create_file_system(self): + def create_file_system(self) -> TYPE_RESPONSE: creation_token = self._get_param("CreationToken") performance_mode = self._get_param("PerformanceMode") encrypted = self._get_param("Encrypted") @@ -41,7 +45,7 @@ class EFSResponse(BaseResponse): {"status": 201, "Content-Type": "application/json"}, ) - def describe_file_systems(self): + def describe_file_systems(self) -> TYPE_RESPONSE: max_items = self._get_int_param("MaxItems", 10) marker = self._get_param("Marker") creation_token = self._get_param("CreationToken") @@ -52,14 +56,16 @@ class EFSResponse(BaseResponse): creation_token=creation_token, file_system_id=file_system_id, ) - resp_json = {"FileSystems": [fs.info_json() for fs in file_systems]} + resp_json: Dict[str, Any] = { + "FileSystems": [fs.info_json() for fs in file_systems] + } if marker: resp_json["Marker"] = marker if next_marker: resp_json["NextMarker"] = next_marker return json.dumps(resp_json), {"Content-Type": "application/json"} - def create_mount_target(self): + def create_mount_target(self) -> TYPE_RESPONSE: file_system_id = self._get_param("FileSystemId") subnet_id = self._get_param("SubnetId") ip_address = self._get_param("IpAddress") @@ -75,7 +81,7 @@ class EFSResponse(BaseResponse): {"Content-Type": "application/json"}, ) - def describe_mount_targets(self): + def describe_mount_targets(self) -> TYPE_RESPONSE: max_items = self._get_int_param("MaxItems", 10) marker = self._get_param("Marker") file_system_id = self._get_param("FileSystemId") @@ -88,30 +94,32 @@ class EFSResponse(BaseResponse): access_point_id=access_point_id, marker=marker, ) - resp_json = {"MountTargets": [mt.info_json() for mt in mount_targets]} + resp_json: Dict[str, Any] = { + "MountTargets": [mt.info_json() for mt in mount_targets] + } if marker: resp_json["Marker"] = marker if next_marker: resp_json["NextMarker"] = next_marker return json.dumps(resp_json), {"Content-Type": "application/json"} - def delete_file_system(self): + def delete_file_system(self) -> TYPE_RESPONSE: file_system_id = self._get_param("FileSystemId") self.efs_backend.delete_file_system(file_system_id) return json.dumps(dict()), {"status": 204, "Content-Type": "application/json"} - def delete_mount_target(self): + def delete_mount_target(self) -> TYPE_RESPONSE: mount_target_id = self._get_param("MountTargetId") self.efs_backend.delete_mount_target(mount_target_id) return json.dumps(dict()), {"status": 204, "Content-Type": "application/json"} - def describe_backup_policy(self): + def describe_backup_policy(self) -> TYPE_RESPONSE: file_system_id = self._get_param("FileSystemId") backup_policy = self.efs_backend.describe_backup_policy(file_system_id) resp = {"BackupPolicy": backup_policy} return json.dumps(resp), {"Content-Type": "application/json"} - def put_lifecycle_configuration(self): + def put_lifecycle_configuration(self) -> TYPE_RESPONSE: file_system_id = self._get_param("FileSystemId") policies = self._get_param("LifecyclePolicies") self.efs_backend.put_lifecycle_configuration(file_system_id, policies) @@ -119,14 +127,14 @@ class EFSResponse(BaseResponse): "Content-Type": "application/json" } - def describe_lifecycle_configuration(self): + def describe_lifecycle_configuration(self) -> TYPE_RESPONSE: file_system_id = self._get_param("FileSystemId") policies = self.efs_backend.describe_lifecycle_configuration(file_system_id) return json.dumps({"LifecyclePolicies": policies}), { "Content-Type": "application/json" } - def describe_mount_target_security_groups(self): + def describe_mount_target_security_groups(self) -> TYPE_RESPONSE: mount_target_id = self._get_param("MountTargetId") security_groups = self.efs_backend.describe_mount_target_security_groups( mount_target_id @@ -135,7 +143,7 @@ class EFSResponse(BaseResponse): "Content-Type": "application/json" } - def modify_mount_target_security_groups(self): + def modify_mount_target_security_groups(self) -> TYPE_RESPONSE: mount_target_id = self._get_param("MountTargetId") security_groups = self._get_param("SecurityGroups") self.efs_backend.modify_mount_target_security_groups( @@ -143,7 +151,7 @@ class EFSResponse(BaseResponse): ) return "{}", {"Content-Type": "application/json"} - def create_access_point(self): + def create_access_point(self) -> TYPE_RESPONSE: client_token = self._get_param("ClientToken") tags = self._get_param("Tags") or [] file_system_id = self._get_param("FileSystemId") @@ -160,29 +168,29 @@ class EFSResponse(BaseResponse): "Content-Type": "application/json" } - def describe_access_points(self): + def describe_access_points(self) -> TYPE_RESPONSE: access_point_id = self._get_param("AccessPointId") access_points = self.efs_backend.describe_access_points(access_point_id) resp = [ap.info_json() for ap in access_points] return json.dumps({"AccessPoints": resp}), {"Content-Type": "application/json"} - def delete_access_point(self): + def delete_access_point(self) -> TYPE_RESPONSE: access_point_id = self._get_param("AccessPointId") self.efs_backend.delete_access_point(access_point_id) return "{}", {"Content-Type": "application/json"} - def list_tags_for_resource(self): + def list_tags_for_resource(self) -> TYPE_RESPONSE: resource_id = self._get_param("ResourceId") tags = self.efs_backend.list_tags_for_resource(resource_id) return json.dumps({"Tags": tags}), {"Content-Type": "application/json"} - def tag_resource(self): + def tag_resource(self) -> TYPE_RESPONSE: resource_id = self._get_param("ResourceId") tags = self._get_param("Tags") self.efs_backend.tag_resource(resource_id, tags) return "{}", {"Content-Type": "application/json"} - def untag_resource(self): + def untag_resource(self) -> TYPE_RESPONSE: resource_id = self._get_param("ResourceId") tag_keys = self.querystring.get("tagKeys", []) self.efs_backend.untag_resource(resource_id, tag_keys) diff --git a/setup.cfg b/setup.cfg index 741d0ddf1..de3c1ad34 100644 --- a/setup.cfg +++ b/setup.cfg @@ -229,7 +229,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/es,moto/moto_api +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/es,moto/moto_api show_column_numbers=True show_error_codes = True disable_error_code=abstract