From 8915ef60f9009eff1159260ee8ce266e4a90b9ea Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 4 Feb 2023 11:31:16 -0100 Subject: [PATCH] Techdebt: MyPy EC2 (f-h-models) (#5900) --- moto/ec2/exceptions.py | 10 +-- moto/ec2/models/fleets.py | 113 ++++++++++++++++--------------- moto/ec2/models/flow_logs.py | 107 +++++++++++++++-------------- moto/ec2/models/hosts.py | 33 +++++---- moto/ec2/models/spot_requests.py | 25 +++---- moto/ec2/utils.py | 19 ++++-- setup.cfg | 2 +- 7 files changed, 168 insertions(+), 141 deletions(-) diff --git a/moto/ec2/exceptions.py b/moto/ec2/exceptions.py index 2d55e9830..1109bb6a1 100644 --- a/moto/ec2/exceptions.py +++ b/moto/ec2/exceptions.py @@ -110,7 +110,7 @@ class InvalidSubnetIdError(EC2ClientError): class InvalidFlowLogIdError(EC2ClientError): - def __init__(self, count, flow_log_ids): + def __init__(self, count: int, flow_log_ids: str): super().__init__( "InvalidFlowLogId.NotFound", f"These flow log ids in the input list are not found: [TotalCount: {count}] {flow_log_ids}", @@ -118,7 +118,7 @@ class InvalidFlowLogIdError(EC2ClientError): class FlowLogAlreadyExists(EC2ClientError): - def __init__(self): + def __init__(self) -> None: super().__init__( "FlowLogAlreadyExists", "Error. There is an existing Flow Log with the same configuration and log destination.", @@ -405,7 +405,7 @@ class InvalidNextToken(EC2ClientError): class InvalidDependantParameterError(EC2ClientError): - def __init__(self, dependant_parameter, parameter, parameter_value): + def __init__(self, dependant_parameter: str, parameter: str, parameter_value: str): super().__init__( "InvalidParameter", f"{dependant_parameter} can't be empty if {parameter} is {parameter_value}.", @@ -413,7 +413,7 @@ class InvalidDependantParameterError(EC2ClientError): class InvalidDependantParameterTypeError(EC2ClientError): - def __init__(self, dependant_parameter, parameter_value, parameter): + def __init__(self, dependant_parameter: str, parameter_value: str, parameter: str): super().__init__( "InvalidParameter", f"{dependant_parameter} type must be {parameter_value} if {parameter} is provided.", @@ -421,7 +421,7 @@ class InvalidDependantParameterTypeError(EC2ClientError): class InvalidAggregationIntervalParameterError(EC2ClientError): - def __init__(self, parameter): + def __init__(self, parameter: str): super().__init__("InvalidParameter", f"Invalid {parameter}") diff --git a/moto/ec2/models/fleets.py b/moto/ec2/models/fleets.py index 8597f54ef..6ec9ca40b 100644 --- a/moto/ec2/models/fleets.py +++ b/moto/ec2/models/fleets.py @@ -1,7 +1,8 @@ import datetime from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple -from moto.ec2.models.spot_requests import SpotFleetLaunchSpec +from moto.ec2.models.spot_requests import SpotFleetLaunchSpec, SpotInstanceRequest from .core import TaggedEC2Resource from ..utils import ( random_fleet_id, @@ -12,19 +13,19 @@ from ..utils import ( class Fleet(TaggedEC2Resource): def __init__( self, - ec2_backend, - fleet_id, - on_demand_options, - spot_options, - target_capacity_specification, - launch_template_configs, - excess_capacity_termination_policy, - replace_unhealthy_instances, - terminate_instances_with_expiration, - fleet_type, - valid_from, - valid_until, - tag_specifications, + ec2_backend: Any, + fleet_id: str, + on_demand_options: Dict[str, Any], + spot_options: Dict[str, Any], + target_capacity_specification: Dict[str, Any], + launch_template_configs: List[Dict[str, Any]], + excess_capacity_termination_policy: str, + replace_unhealthy_instances: bool, + terminate_instances_with_expiration: bool, + fleet_type: str, + valid_from: str, + valid_until: str, + tag_specifications: List[Dict[str, Any]], ): self.ec2_backend = ec2_backend @@ -50,18 +51,18 @@ class Fleet(TaggedEC2Resource): self.fulfilled_on_demand_capacity = 0.0 self.fulfilled_spot_capacity = 0.0 - self.launch_specs = [] + self.launch_specs: List[SpotFleetLaunchSpec] = [] - launch_specs_from_config = [] + launch_specs_from_config: List[Dict[str, Any]] = [] for config in launch_template_configs or []: - spec = config["LaunchTemplateSpecification"] - if "LaunchTemplateId" in spec: + launch_spec = config["LaunchTemplateSpecification"] + if "LaunchTemplateId" in launch_spec: launch_template = self.ec2_backend.get_launch_template( - template_id=spec["LaunchTemplateId"] + template_id=launch_spec["LaunchTemplateId"] ) - elif "LaunchTemplateName" in spec: + elif "LaunchTemplateName" in launch_spec: launch_template = self.ec2_backend.get_launch_template_by_name( - name=spec["LaunchTemplateName"] + name=launch_spec["LaunchTemplateName"] ) else: continue @@ -92,14 +93,14 @@ class Fleet(TaggedEC2Resource): ) ) - self.spot_requests = [] - self.on_demand_instances = [] + self.spot_requests: List[SpotInstanceRequest] = [] + self.on_demand_instances: List[Dict[str, Any]] = [] default_capacity = ( target_capacity_specification.get("DefaultTargetCapacityType") or "on-demand" ) self.target_capacity = int( - target_capacity_specification.get("TotalTargetCapacity") + target_capacity_specification.get("TotalTargetCapacity") # type: ignore[arg-type] ) self.spot_target_capacity = int( target_capacity_specification.get("SpotTargetCapacity", 0) @@ -122,13 +123,13 @@ class Fleet(TaggedEC2Resource): self.create_spot_requests(remaining_capacity) @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.id - def create_spot_requests(self, weight_to_add): + def create_spot_requests(self, weight_to_add: float) -> List[SpotInstanceRequest]: weight_map, added_weight = self.get_launch_spec_counts(weight_to_add) for launch_spec, count in weight_map.items(): - requests = self.ec2_backend.request_spot_instances( + requests = self.ec2_backend.request_spot_instances( # type: ignore[attr-defined] price=launch_spec.spot_price, image_id=launch_spec.image_id, count=count, @@ -153,10 +154,10 @@ class Fleet(TaggedEC2Resource): self.fulfilled_capacity += added_weight return self.spot_requests - def create_on_demand_requests(self, weight_to_add): + def create_on_demand_requests(self, weight_to_add: float) -> List[Dict[str, Any]]: weight_map, added_weight = self.get_launch_spec_counts(weight_to_add) for launch_spec, count in weight_map.items(): - reservation = self.ec2_backend.add_instances( + reservation = self.ec2_backend.add_instances( # type: ignore[attr-defined] image_id=launch_spec.image_id, count=count, instance_type=launch_spec.instance_type, @@ -184,10 +185,12 @@ class Fleet(TaggedEC2Resource): self.fulfilled_capacity += added_weight return self.on_demand_instances - def get_launch_spec_counts(self, weight_to_add): - weight_map = defaultdict(int) + def get_launch_spec_counts( + self, weight_to_add: float + ) -> Tuple[Dict[SpotFleetLaunchSpec, int], float]: + weight_map: Dict[SpotFleetLaunchSpec, int] = defaultdict(int) - weight_so_far = 0 + weight_so_far = 0.0 if ( self.spot_options and self.spot_options["AllocationStrategy"] == "diversified" @@ -217,15 +220,15 @@ class Fleet(TaggedEC2Resource): return weight_map, weight_so_far - def terminate_instances(self): + def terminate_instances(self) -> None: instance_ids = [] new_fulfilled_capacity = self.fulfilled_capacity for req in self.spot_requests + self.on_demand_instances: instance = None try: - instance = req.instance + instance = req.instance # type: ignore except AttributeError: - instance = req["instance"] + instance = req["instance"] # type: ignore[index] if instance.state == "terminated": continue @@ -249,23 +252,23 @@ class Fleet(TaggedEC2Resource): class FleetsBackend: - def __init__(self): - self.fleets = {} + def __init__(self) -> None: + self.fleets: Dict[str, Fleet] = {} def create_fleet( self, - on_demand_options, - spot_options, - target_capacity_specification, - launch_template_configs, - excess_capacity_termination_policy, - replace_unhealthy_instances, - terminate_instances_with_expiration, - fleet_type, - valid_from, - valid_until, - tag_specifications, - ): + on_demand_options: Dict[str, Any], + spot_options: Dict[str, Any], + target_capacity_specification: Dict[str, Any], + launch_template_configs: List[Dict[str, Any]], + excess_capacity_termination_policy: str, + replace_unhealthy_instances: bool, + terminate_instances_with_expiration: bool, + fleet_type: str, + valid_from: str, + valid_until: str, + tag_specifications: List[Dict[str, Any]], + ) -> Fleet: fleet_id = random_fleet_id() fleet = Fleet( @@ -286,24 +289,26 @@ class FleetsBackend: self.fleets[fleet_id] = fleet return fleet - def get_fleet(self, fleet_id): + def get_fleet(self, fleet_id: str) -> Optional[Fleet]: return self.fleets.get(fleet_id) - def describe_fleet_instances(self, fleet_id): + def describe_fleet_instances(self, fleet_id: str) -> List[Any]: fleet = self.get_fleet(fleet_id) if not fleet: return [] return fleet.spot_requests + fleet.on_demand_instances - def describe_fleets(self, fleet_ids): - fleets = self.fleets.values() + def describe_fleets(self, fleet_ids: Optional[List[str]]) -> List[Fleet]: + fleets = list(self.fleets.values()) if fleet_ids: fleets = [fleet for fleet in fleets if fleet.id in fleet_ids] return fleets - def delete_fleets(self, fleet_ids, terminate_instances): + def delete_fleets( + self, fleet_ids: List[str], terminate_instances: bool + ) -> List[Fleet]: fleets = [] for fleet_id in fleet_ids: fleet = self.fleets[fleet_id] diff --git a/moto/ec2/models/flow_logs.py b/moto/ec2/models/flow_logs.py index 32bac4a0a..cf53d34c9 100644 --- a/moto/ec2/models/flow_logs.py +++ b/moto/ec2/models/flow_logs.py @@ -1,5 +1,5 @@ import itertools -from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple from moto.core import CloudFormationModel from ..exceptions import ( FlowLogAlreadyExists, @@ -19,18 +19,18 @@ from ..utils import ( class FlowLogs(TaggedEC2Resource, CloudFormationModel): def __init__( self, - ec2_backend, - flow_log_id, - resource_id, - traffic_type, - log_destination, - log_group_name, - deliver_logs_permission_arn, - max_aggregation_interval, - log_destination_type, - log_format, - deliver_logs_status="SUCCESS", - deliver_logs_error_message=None, + ec2_backend: Any, + flow_log_id: str, + resource_id: str, + traffic_type: str, + log_destination: str, + log_group_name: str, + deliver_logs_permission_arn: str, + max_aggregation_interval: str, + log_destination_type: str, + log_format: str, + deliver_logs_status: str = "SUCCESS", + deliver_logs_error_message: Optional[str] = None, ): self.ec2_backend = ec2_backend self.id = flow_log_id @@ -48,18 +48,23 @@ class FlowLogs(TaggedEC2Resource, CloudFormationModel): self.created_at = utc_date_and_time() @staticmethod - def cloudformation_name_type(): - return None + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-flowlog.html return "AWS::EC2::FlowLog" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "FlowLogs": from ..models import ec2_backends properties = cloudformation_json["Properties"] @@ -94,10 +99,12 @@ class FlowLogs(TaggedEC2Resource, CloudFormationModel): return flow_log[0] @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.id - def get_filter_value(self, filter_name): + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: """ API Version 2016-11-15 defines the following filters for DescribeFlowLogs: @@ -129,17 +136,17 @@ class FlowLogs(TaggedEC2Resource, CloudFormationModel): class FlowLogsBackend: - def __init__(self): - self.flow_logs = defaultdict(dict) + def __init__(self) -> None: + self.flow_logs: Dict[str, FlowLogs] = {} def _validate_request( self, - log_group_name, - log_destination, - log_destination_type, - max_aggregation_interval, - deliver_logs_permission_arn, - ): + log_group_name: str, + log_destination: str, + log_destination_type: str, + max_aggregation_interval: str, + deliver_logs_permission_arn: str, + ) -> None: if log_group_name is None and log_destination is None: raise InvalidDependantParameterError( "LogDestination", "LogGroupName", "not provided" @@ -163,16 +170,16 @@ class FlowLogsBackend: def create_flow_logs( self, - resource_type, - resource_ids, - traffic_type, - deliver_logs_permission_arn, - log_destination_type, - log_destination, - log_group_name, - log_format, - max_aggregation_interval, - ): + resource_type: str, + resource_ids: List[str], + traffic_type: str, + deliver_logs_permission_arn: str, + log_destination_type: str, + log_destination: str, + log_group_name: str, + log_format: str, + max_aggregation_interval: str, + ) -> Tuple[List[FlowLogs], List[Any]]: # Guess it's best to put it here due to possible # lack of them in the CloudFormation template max_aggregation_interval = ( @@ -205,13 +212,13 @@ class FlowLogsBackend: flow_log_id = random_flow_log_id() if resource_type == "VPC": # Validate VPCs exist - self.get_vpc(resource_id) + self.get_vpc(resource_id) # type: ignore[attr-defined] elif resource_type == "Subnet": # Validate Subnets exist - self.get_subnet(resource_id) + self.get_subnet(resource_id) # type: ignore[attr-defined] elif resource_type == "NetworkInterface": # Validate NetworkInterfaces exist - self.get_network_interface(resource_id) + self.get_network_interface(resource_id) # type: ignore[attr-defined] if log_destination_type == "s3": from moto.s3.models import s3_backends @@ -219,7 +226,7 @@ class FlowLogsBackend: arn = log_destination.split(":", 5)[5] try: - s3_backends[self.account_id]["global"].get_bucket(arn) + s3_backends[self.account_id]["global"].get_bucket(arn) # type: ignore[attr-defined] except MissingBucket: # Instead of creating FlowLog report # the unsuccessful status for the @@ -238,9 +245,8 @@ class FlowLogsBackend: try: # Need something easy to check the group exists. # The list_tags_log_group seems to do the trick. - logs_backends[self.account_id][ - self.region_name - ].list_tags_log_group(log_group_name) + logs = logs_backends[self.account_id][self.region_name] # type: ignore[attr-defined] + logs.list_tags_log_group(log_group_name) except ResourceNotFoundException: deliver_logs_status = "FAILED" deliver_logs_error_message = "Access error" @@ -274,15 +280,17 @@ class FlowLogsBackend: return flow_logs_set, unsuccessful - def describe_flow_logs(self, flow_log_ids=None, filters=None): - matches = itertools.chain([i for i in self.flow_logs.values()]) + def describe_flow_logs( + self, flow_log_ids: Optional[List[str]] = None, filters: Any = None + ) -> List[FlowLogs]: + matches = list(itertools.chain([i for i in self.flow_logs.values()])) if flow_log_ids: matches = [flow_log for flow_log in matches if flow_log.id in flow_log_ids] if filters: matches = generic_filter(filters, matches) return matches - def delete_flow_logs(self, flow_log_ids): + def delete_flow_logs(self, flow_log_ids: List[str]) -> None: non_existing = [] for flow_log in flow_log_ids: if flow_log in self.flow_logs: @@ -294,4 +302,3 @@ class FlowLogsBackend: raise InvalidFlowLogIdError( len(flow_log_ids), " ".join(x for x in flow_log_ids) ) - return True diff --git a/moto/ec2/models/hosts.py b/moto/ec2/models/hosts.py index 32aacfeb9..d44d39199 100644 --- a/moto/ec2/models/hosts.py +++ b/moto/ec2/models/hosts.py @@ -1,6 +1,6 @@ from .core import TaggedEC2Resource from ..utils import generic_filter, random_dedicated_host_id -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional class Host(TaggedEC2Resource): @@ -17,29 +17,31 @@ class Host(TaggedEC2Resource): self.state = "available" self.host_recovery = host_recovery or "off" self.zone = zone - self.instance_type = instance_type - self.instance_family = instance_family + self.instance_type: Optional[str] = instance_type + self.instance_family: Optional[str] = instance_family self.auto_placement = auto_placement or "on" self.ec2_backend = backend def release(self) -> None: self.state = "released" - def get_filter_value(self, key): - if key == "availability-zone": + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: + if filter_name == "availability-zone": return self.zone - if key == "state": + if filter_name == "state": return self.state - if key == "tag-key": + if filter_name == "tag-key": return [t["key"] for t in self.get_tags()] - if key == "instance-type": + if filter_name == "instance-type": return self.instance_type return None class HostsBackend: - def __init__(self): - self.hosts = {} + def __init__(self) -> None: + self.hosts: Dict[str, Host] = {} def allocate_hosts( self, @@ -74,7 +76,7 @@ class HostsBackend: """ Pagination is not yet implemented """ - results = self.hosts.values() + results = list(self.hosts.values()) if host_ids: results = [r for r in results if r.id in host_ids] if filters: @@ -82,8 +84,13 @@ class HostsBackend: return results def modify_hosts( - self, host_ids, auto_placement, host_recovery, instance_type, instance_family - ): + self, + host_ids: List[str], + auto_placement: str, + host_recovery: str, + instance_type: str, + instance_family: str, + ) -> None: for _id in host_ids: host = self.hosts[_id] if auto_placement is not None: diff --git a/moto/ec2/models/spot_requests.py b/moto/ec2/models/spot_requests.py index 16ebe5eb0..3552e97bc 100644 --- a/moto/ec2/models/spot_requests.py +++ b/moto/ec2/models/spot_requests.py @@ -1,4 +1,5 @@ from collections import defaultdict +from typing import Any, Dict, List from moto.core.common_models import CloudFormationModel from moto.packages.boto.ec2.launchspecification import LaunchSpecification @@ -193,18 +194,18 @@ class SpotRequestBackend: class SpotFleetLaunchSpec(object): def __init__( self, - ebs_optimized, - group_set, - iam_instance_profile, - image_id, - instance_type, - key_name, - monitoring, - spot_price, - subnet_id, - tag_specifications, - user_data, - weighted_capacity, + ebs_optimized: Any, + group_set: List[str], + iam_instance_profile: Any, + image_id: str, + instance_type: str, + key_name: Any, + monitoring: Any, + spot_price: Any, + subnet_id: Any, + tag_specifications: Dict[str, Dict[str, str]], + user_data: Any, + weighted_capacity: float, ): self.ebs_optimized = ebs_optimized self.group_set = group_set diff --git a/moto/ec2/utils.py b/moto/ec2/utils.py index 6d26530a7..5e90c214b 100644 --- a/moto/ec2/utils.py +++ b/moto/ec2/utils.py @@ -7,7 +7,7 @@ from datetime import datetime from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa -from typing import Any, Dict, List +from typing import Any, Dict, List, TypeVar from moto.iam import iam_backends from moto.moto_api._internal import mock_random as random @@ -92,11 +92,11 @@ def random_security_group_rule_id(): return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group-rule"], size=17) -def random_fleet_id(): +def random_fleet_id() -> str: return f"fleet-{random_resource_id(size=8)}-{random_resource_id(size=4)}-{random_resource_id(size=4)}-{random_resource_id(size=4)}-{random_resource_id(size=12)}" -def random_flow_log_id(): +def random_flow_log_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["flow-logs"]) @@ -240,7 +240,7 @@ def random_public_ip() -> str: return f"54.214.{random.choice(range(255))}.{random.choice(range(255))}" -def random_dedicated_host_id(): +def random_dedicated_host_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["dedicated_host"], size=17) @@ -519,7 +519,12 @@ def is_filter_matching(obj, _filter, filter_value): return value in filter_value -def generic_filter(filters: Dict[str, Any], objects: List[Any]) -> List[Any]: +GENERIC_FILTER_TYPE = TypeVar("GENERIC_FILTER_TYPE") + + +def generic_filter( + filters: Dict[str, Any], objects: List[GENERIC_FILTER_TYPE] +) -> List[GENERIC_FILTER_TYPE]: if filters: for (_filter, _filter_value) in filters.items(): objects = [ @@ -781,7 +786,9 @@ def gen_moto_amis(described_images, drop_images_missing_keys=True): return result -def convert_tag_spec(tag_spec_set, tag_key="Tag"): +def convert_tag_spec( + tag_spec_set: List[Dict[str, Any]], tag_key: str = "Tag" +) -> Dict[str, Dict[str, str]]: # IN: [{"ResourceType": _type, "Tag": [{"Key": k, "Value": v}, ..]}] # (or) [{"ResourceType": _type, "Tags": [{"Key": k, "Value": v}, ..]}] <-- special cfn case # OUT: {_type: {k: v, ..}} diff --git a/setup.cfg b/setup.cfg index 1bdd2075c..345ae35ba 100644 --- a/setup.cfg +++ b/setup.cfg @@ -229,7 +229,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/moto_api +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/moto_api show_column_numbers=True show_error_codes = True disable_error_code=abstract