Techdebt: MyPy EC2 (f-h-models) (#5900)

This commit is contained in:
Bert Blommers 2023-02-04 11:31:16 -01:00 committed by GitHub
parent b41533d42d
commit 8915ef60f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 168 additions and 141 deletions

View File

@ -110,7 +110,7 @@ class InvalidSubnetIdError(EC2ClientError):
class InvalidFlowLogIdError(EC2ClientError): class InvalidFlowLogIdError(EC2ClientError):
def __init__(self, count, flow_log_ids): def __init__(self, count: int, flow_log_ids: str):
super().__init__( super().__init__(
"InvalidFlowLogId.NotFound", "InvalidFlowLogId.NotFound",
f"These flow log ids in the input list are not found: [TotalCount: {count}] {flow_log_ids}", f"These flow log ids in the input list are not found: [TotalCount: {count}] {flow_log_ids}",
@ -118,7 +118,7 @@ class InvalidFlowLogIdError(EC2ClientError):
class FlowLogAlreadyExists(EC2ClientError): class FlowLogAlreadyExists(EC2ClientError):
def __init__(self): def __init__(self) -> None:
super().__init__( super().__init__(
"FlowLogAlreadyExists", "FlowLogAlreadyExists",
"Error. There is an existing Flow Log with the same configuration and log destination.", "Error. There is an existing Flow Log with the same configuration and log destination.",
@ -405,7 +405,7 @@ class InvalidNextToken(EC2ClientError):
class InvalidDependantParameterError(EC2ClientError): class InvalidDependantParameterError(EC2ClientError):
def __init__(self, dependant_parameter, parameter, parameter_value): def __init__(self, dependant_parameter: str, parameter: str, parameter_value: str):
super().__init__( super().__init__(
"InvalidParameter", "InvalidParameter",
f"{dependant_parameter} can't be empty if {parameter} is {parameter_value}.", f"{dependant_parameter} can't be empty if {parameter} is {parameter_value}.",
@ -413,7 +413,7 @@ class InvalidDependantParameterError(EC2ClientError):
class InvalidDependantParameterTypeError(EC2ClientError): class InvalidDependantParameterTypeError(EC2ClientError):
def __init__(self, dependant_parameter, parameter_value, parameter): def __init__(self, dependant_parameter: str, parameter_value: str, parameter: str):
super().__init__( super().__init__(
"InvalidParameter", "InvalidParameter",
f"{dependant_parameter} type must be {parameter_value} if {parameter} is provided.", f"{dependant_parameter} type must be {parameter_value} if {parameter} is provided.",
@ -421,7 +421,7 @@ class InvalidDependantParameterTypeError(EC2ClientError):
class InvalidAggregationIntervalParameterError(EC2ClientError): class InvalidAggregationIntervalParameterError(EC2ClientError):
def __init__(self, parameter): def __init__(self, parameter: str):
super().__init__("InvalidParameter", f"Invalid {parameter}") super().__init__("InvalidParameter", f"Invalid {parameter}")

View File

@ -1,7 +1,8 @@
import datetime import datetime
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple
from moto.ec2.models.spot_requests import SpotFleetLaunchSpec from moto.ec2.models.spot_requests import SpotFleetLaunchSpec, SpotInstanceRequest
from .core import TaggedEC2Resource from .core import TaggedEC2Resource
from ..utils import ( from ..utils import (
random_fleet_id, random_fleet_id,
@ -12,19 +13,19 @@ from ..utils import (
class Fleet(TaggedEC2Resource): class Fleet(TaggedEC2Resource):
def __init__( def __init__(
self, self,
ec2_backend, ec2_backend: Any,
fleet_id, fleet_id: str,
on_demand_options, on_demand_options: Dict[str, Any],
spot_options, spot_options: Dict[str, Any],
target_capacity_specification, target_capacity_specification: Dict[str, Any],
launch_template_configs, launch_template_configs: List[Dict[str, Any]],
excess_capacity_termination_policy, excess_capacity_termination_policy: str,
replace_unhealthy_instances, replace_unhealthy_instances: bool,
terminate_instances_with_expiration, terminate_instances_with_expiration: bool,
fleet_type, fleet_type: str,
valid_from, valid_from: str,
valid_until, valid_until: str,
tag_specifications, tag_specifications: List[Dict[str, Any]],
): ):
self.ec2_backend = ec2_backend self.ec2_backend = ec2_backend
@ -50,18 +51,18 @@ class Fleet(TaggedEC2Resource):
self.fulfilled_on_demand_capacity = 0.0 self.fulfilled_on_demand_capacity = 0.0
self.fulfilled_spot_capacity = 0.0 self.fulfilled_spot_capacity = 0.0
self.launch_specs = [] self.launch_specs: List[SpotFleetLaunchSpec] = []
launch_specs_from_config = [] launch_specs_from_config: List[Dict[str, Any]] = []
for config in launch_template_configs or []: for config in launch_template_configs or []:
spec = config["LaunchTemplateSpecification"] launch_spec = config["LaunchTemplateSpecification"]
if "LaunchTemplateId" in spec: if "LaunchTemplateId" in launch_spec:
launch_template = self.ec2_backend.get_launch_template( launch_template = self.ec2_backend.get_launch_template(
template_id=spec["LaunchTemplateId"] template_id=launch_spec["LaunchTemplateId"]
) )
elif "LaunchTemplateName" in spec: elif "LaunchTemplateName" in launch_spec:
launch_template = self.ec2_backend.get_launch_template_by_name( launch_template = self.ec2_backend.get_launch_template_by_name(
name=spec["LaunchTemplateName"] name=launch_spec["LaunchTemplateName"]
) )
else: else:
continue continue
@ -92,14 +93,14 @@ class Fleet(TaggedEC2Resource):
) )
) )
self.spot_requests = [] self.spot_requests: List[SpotInstanceRequest] = []
self.on_demand_instances = [] self.on_demand_instances: List[Dict[str, Any]] = []
default_capacity = ( default_capacity = (
target_capacity_specification.get("DefaultTargetCapacityType") target_capacity_specification.get("DefaultTargetCapacityType")
or "on-demand" or "on-demand"
) )
self.target_capacity = int( self.target_capacity = int(
target_capacity_specification.get("TotalTargetCapacity") target_capacity_specification.get("TotalTargetCapacity") # type: ignore[arg-type]
) )
self.spot_target_capacity = int( self.spot_target_capacity = int(
target_capacity_specification.get("SpotTargetCapacity", 0) target_capacity_specification.get("SpotTargetCapacity", 0)
@ -122,13 +123,13 @@ class Fleet(TaggedEC2Resource):
self.create_spot_requests(remaining_capacity) self.create_spot_requests(remaining_capacity)
@property @property
def physical_resource_id(self): def physical_resource_id(self) -> str:
return self.id return self.id
def create_spot_requests(self, weight_to_add): def create_spot_requests(self, weight_to_add: float) -> List[SpotInstanceRequest]:
weight_map, added_weight = self.get_launch_spec_counts(weight_to_add) weight_map, added_weight = self.get_launch_spec_counts(weight_to_add)
for launch_spec, count in weight_map.items(): for launch_spec, count in weight_map.items():
requests = self.ec2_backend.request_spot_instances( requests = self.ec2_backend.request_spot_instances( # type: ignore[attr-defined]
price=launch_spec.spot_price, price=launch_spec.spot_price,
image_id=launch_spec.image_id, image_id=launch_spec.image_id,
count=count, count=count,
@ -153,10 +154,10 @@ class Fleet(TaggedEC2Resource):
self.fulfilled_capacity += added_weight self.fulfilled_capacity += added_weight
return self.spot_requests return self.spot_requests
def create_on_demand_requests(self, weight_to_add): def create_on_demand_requests(self, weight_to_add: float) -> List[Dict[str, Any]]:
weight_map, added_weight = self.get_launch_spec_counts(weight_to_add) weight_map, added_weight = self.get_launch_spec_counts(weight_to_add)
for launch_spec, count in weight_map.items(): for launch_spec, count in weight_map.items():
reservation = self.ec2_backend.add_instances( reservation = self.ec2_backend.add_instances( # type: ignore[attr-defined]
image_id=launch_spec.image_id, image_id=launch_spec.image_id,
count=count, count=count,
instance_type=launch_spec.instance_type, instance_type=launch_spec.instance_type,
@ -184,10 +185,12 @@ class Fleet(TaggedEC2Resource):
self.fulfilled_capacity += added_weight self.fulfilled_capacity += added_weight
return self.on_demand_instances return self.on_demand_instances
def get_launch_spec_counts(self, weight_to_add): def get_launch_spec_counts(
weight_map = defaultdict(int) self, weight_to_add: float
) -> Tuple[Dict[SpotFleetLaunchSpec, int], float]:
weight_map: Dict[SpotFleetLaunchSpec, int] = defaultdict(int)
weight_so_far = 0 weight_so_far = 0.0
if ( if (
self.spot_options self.spot_options
and self.spot_options["AllocationStrategy"] == "diversified" and self.spot_options["AllocationStrategy"] == "diversified"
@ -217,15 +220,15 @@ class Fleet(TaggedEC2Resource):
return weight_map, weight_so_far return weight_map, weight_so_far
def terminate_instances(self): def terminate_instances(self) -> None:
instance_ids = [] instance_ids = []
new_fulfilled_capacity = self.fulfilled_capacity new_fulfilled_capacity = self.fulfilled_capacity
for req in self.spot_requests + self.on_demand_instances: for req in self.spot_requests + self.on_demand_instances:
instance = None instance = None
try: try:
instance = req.instance instance = req.instance # type: ignore
except AttributeError: except AttributeError:
instance = req["instance"] instance = req["instance"] # type: ignore[index]
if instance.state == "terminated": if instance.state == "terminated":
continue continue
@ -249,23 +252,23 @@ class Fleet(TaggedEC2Resource):
class FleetsBackend: class FleetsBackend:
def __init__(self): def __init__(self) -> None:
self.fleets = {} self.fleets: Dict[str, Fleet] = {}
def create_fleet( def create_fleet(
self, self,
on_demand_options, on_demand_options: Dict[str, Any],
spot_options, spot_options: Dict[str, Any],
target_capacity_specification, target_capacity_specification: Dict[str, Any],
launch_template_configs, launch_template_configs: List[Dict[str, Any]],
excess_capacity_termination_policy, excess_capacity_termination_policy: str,
replace_unhealthy_instances, replace_unhealthy_instances: bool,
terminate_instances_with_expiration, terminate_instances_with_expiration: bool,
fleet_type, fleet_type: str,
valid_from, valid_from: str,
valid_until, valid_until: str,
tag_specifications, tag_specifications: List[Dict[str, Any]],
): ) -> Fleet:
fleet_id = random_fleet_id() fleet_id = random_fleet_id()
fleet = Fleet( fleet = Fleet(
@ -286,24 +289,26 @@ class FleetsBackend:
self.fleets[fleet_id] = fleet self.fleets[fleet_id] = fleet
return fleet return fleet
def get_fleet(self, fleet_id): def get_fleet(self, fleet_id: str) -> Optional[Fleet]:
return self.fleets.get(fleet_id) return self.fleets.get(fleet_id)
def describe_fleet_instances(self, fleet_id): def describe_fleet_instances(self, fleet_id: str) -> List[Any]:
fleet = self.get_fleet(fleet_id) fleet = self.get_fleet(fleet_id)
if not fleet: if not fleet:
return [] return []
return fleet.spot_requests + fleet.on_demand_instances return fleet.spot_requests + fleet.on_demand_instances
def describe_fleets(self, fleet_ids): def describe_fleets(self, fleet_ids: Optional[List[str]]) -> List[Fleet]:
fleets = self.fleets.values() fleets = list(self.fleets.values())
if fleet_ids: if fleet_ids:
fleets = [fleet for fleet in fleets if fleet.id in fleet_ids] fleets = [fleet for fleet in fleets if fleet.id in fleet_ids]
return fleets return fleets
def delete_fleets(self, fleet_ids, terminate_instances): def delete_fleets(
self, fleet_ids: List[str], terminate_instances: bool
) -> List[Fleet]:
fleets = [] fleets = []
for fleet_id in fleet_ids: for fleet_id in fleet_ids:
fleet = self.fleets[fleet_id] fleet = self.fleets[fleet_id]

View File

@ -1,5 +1,5 @@
import itertools import itertools
from collections import defaultdict from typing import Any, Dict, List, Optional, Tuple
from moto.core import CloudFormationModel from moto.core import CloudFormationModel
from ..exceptions import ( from ..exceptions import (
FlowLogAlreadyExists, FlowLogAlreadyExists,
@ -19,18 +19,18 @@ from ..utils import (
class FlowLogs(TaggedEC2Resource, CloudFormationModel): class FlowLogs(TaggedEC2Resource, CloudFormationModel):
def __init__( def __init__(
self, self,
ec2_backend, ec2_backend: Any,
flow_log_id, flow_log_id: str,
resource_id, resource_id: str,
traffic_type, traffic_type: str,
log_destination, log_destination: str,
log_group_name, log_group_name: str,
deliver_logs_permission_arn, deliver_logs_permission_arn: str,
max_aggregation_interval, max_aggregation_interval: str,
log_destination_type, log_destination_type: str,
log_format, log_format: str,
deliver_logs_status="SUCCESS", deliver_logs_status: str = "SUCCESS",
deliver_logs_error_message=None, deliver_logs_error_message: Optional[str] = None,
): ):
self.ec2_backend = ec2_backend self.ec2_backend = ec2_backend
self.id = flow_log_id self.id = flow_log_id
@ -48,18 +48,23 @@ class FlowLogs(TaggedEC2Resource, CloudFormationModel):
self.created_at = utc_date_and_time() self.created_at = utc_date_and_time()
@staticmethod @staticmethod
def cloudformation_name_type(): def cloudformation_name_type() -> str:
return None return ""
@staticmethod @staticmethod
def cloudformation_type(): def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-flowlog.html # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-flowlog.html
return "AWS::EC2::FlowLog" return "AWS::EC2::FlowLog"
@classmethod @classmethod
def create_from_cloudformation_json( def create_from_cloudformation_json( # type: ignore[misc]
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs cls,
): resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "FlowLogs":
from ..models import ec2_backends from ..models import ec2_backends
properties = cloudformation_json["Properties"] properties = cloudformation_json["Properties"]
@ -94,10 +99,12 @@ class FlowLogs(TaggedEC2Resource, CloudFormationModel):
return flow_log[0] return flow_log[0]
@property @property
def physical_resource_id(self): def physical_resource_id(self) -> str:
return self.id return self.id
def get_filter_value(self, filter_name): def get_filter_value(
self, filter_name: str, method_name: Optional[str] = None
) -> Any:
""" """
API Version 2016-11-15 defines the following filters for DescribeFlowLogs: API Version 2016-11-15 defines the following filters for DescribeFlowLogs:
@ -129,17 +136,17 @@ class FlowLogs(TaggedEC2Resource, CloudFormationModel):
class FlowLogsBackend: class FlowLogsBackend:
def __init__(self): def __init__(self) -> None:
self.flow_logs = defaultdict(dict) self.flow_logs: Dict[str, FlowLogs] = {}
def _validate_request( def _validate_request(
self, self,
log_group_name, log_group_name: str,
log_destination, log_destination: str,
log_destination_type, log_destination_type: str,
max_aggregation_interval, max_aggregation_interval: str,
deliver_logs_permission_arn, deliver_logs_permission_arn: str,
): ) -> None:
if log_group_name is None and log_destination is None: if log_group_name is None and log_destination is None:
raise InvalidDependantParameterError( raise InvalidDependantParameterError(
"LogDestination", "LogGroupName", "not provided" "LogDestination", "LogGroupName", "not provided"
@ -163,16 +170,16 @@ class FlowLogsBackend:
def create_flow_logs( def create_flow_logs(
self, self,
resource_type, resource_type: str,
resource_ids, resource_ids: List[str],
traffic_type, traffic_type: str,
deliver_logs_permission_arn, deliver_logs_permission_arn: str,
log_destination_type, log_destination_type: str,
log_destination, log_destination: str,
log_group_name, log_group_name: str,
log_format, log_format: str,
max_aggregation_interval, max_aggregation_interval: str,
): ) -> Tuple[List[FlowLogs], List[Any]]:
# Guess it's best to put it here due to possible # Guess it's best to put it here due to possible
# lack of them in the CloudFormation template # lack of them in the CloudFormation template
max_aggregation_interval = ( max_aggregation_interval = (
@ -205,13 +212,13 @@ class FlowLogsBackend:
flow_log_id = random_flow_log_id() flow_log_id = random_flow_log_id()
if resource_type == "VPC": if resource_type == "VPC":
# Validate VPCs exist # Validate VPCs exist
self.get_vpc(resource_id) self.get_vpc(resource_id) # type: ignore[attr-defined]
elif resource_type == "Subnet": elif resource_type == "Subnet":
# Validate Subnets exist # Validate Subnets exist
self.get_subnet(resource_id) self.get_subnet(resource_id) # type: ignore[attr-defined]
elif resource_type == "NetworkInterface": elif resource_type == "NetworkInterface":
# Validate NetworkInterfaces exist # Validate NetworkInterfaces exist
self.get_network_interface(resource_id) self.get_network_interface(resource_id) # type: ignore[attr-defined]
if log_destination_type == "s3": if log_destination_type == "s3":
from moto.s3.models import s3_backends from moto.s3.models import s3_backends
@ -219,7 +226,7 @@ class FlowLogsBackend:
arn = log_destination.split(":", 5)[5] arn = log_destination.split(":", 5)[5]
try: try:
s3_backends[self.account_id]["global"].get_bucket(arn) s3_backends[self.account_id]["global"].get_bucket(arn) # type: ignore[attr-defined]
except MissingBucket: except MissingBucket:
# Instead of creating FlowLog report # Instead of creating FlowLog report
# the unsuccessful status for the # the unsuccessful status for the
@ -238,9 +245,8 @@ class FlowLogsBackend:
try: try:
# Need something easy to check the group exists. # Need something easy to check the group exists.
# The list_tags_log_group seems to do the trick. # The list_tags_log_group seems to do the trick.
logs_backends[self.account_id][ logs = logs_backends[self.account_id][self.region_name] # type: ignore[attr-defined]
self.region_name logs.list_tags_log_group(log_group_name)
].list_tags_log_group(log_group_name)
except ResourceNotFoundException: except ResourceNotFoundException:
deliver_logs_status = "FAILED" deliver_logs_status = "FAILED"
deliver_logs_error_message = "Access error" deliver_logs_error_message = "Access error"
@ -274,15 +280,17 @@ class FlowLogsBackend:
return flow_logs_set, unsuccessful return flow_logs_set, unsuccessful
def describe_flow_logs(self, flow_log_ids=None, filters=None): def describe_flow_logs(
matches = itertools.chain([i for i in self.flow_logs.values()]) self, flow_log_ids: Optional[List[str]] = None, filters: Any = None
) -> List[FlowLogs]:
matches = list(itertools.chain([i for i in self.flow_logs.values()]))
if flow_log_ids: if flow_log_ids:
matches = [flow_log for flow_log in matches if flow_log.id in flow_log_ids] matches = [flow_log for flow_log in matches if flow_log.id in flow_log_ids]
if filters: if filters:
matches = generic_filter(filters, matches) matches = generic_filter(filters, matches)
return matches return matches
def delete_flow_logs(self, flow_log_ids): def delete_flow_logs(self, flow_log_ids: List[str]) -> None:
non_existing = [] non_existing = []
for flow_log in flow_log_ids: for flow_log in flow_log_ids:
if flow_log in self.flow_logs: if flow_log in self.flow_logs:
@ -294,4 +302,3 @@ class FlowLogsBackend:
raise InvalidFlowLogIdError( raise InvalidFlowLogIdError(
len(flow_log_ids), " ".join(x for x in flow_log_ids) len(flow_log_ids), " ".join(x for x in flow_log_ids)
) )
return True

View File

@ -1,6 +1,6 @@
from .core import TaggedEC2Resource from .core import TaggedEC2Resource
from ..utils import generic_filter, random_dedicated_host_id from ..utils import generic_filter, random_dedicated_host_id
from typing import Any, Dict, List from typing import Any, Dict, List, Optional
class Host(TaggedEC2Resource): class Host(TaggedEC2Resource):
@ -17,29 +17,31 @@ class Host(TaggedEC2Resource):
self.state = "available" self.state = "available"
self.host_recovery = host_recovery or "off" self.host_recovery = host_recovery or "off"
self.zone = zone self.zone = zone
self.instance_type = instance_type self.instance_type: Optional[str] = instance_type
self.instance_family = instance_family self.instance_family: Optional[str] = instance_family
self.auto_placement = auto_placement or "on" self.auto_placement = auto_placement or "on"
self.ec2_backend = backend self.ec2_backend = backend
def release(self) -> None: def release(self) -> None:
self.state = "released" self.state = "released"
def get_filter_value(self, key): def get_filter_value(
if key == "availability-zone": self, filter_name: str, method_name: Optional[str] = None
) -> Any:
if filter_name == "availability-zone":
return self.zone return self.zone
if key == "state": if filter_name == "state":
return self.state return self.state
if key == "tag-key": if filter_name == "tag-key":
return [t["key"] for t in self.get_tags()] return [t["key"] for t in self.get_tags()]
if key == "instance-type": if filter_name == "instance-type":
return self.instance_type return self.instance_type
return None return None
class HostsBackend: class HostsBackend:
def __init__(self): def __init__(self) -> None:
self.hosts = {} self.hosts: Dict[str, Host] = {}
def allocate_hosts( def allocate_hosts(
self, self,
@ -74,7 +76,7 @@ class HostsBackend:
""" """
Pagination is not yet implemented Pagination is not yet implemented
""" """
results = self.hosts.values() results = list(self.hosts.values())
if host_ids: if host_ids:
results = [r for r in results if r.id in host_ids] results = [r for r in results if r.id in host_ids]
if filters: if filters:
@ -82,8 +84,13 @@ class HostsBackend:
return results return results
def modify_hosts( def modify_hosts(
self, host_ids, auto_placement, host_recovery, instance_type, instance_family self,
): host_ids: List[str],
auto_placement: str,
host_recovery: str,
instance_type: str,
instance_family: str,
) -> None:
for _id in host_ids: for _id in host_ids:
host = self.hosts[_id] host = self.hosts[_id]
if auto_placement is not None: if auto_placement is not None:

View File

@ -1,4 +1,5 @@
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, List
from moto.core.common_models import CloudFormationModel from moto.core.common_models import CloudFormationModel
from moto.packages.boto.ec2.launchspecification import LaunchSpecification from moto.packages.boto.ec2.launchspecification import LaunchSpecification
@ -193,18 +194,18 @@ class SpotRequestBackend:
class SpotFleetLaunchSpec(object): class SpotFleetLaunchSpec(object):
def __init__( def __init__(
self, self,
ebs_optimized, ebs_optimized: Any,
group_set, group_set: List[str],
iam_instance_profile, iam_instance_profile: Any,
image_id, image_id: str,
instance_type, instance_type: str,
key_name, key_name: Any,
monitoring, monitoring: Any,
spot_price, spot_price: Any,
subnet_id, subnet_id: Any,
tag_specifications, tag_specifications: Dict[str, Dict[str, str]],
user_data, user_data: Any,
weighted_capacity, weighted_capacity: float,
): ):
self.ebs_optimized = ebs_optimized self.ebs_optimized = ebs_optimized
self.group_set = group_set self.group_set = group_set

View File

@ -7,7 +7,7 @@ from datetime import datetime
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import rsa
from typing import Any, Dict, List from typing import Any, Dict, List, TypeVar
from moto.iam import iam_backends from moto.iam import iam_backends
from moto.moto_api._internal import mock_random as random from moto.moto_api._internal import mock_random as random
@ -92,11 +92,11 @@ def random_security_group_rule_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group-rule"], size=17) return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group-rule"], size=17)
def random_fleet_id(): def random_fleet_id() -> str:
return f"fleet-{random_resource_id(size=8)}-{random_resource_id(size=4)}-{random_resource_id(size=4)}-{random_resource_id(size=4)}-{random_resource_id(size=12)}" return f"fleet-{random_resource_id(size=8)}-{random_resource_id(size=4)}-{random_resource_id(size=4)}-{random_resource_id(size=4)}-{random_resource_id(size=12)}"
def random_flow_log_id(): def random_flow_log_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["flow-logs"]) return random_id(prefix=EC2_RESOURCE_TO_PREFIX["flow-logs"])
@ -240,7 +240,7 @@ def random_public_ip() -> str:
return f"54.214.{random.choice(range(255))}.{random.choice(range(255))}" return f"54.214.{random.choice(range(255))}.{random.choice(range(255))}"
def random_dedicated_host_id(): def random_dedicated_host_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["dedicated_host"], size=17) return random_id(prefix=EC2_RESOURCE_TO_PREFIX["dedicated_host"], size=17)
@ -519,7 +519,12 @@ def is_filter_matching(obj, _filter, filter_value):
return value in filter_value return value in filter_value
def generic_filter(filters: Dict[str, Any], objects: List[Any]) -> List[Any]: GENERIC_FILTER_TYPE = TypeVar("GENERIC_FILTER_TYPE")
def generic_filter(
filters: Dict[str, Any], objects: List[GENERIC_FILTER_TYPE]
) -> List[GENERIC_FILTER_TYPE]:
if filters: if filters:
for (_filter, _filter_value) in filters.items(): for (_filter, _filter_value) in filters.items():
objects = [ objects = [
@ -781,7 +786,9 @@ def gen_moto_amis(described_images, drop_images_missing_keys=True):
return result return result
def convert_tag_spec(tag_spec_set, tag_key="Tag"): def convert_tag_spec(
tag_spec_set: List[Dict[str, Any]], tag_key: str = "Tag"
) -> Dict[str, Dict[str, str]]:
# IN: [{"ResourceType": _type, "Tag": [{"Key": k, "Value": v}, ..]}] # IN: [{"ResourceType": _type, "Tag": [{"Key": k, "Value": v}, ..]}]
# (or) [{"ResourceType": _type, "Tags": [{"Key": k, "Value": v}, ..]}] <-- special cfn case # (or) [{"ResourceType": _type, "Tags": [{"Key": k, "Value": v}, ..]}] <-- special cfn case
# OUT: {_type: {k: v, ..}} # OUT: {_type: {k: v, ..}}

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy] [mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/moto_api files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/moto_api
show_column_numbers=True show_column_numbers=True
show_error_codes = True show_error_codes = True
disable_error_code=abstract disable_error_code=abstract