Techdebt: MyPy EFS (#5954)

This commit is contained in:
Bert Blommers 2023-02-21 10:16:44 -01:00 committed by GitHub
parent d001f6a226
commit 23d9430590
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 229 additions and 177 deletions

View File

@ -197,6 +197,8 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
return new_ip
# EFS calls this method as request_ip(str, MountTarget)
# So technically it's not just Instances that are stored
def request_ip(self, ip: str, instance: "Instance") -> None:
if ipaddress.ip_address(ip) not in self.cidr:
raise Exception(f"IP does not fall in the subnet CIDR of {self.cidr}")

View File

@ -8,7 +8,7 @@ class EFSError(JsonRESTError):
class AccessPointNotFound(EFSError):
code = 404
def __init__(self, access_point_id):
def __init__(self, access_point_id: str):
super().__init__(
"AccessPointNotFound", f"Access Point {access_point_id} does not exist."
)
@ -17,93 +17,83 @@ class AccessPointNotFound(EFSError):
class FileSystemAlreadyExists(EFSError):
code = 409
def __init__(self, creation_token, *args, **kwargs):
def __init__(self, creation_token: str):
super().__init__(
"FileSystemAlreadyExists",
f"File system with {creation_token} already exists.",
*args,
**kwargs,
)
class FileSystemNotFound(EFSError):
code = 404
def __init__(self, file_system_id, *args, **kwargs):
def __init__(self, file_system_id: str):
super().__init__(
"FileSystemNotFound",
f"File system {file_system_id} does not exist.",
*args,
**kwargs,
)
class FileSystemInUse(EFSError):
code = 409
def __init__(self, msg, *args, **kwargs):
super().__init__("FileSystemInUse", msg, *args, **kwargs)
def __init__(self, msg: str):
super().__init__("FileSystemInUse", msg)
class MountTargetConflict(EFSError):
code = 409
def __init__(self, msg, *args, **kwargs):
super().__init__("MountTargetConflict", msg, *args, **kwargs)
def __init__(self, msg: str):
super().__init__("MountTargetConflict", msg)
class MountTargetNotFound(EFSError):
code = 404
def __init__(self, mount_target_id, *args, **kwargs):
def __init__(self, mount_target_id: str):
super().__init__(
"MountTargetNotFound",
f"Mount target '{mount_target_id}' does not exist.",
*args,
**kwargs,
)
class BadRequest(EFSError):
code = 400
def __init__(self, msg, *args, **kwargs):
super().__init__("BadRequest", msg, *args, **kwargs)
def __init__(self, msg: str):
super().__init__("BadRequest", msg)
class PolicyNotFound(EFSError):
code = 404
def __init__(self, *args, **kwargs):
super().__init__("PolicyNotFound", *args, **kwargs)
def __init__(self, msg: str):
super().__init__("PolicyNotFound", msg)
class SubnetNotFound(EFSError):
code = 404
def __init__(self, subnet_id, *args, **kwargs):
def __init__(self, subnet_id: str):
super().__init__(
"SubnetNotFound",
f"The subnet ID '{subnet_id}' does not exist",
*args,
**kwargs,
)
class SecurityGroupNotFound(EFSError):
code = 404
def __init__(self, security_group_id, *args, **kwargs):
def __init__(self, security_group_id: str):
super().__init__(
"SecurityGroupNotFound",
f"The SecurityGroup ID '{security_group_id}' does not exist",
*args,
**kwargs,
)
class SecurityGroupLimitExceeded(EFSError):
code = 400
def __init__(self, msg, *args, **kwargs):
super().__init__("SecurityGroupLimitExceeded", msg, *args, **kwargs)
def __init__(self, msg: str):
super().__init__("SecurityGroupLimitExceeded", msg)

View File

@ -7,10 +7,13 @@ https://docs.aws.amazon.com/efs/latest/ug/whatisefs.html
import json
import time
from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple, Set, Iterator, Union
from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel
from moto.core.utils import camelcase_to_underscores, underscores_to_camelcase
from moto.ec2 import ec2_backends
from moto.ec2.models.elastic_network_interfaces import NetworkInterface
from moto.ec2.models.subnets import Subnet
from moto.ec2.exceptions import InvalidSubnetIdError
from moto.efs.exceptions import (
AccessPointNotFound,
@ -30,25 +33,26 @@ from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import md5_hash
def _lookup_az_id(account_id, az_name):
def _lookup_az_id(account_id: str, az_name: str) -> Optional[str]:
"""Find the Availability zone ID given the AZ name."""
ec2 = ec2_backends[account_id][az_name[:-1]]
for zone in ec2.describe_availability_zones():
if zone.name == az_name:
return zone.zone_id
return None
class AccessPoint(BaseModel):
def __init__(
self,
account_id,
region_name,
client_token,
file_system_id,
name,
posix_user,
root_directory,
context,
account_id: str,
region_name: str,
client_token: str,
file_system_id: str,
name: Optional[str],
posix_user: Dict[str, Any],
root_directory: Dict[str, str],
context: "EFSBackend",
):
self.access_point_id = mock_random.get_random_hex(8)
self.access_point_arn = f"arn:aws:elasticfilesystem:{region_name}:{account_id}:access-point/fsap-{self.access_point_id}"
@ -64,7 +68,7 @@ class AccessPoint(BaseModel):
self.root_directory = root_directory
self.context = context
def info_json(self):
def info_json(self) -> Dict[str, Any]:
tags = self.context.list_tags_for_resource(self.access_point_id)
return {
"ClientToken": self.client_token,
@ -85,20 +89,18 @@ class FileSystem(CloudFormationModel):
def __init__(
self,
account_id,
region_name,
creation_token,
file_system_id,
context,
performance_mode,
encrypted,
kms_key_id,
throughput_mode,
provisioned_throughput_in_mibps,
availability_zone_name,
backup,
lifecycle_policies=None,
file_system_policy=None,
account_id: str,
region_name: str,
creation_token: str,
file_system_id: str,
context: "EFSBackend",
performance_mode: str,
encrypted: bool,
kms_key_id: str,
throughput_mode: str,
provisioned_throughput_in_mibps: int,
availability_zone_name: str,
backup: bool,
):
if availability_zone_name:
backup = True
@ -119,8 +121,8 @@ class FileSystem(CloudFormationModel):
account_id, self.availability_zone_name
)
self._backup = backup
self.lifecycle_policies = lifecycle_policies or []
self.file_system_policy = file_system_policy
self.lifecycle_policies: List[Dict[str, str]] = []
self.file_system_policy: Optional[str] = None
self._context = context
@ -132,11 +134,11 @@ class FileSystem(CloudFormationModel):
# Initialize some state parameters
self.life_cycle_state = "available"
self._mount_targets = {}
self._mount_targets: Dict[str, MountTarget] = {}
self._size_value = 0
@property
def size_in_bytes(self):
def size_in_bytes(self) -> Dict[str, Any]: # type: ignore[misc]
return {
"Value": self._size_value,
"ValueInIA": 0,
@ -145,21 +147,21 @@ class FileSystem(CloudFormationModel):
}
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.file_system_id
@property
def number_of_mount_targets(self):
def number_of_mount_targets(self) -> int:
return len(self._mount_targets)
@property
def backup_policy(self):
def backup_policy(self) -> Optional[Dict[str, str]]:
if self._backup:
return {"Status": "ENABLED"}
else:
return
return None
def info_json(self):
def info_json(self) -> Dict[str, Any]:
ret = {
underscores_to_camelcase(k.capitalize()): v
for k, v in self.__dict__.items()
@ -180,7 +182,7 @@ class FileSystem(CloudFormationModel):
)
return ret
def add_mount_target(self, subnet, mount_target):
def add_mount_target(self, subnet: Subnet, mount_target: "MountTarget") -> None:
# Check that the mount target doesn't violate constraints.
for other_mount_target in self._mount_targets.values():
if other_mount_target.subnet_vpc_id != subnet.vpc_id:
@ -193,28 +195,33 @@ class FileSystem(CloudFormationModel):
self._mount_targets[subnet.availability_zone] = mount_target
def has_mount_target(self, subnet):
def has_mount_target(self, subnet: Subnet) -> bool:
return subnet.availability_zone in self._mount_targets
def iter_mount_targets(self):
def iter_mount_targets(self) -> Iterator["MountTarget"]:
for mt in self._mount_targets.values():
yield mt
def remove_mount_target(self, subnet):
def remove_mount_target(self, subnet: Subnet) -> None:
del self._mount_targets[subnet.availability_zone]
@staticmethod
def cloudformation_name_type():
return
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
return "AWS::EFS::FileSystem"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "FileSystem":
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-filesystem.html
props = deepcopy(cloudformation_json["Properties"])
props = {camelcase_to_underscores(k): v for k, v in props.items()}
@ -238,29 +245,40 @@ class FileSystem(CloudFormationModel):
)
@classmethod
def update_from_cloudformation_json(
def update_from_cloudformation_json( # type: ignore[misc]
cls,
original_resource,
new_resource_name,
cloudformation_json,
account_id,
region_name,
):
original_resource: Any,
new_resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
raise NotImplementedError(
"Update of EFS File System via cloudformation is not yet implemented."
)
@classmethod
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name
):
return efs_backends[account_id][region_name].delete_file_system(resource_name)
def delete_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
efs_backends[account_id][region_name].delete_file_system(resource_name)
class MountTarget(CloudFormationModel):
"""A model for an EFS Mount Target."""
def __init__(self, account_id, file_system, subnet, ip_address, security_groups):
def __init__(
self,
account_id: str,
file_system: FileSystem,
subnet: Subnet,
ip_address: Optional[str],
security_groups: Optional[List[str]],
):
# Set the simple given parameters.
self.file_system_id = file_system.file_system_id
self._file_system = file_system
@ -278,10 +296,10 @@ class MountTarget(CloudFormationModel):
# Get an IP address if needed, otherwise validate the one we're given.
if ip_address is None:
ip_address = subnet.get_available_subnet_ip(self)
ip_address = subnet.get_available_subnet_ip(self) # type: ignore[arg-type]
else:
try:
subnet.request_ip(ip_address, self)
subnet.request_ip(ip_address, self) # type: ignore[arg-type]
except Exception as e:
if "IP" in str(e) and "CIDR" in str(e):
raise BadRequest(
@ -295,68 +313,76 @@ class MountTarget(CloudFormationModel):
self.owner_id = account_id
self.mount_target_id = f"fsmt-{mock_random.get_random_hex()}"
self.life_cycle_state = "available"
self.network_interface_id = None
self.network_interface_id: Optional[str] = None
self.availability_zone_id = subnet.availability_zone_id
self.availability_zone_name = subnet.availability_zone
def clean_up(self):
def clean_up(self) -> None:
self._file_system.remove_mount_target(self._subnet)
self._subnet.del_subnet_ip(self.ip_address)
def set_network_interface(self, network_interface):
def set_network_interface(self, network_interface: NetworkInterface) -> None:
self.network_interface_id = network_interface.id
def info_json(self):
ret = {
def info_json(self) -> Dict[str, Any]:
return {
underscores_to_camelcase(k.capitalize()): v
for k, v in self.__dict__.items()
if not k.startswith("_")
}
return ret
@property
def physical_resource_id(self):
return self.mounted_target_id
def physical_resource_id(self) -> str:
return self.mount_target_id
@property
def subnet_vpc_id(self):
def subnet_vpc_id(self) -> str:
return self._subnet.vpc_id
@staticmethod
def cloudformation_name_type():
pass
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
return "AWS::EFS::MountTarget"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "MountTarget":
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-mounttarget.html
props = deepcopy(cloudformation_json["Properties"])
props = {camelcase_to_underscores(k): v for k, v in props.items()}
return efs_backends[account_id][region_name].create_mount_target(**props)
@classmethod
def update_from_cloudformation_json(
def update_from_cloudformation_json( # type: ignore[misc]
cls,
original_resource,
new_resource_name,
cloudformation_json,
account_id,
region_name,
):
original_resource: Any,
new_resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
raise NotImplementedError(
"Updates of EFS Mount Target via cloudformation are not yet implemented."
)
@classmethod
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name
):
return efs_backends[account_id][region_name].delete_mount_target(resource_name)
def delete_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
efs_backends[account_id][region_name].delete_mount_target(resource_name)
class EFSBackend(BaseBackend):
@ -367,16 +393,18 @@ class EFSBackend(BaseBackend):
such resources should always go through this class.
"""
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.creation_tokens = set()
self.access_points = dict()
self.file_systems_by_id = {}
self.mount_targets_by_id = {}
self.next_markers = {}
self.creation_tokens: Set[str] = set()
self.access_points: Dict[str, AccessPoint] = dict()
self.file_systems_by_id: Dict[str, FileSystem] = {}
self.mount_targets_by_id: Dict[str, MountTarget] = {}
self.next_markers: Dict[str, Union[List[MountTarget], List[FileSystem]]] = {}
self.tagging_service = TaggingService()
def _mark_description(self, corpus, max_items):
def _mark_description(
self, corpus: Union[List[MountTarget], List[FileSystem]], max_items: int
) -> Optional[str]:
if max_items < len(corpus):
new_corpus = corpus[max_items:]
new_corpus_dict = [c.info_json() for c in new_corpus]
@ -388,21 +416,21 @@ class EFSBackend(BaseBackend):
return next_marker
@property
def ec2_backend(self):
def ec2_backend(self) -> Any: # type: ignore[misc]
return ec2_backends[self.account_id][self.region_name]
def create_file_system(
self,
creation_token,
performance_mode,
encrypted,
kms_key_id,
throughput_mode,
provisioned_throughput_in_mibps,
availability_zone_name,
backup,
tags,
):
creation_token: str,
performance_mode: str,
encrypted: bool,
kms_key_id: str,
throughput_mode: str,
provisioned_throughput_in_mibps: int,
availability_zone_name: str,
backup: bool,
tags: List[Dict[str, str]],
) -> FileSystem:
"""Create a new EFS File System Volume.
https://docs.aws.amazon.com/efs/latest/ug/API_CreateFileSystem.html
@ -413,7 +441,7 @@ class EFSBackend(BaseBackend):
raise FileSystemAlreadyExists(creation_token)
# Create a new file system ID:
def make_id():
def make_id() -> str:
return f"fs-{mock_random.get_random_hex()}"
fsid = make_id()
@ -438,13 +466,17 @@ class EFSBackend(BaseBackend):
return self.file_systems_by_id[fsid]
def describe_file_systems(
self, marker=None, max_items=10, creation_token=None, file_system_id=None
):
self,
marker: Optional[str] = None,
max_items: int = 10,
creation_token: Optional[str] = None,
file_system_id: Optional[str] = None,
) -> Tuple[Optional[str], List[FileSystem]]:
"""Describe all the EFS File Systems, or specific File Systems.
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html
"""
# Restrict the possible corpus of resules based on inputs.
# Restrict the possible corpus of results based on inputs.
if creation_token and file_system_id:
raise BadRequest(
"Request cannot contain both a file system ID and a creation token."
@ -464,7 +496,7 @@ class EFSBackend(BaseBackend):
# Handle the case that a marker is given.
if marker not in self.next_markers:
raise BadRequest("Invalid Marker")
corpus = self.next_markers[marker]
corpus = self.next_markers[marker] # type: ignore[assignment]
else:
# Handle the vanilla case.
corpus = [fs for fs in self.file_systems_by_id.values()]
@ -475,8 +507,12 @@ class EFSBackend(BaseBackend):
return next_marker, file_systems
def create_mount_target(
self, file_system_id, subnet_id, ip_address=None, security_groups=None
):
self,
file_system_id: str,
subnet_id: str,
ip_address: Optional[str] = None,
security_groups: Optional[List[str]] = None,
) -> MountTarget:
"""Create a new EFS Mount Target for a given File System to a given subnet.
Note that you can only create one mount target for each availability zone
@ -516,8 +552,13 @@ class EFSBackend(BaseBackend):
return mount_target
def describe_mount_targets(
self, max_items, file_system_id, mount_target_id, access_point_id, marker
):
self,
max_items: int,
file_system_id: Optional[str],
mount_target_id: Optional[str],
access_point_id: Optional[str],
marker: Optional[str],
) -> Tuple[Optional[str], List[MountTarget]]:
"""Describe the mount targets given an access point ID, mount target ID or a file system ID.
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html
@ -549,7 +590,7 @@ class EFSBackend(BaseBackend):
if marker not in self.next_markers:
raise BadRequest("Invalid Marker")
corpus_mtids = {m.mount_target_id for m in corpus}
marked_mtids = {m.mount_target_id for m in self.next_markers[marker]}
marked_mtids = {m.mount_target_id for m in self.next_markers[marker]} # type: ignore[union-attr]
mt_ids = corpus_mtids & marked_mtids
corpus = [self.mount_targets_by_id[mt_id] for mt_id in mt_ids]
@ -558,7 +599,7 @@ class EFSBackend(BaseBackend):
next_marker = self._mark_description(corpus, max_items)
return next_marker, mount_targets
def delete_file_system(self, file_system_id):
def delete_file_system(self, file_system_id: str) -> None:
"""Delete the file system specified by the given file_system_id.
Note that mount targets must be deleted first.
@ -576,9 +617,8 @@ class EFSBackend(BaseBackend):
del self.file_systems_by_id[file_system_id]
self.creation_tokens.remove(file_system.creation_token)
return
def delete_mount_target(self, mount_target_id):
def delete_mount_target(self, mount_target_id: str) -> None:
"""Delete a mount target specified by the given mount_target_id.
Note that this will also delete a network interface.
@ -592,32 +632,39 @@ class EFSBackend(BaseBackend):
self.ec2_backend.delete_network_interface(mount_target.network_interface_id)
del self.mount_targets_by_id[mount_target_id]
mount_target.clean_up()
return
def describe_backup_policy(self, file_system_id):
def describe_backup_policy(self, file_system_id: str) -> Dict[str, str]:
backup_policy = self.file_systems_by_id[file_system_id].backup_policy
if not backup_policy:
raise PolicyNotFound("None")
return backup_policy
def put_lifecycle_configuration(self, file_system_id, policies):
def put_lifecycle_configuration(
self, file_system_id: str, policies: List[Dict[str, str]]
) -> None:
_, fss = self.describe_file_systems(file_system_id=file_system_id)
file_system = fss[0]
file_system.lifecycle_policies = policies
def describe_lifecycle_configuration(self, file_system_id):
def describe_lifecycle_configuration(
self, file_system_id: str
) -> List[Dict[str, str]]:
_, fss = self.describe_file_systems(file_system_id=file_system_id)
file_system = fss[0]
return file_system.lifecycle_policies
def describe_mount_target_security_groups(self, mount_target_id):
def describe_mount_target_security_groups(
self, mount_target_id: str
) -> Optional[List[str]]:
if mount_target_id not in self.mount_targets_by_id:
raise MountTargetNotFound(mount_target_id)
mount_target = self.mount_targets_by_id[mount_target_id]
return mount_target.security_groups
def modify_mount_target_security_groups(self, mount_target_id, security_groups):
def modify_mount_target_security_groups(
self, mount_target_id: str, security_groups: List[str]
) -> None:
if mount_target_id not in self.mount_targets_by_id:
raise MountTargetNotFound(mount_target_id)
@ -629,8 +676,13 @@ class EFSBackend(BaseBackend):
)
def create_access_point(
self, client_token, tags, file_system_id, posix_user, root_directory
):
self,
client_token: str,
tags: List[Dict[str, str]],
file_system_id: str,
posix_user: Dict[str, Any],
root_directory: Dict[str, Any],
) -> AccessPoint:
name = next((tag["Value"] for tag in tags if tag["Key"] == "Name"), None)
access_point = AccessPoint(
self.account_id,
@ -646,7 +698,7 @@ class EFSBackend(BaseBackend):
self.access_points[access_point.access_point_id] = access_point
return access_point
def describe_access_points(self, access_point_id):
def describe_access_points(self, access_point_id: str) -> List[AccessPoint]:
"""
Pagination is not yet implemented
"""
@ -654,18 +706,18 @@ class EFSBackend(BaseBackend):
if access_point_id not in self.access_points:
raise AccessPointNotFound(access_point_id)
return [self.access_points[access_point_id]]
return self.access_points.values()
return list(self.access_points.values())
def delete_access_point(self, access_point_id):
def delete_access_point(self, access_point_id: str) -> None:
self.access_points.pop(access_point_id, None)
def list_tags_for_resource(self, resource_id):
def list_tags_for_resource(self, resource_id: str) -> List[Dict[str, str]]:
return self.tagging_service.list_tags_for_resource(resource_id)["Tags"]
def tag_resource(self, resource_id, tags):
def tag_resource(self, resource_id: str, tags: List[Dict[str, str]]) -> None:
self.tagging_service.tag_resource(resource_id, tags)
def untag_resource(self, resource_id, tag_keys):
def untag_resource(self, resource_id: str, tag_keys: List[str]) -> None:
self.tagging_service.untag_resource_using_names(resource_id, tag_keys)

View File

@ -1,19 +1,23 @@
import json
from typing import Any, Dict, Tuple, Union
from moto.core.responses import BaseResponse
from .models import efs_backends
from .models import efs_backends, EFSBackend
TYPE_RESPONSE = Tuple[str, Dict[str, Union[str, int]]]
class EFSResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="efs")
@property
def efs_backend(self):
def efs_backend(self) -> EFSBackend:
return efs_backends[self.current_account][self.region]
def create_file_system(self):
def create_file_system(self) -> TYPE_RESPONSE:
creation_token = self._get_param("CreationToken")
performance_mode = self._get_param("PerformanceMode")
encrypted = self._get_param("Encrypted")
@ -41,7 +45,7 @@ class EFSResponse(BaseResponse):
{"status": 201, "Content-Type": "application/json"},
)
def describe_file_systems(self):
def describe_file_systems(self) -> TYPE_RESPONSE:
max_items = self._get_int_param("MaxItems", 10)
marker = self._get_param("Marker")
creation_token = self._get_param("CreationToken")
@ -52,14 +56,16 @@ class EFSResponse(BaseResponse):
creation_token=creation_token,
file_system_id=file_system_id,
)
resp_json = {"FileSystems": [fs.info_json() for fs in file_systems]}
resp_json: Dict[str, Any] = {
"FileSystems": [fs.info_json() for fs in file_systems]
}
if marker:
resp_json["Marker"] = marker
if next_marker:
resp_json["NextMarker"] = next_marker
return json.dumps(resp_json), {"Content-Type": "application/json"}
def create_mount_target(self):
def create_mount_target(self) -> TYPE_RESPONSE:
file_system_id = self._get_param("FileSystemId")
subnet_id = self._get_param("SubnetId")
ip_address = self._get_param("IpAddress")
@ -75,7 +81,7 @@ class EFSResponse(BaseResponse):
{"Content-Type": "application/json"},
)
def describe_mount_targets(self):
def describe_mount_targets(self) -> TYPE_RESPONSE:
max_items = self._get_int_param("MaxItems", 10)
marker = self._get_param("Marker")
file_system_id = self._get_param("FileSystemId")
@ -88,30 +94,32 @@ class EFSResponse(BaseResponse):
access_point_id=access_point_id,
marker=marker,
)
resp_json = {"MountTargets": [mt.info_json() for mt in mount_targets]}
resp_json: Dict[str, Any] = {
"MountTargets": [mt.info_json() for mt in mount_targets]
}
if marker:
resp_json["Marker"] = marker
if next_marker:
resp_json["NextMarker"] = next_marker
return json.dumps(resp_json), {"Content-Type": "application/json"}
def delete_file_system(self):
def delete_file_system(self) -> TYPE_RESPONSE:
file_system_id = self._get_param("FileSystemId")
self.efs_backend.delete_file_system(file_system_id)
return json.dumps(dict()), {"status": 204, "Content-Type": "application/json"}
def delete_mount_target(self):
def delete_mount_target(self) -> TYPE_RESPONSE:
mount_target_id = self._get_param("MountTargetId")
self.efs_backend.delete_mount_target(mount_target_id)
return json.dumps(dict()), {"status": 204, "Content-Type": "application/json"}
def describe_backup_policy(self):
def describe_backup_policy(self) -> TYPE_RESPONSE:
file_system_id = self._get_param("FileSystemId")
backup_policy = self.efs_backend.describe_backup_policy(file_system_id)
resp = {"BackupPolicy": backup_policy}
return json.dumps(resp), {"Content-Type": "application/json"}
def put_lifecycle_configuration(self):
def put_lifecycle_configuration(self) -> TYPE_RESPONSE:
file_system_id = self._get_param("FileSystemId")
policies = self._get_param("LifecyclePolicies")
self.efs_backend.put_lifecycle_configuration(file_system_id, policies)
@ -119,14 +127,14 @@ class EFSResponse(BaseResponse):
"Content-Type": "application/json"
}
def describe_lifecycle_configuration(self):
def describe_lifecycle_configuration(self) -> TYPE_RESPONSE:
file_system_id = self._get_param("FileSystemId")
policies = self.efs_backend.describe_lifecycle_configuration(file_system_id)
return json.dumps({"LifecyclePolicies": policies}), {
"Content-Type": "application/json"
}
def describe_mount_target_security_groups(self):
def describe_mount_target_security_groups(self) -> TYPE_RESPONSE:
mount_target_id = self._get_param("MountTargetId")
security_groups = self.efs_backend.describe_mount_target_security_groups(
mount_target_id
@ -135,7 +143,7 @@ class EFSResponse(BaseResponse):
"Content-Type": "application/json"
}
def modify_mount_target_security_groups(self):
def modify_mount_target_security_groups(self) -> TYPE_RESPONSE:
mount_target_id = self._get_param("MountTargetId")
security_groups = self._get_param("SecurityGroups")
self.efs_backend.modify_mount_target_security_groups(
@ -143,7 +151,7 @@ class EFSResponse(BaseResponse):
)
return "{}", {"Content-Type": "application/json"}
def create_access_point(self):
def create_access_point(self) -> TYPE_RESPONSE:
client_token = self._get_param("ClientToken")
tags = self._get_param("Tags") or []
file_system_id = self._get_param("FileSystemId")
@ -160,29 +168,29 @@ class EFSResponse(BaseResponse):
"Content-Type": "application/json"
}
def describe_access_points(self):
def describe_access_points(self) -> TYPE_RESPONSE:
access_point_id = self._get_param("AccessPointId")
access_points = self.efs_backend.describe_access_points(access_point_id)
resp = [ap.info_json() for ap in access_points]
return json.dumps({"AccessPoints": resp}), {"Content-Type": "application/json"}
def delete_access_point(self):
def delete_access_point(self) -> TYPE_RESPONSE:
access_point_id = self._get_param("AccessPointId")
self.efs_backend.delete_access_point(access_point_id)
return "{}", {"Content-Type": "application/json"}
def list_tags_for_resource(self):
def list_tags_for_resource(self) -> TYPE_RESPONSE:
resource_id = self._get_param("ResourceId")
tags = self.efs_backend.list_tags_for_resource(resource_id)
return json.dumps({"Tags": tags}), {"Content-Type": "application/json"}
def tag_resource(self):
def tag_resource(self) -> TYPE_RESPONSE:
resource_id = self._get_param("ResourceId")
tags = self._get_param("Tags")
self.efs_backend.tag_resource(resource_id, tags)
return "{}", {"Content-Type": "application/json"}
def untag_resource(self):
def untag_resource(self) -> TYPE_RESPONSE:
resource_id = self._get_param("ResourceId")
tag_keys = self.querystring.get("tagKeys", [])
self.efs_backend.untag_resource(resource_id, tag_keys)

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/es,moto/moto_api
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/es,moto/moto_api
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract