Techdebt: MyPy Autoscaling (#5581)

This commit is contained in:
Bert Blommers 2022-10-19 20:56:24 +00:00 committed by GitHub
parent 2c0adaa932
commit 62f93c7ed0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 544 additions and 426 deletions

View File

@ -8,7 +8,7 @@ class AutoscalingClientError(RESTError):
class ResourceContentionError(RESTError):
code = 500
def __init__(self):
def __init__(self) -> None:
super().__init__(
"ResourceContentionError",
"You already have a pending update to an Auto Scaling resource (for example, a group, instance, or load balancer).",
@ -16,12 +16,12 @@ class ResourceContentionError(RESTError):
class InvalidInstanceError(AutoscalingClientError):
def __init__(self, instance_id):
def __init__(self, instance_id: str):
super().__init__(
"ValidationError", "Instance [{0}] is invalid.".format(instance_id)
)
class ValidationError(AutoscalingClientError):
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ValidationError", message)

File diff suppressed because it is too large Load Diff

View File

@ -3,18 +3,18 @@ import datetime
from moto.core.responses import BaseResponse
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.utilities.aws_headers import amz_crc32, amzn_request_id
from .models import autoscaling_backends
from .models import autoscaling_backends, AutoScalingBackend
class AutoScalingResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="autoscaling")
@property
def autoscaling_backend(self):
def autoscaling_backend(self) -> AutoScalingBackend:
return autoscaling_backends[self.current_account][self.region]
def create_launch_configuration(self):
def create_launch_configuration(self) -> str:
instance_monitoring_string = self._get_param("InstanceMonitoring.Enabled")
if instance_monitoring_string == "true":
instance_monitoring = True
@ -44,7 +44,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(CREATE_LAUNCH_CONFIGURATION_TEMPLATE)
return template.render()
def describe_launch_configurations(self):
def describe_launch_configurations(self) -> str:
names = self._get_multi_param("LaunchConfigurationNames.member")
all_launch_configurations = (
self.autoscaling_backend.describe_launch_configurations(names)
@ -55,9 +55,8 @@ class AutoScalingResponse(BaseResponse):
start = all_names.index(marker) + 1
else:
start = 0
max_records = self._get_int_param(
"MaxRecords", 50
) # the default is 100, but using 50 to make testing easier
# the default is 100, but using 50 to make testing easier
max_records = self._get_int_param("MaxRecords") or 50
launch_configurations_resp = all_launch_configurations[
start : start + max_records
]
@ -70,13 +69,13 @@ class AutoScalingResponse(BaseResponse):
launch_configurations=launch_configurations_resp, next_token=next_token
)
def delete_launch_configuration(self):
launch_configurations_name = self.querystring.get("LaunchConfigurationName")[0]
def delete_launch_configuration(self) -> str:
launch_configurations_name = self.querystring.get("LaunchConfigurationName")[0] # type: ignore[index]
self.autoscaling_backend.delete_launch_configuration(launch_configurations_name)
template = self.response_template(DELETE_LAUNCH_CONFIGURATION_TEMPLATE)
return template.render()
def create_auto_scaling_group(self):
def create_auto_scaling_group(self) -> str:
params = self._get_params()
self.autoscaling_backend.create_auto_scaling_group(
name=self._get_param("AutoScalingGroupName"),
@ -97,7 +96,7 @@ class AutoScalingResponse(BaseResponse):
placement_group=self._get_param("PlacementGroup"),
termination_policies=self._get_multi_param("TerminationPolicies.member"),
tags=self._get_list_prefix("Tags.member"),
capacity_rebalance=self._get_bool_param("CapacityRebalance"),
capacity_rebalance=self._get_bool_param("CapacityRebalance", False),
new_instances_protected_from_scale_in=self._get_bool_param(
"NewInstancesProtectedFromScaleIn", False
),
@ -105,7 +104,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(CREATE_AUTOSCALING_GROUP_TEMPLATE)
return template.render()
def put_scheduled_update_group_action(self):
def put_scheduled_update_group_action(self) -> str:
self.autoscaling_backend.put_scheduled_update_group_action(
name=self._get_param("AutoScalingGroupName"),
desired_capacity=self._get_int_param("DesiredCapacity"),
@ -119,7 +118,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(PUT_SCHEDULED_UPDATE_GROUP_ACTION_TEMPLATE)
return template.render()
def describe_scheduled_actions(self):
def describe_scheduled_actions(self) -> str:
scheduled_actions = self.autoscaling_backend.describe_scheduled_actions(
autoscaling_group_name=self._get_param("AutoScalingGroupName"),
scheduled_action_names=self._get_multi_param("ScheduledActionNames.member"),
@ -127,7 +126,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(DESCRIBE_SCHEDULED_ACTIONS)
return template.render(scheduled_actions=scheduled_actions)
def delete_scheduled_action(self):
def delete_scheduled_action(self) -> str:
auto_scaling_group_name = self._get_param("AutoScalingGroupName")
scheduled_action_name = self._get_param("ScheduledActionName")
self.autoscaling_backend.delete_scheduled_action(
@ -137,13 +136,13 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(DELETE_SCHEDULED_ACTION_TEMPLATE)
return template.render()
def describe_scaling_activities(self):
def describe_scaling_activities(self) -> str:
template = self.response_template(DESCRIBE_SCALING_ACTIVITIES_TEMPLATE)
return template.render()
@amz_crc32
@amzn_request_id
def attach_instances(self):
def attach_instances(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
instance_ids = self._get_multi_param("InstanceIds.member")
self.autoscaling_backend.attach_instances(group_name, instance_ids)
@ -152,7 +151,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def set_instance_health(self):
def set_instance_health(self) -> str:
instance_id = self._get_param("InstanceId")
health_status = self._get_param("HealthStatus")
if health_status not in ["Healthy", "Unhealthy"]:
@ -163,7 +162,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def detach_instances(self):
def detach_instances(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
instance_ids = self._get_multi_param("InstanceIds.member")
should_decrement_string = self._get_param("ShouldDecrementDesiredCapacity")
@ -179,7 +178,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def attach_load_balancer_target_groups(self):
def attach_load_balancer_target_groups(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
target_group_arns = self._get_multi_param("TargetGroupARNs.member")
@ -191,7 +190,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def describe_load_balancer_target_groups(self):
def describe_load_balancer_target_groups(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
target_group_arns = (
self.autoscaling_backend.describe_load_balancer_target_groups(group_name)
@ -201,7 +200,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def detach_load_balancer_target_groups(self):
def detach_load_balancer_target_groups(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
target_group_arns = self._get_multi_param("TargetGroupARNs.member")
@ -211,7 +210,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(DETACH_LOAD_BALANCER_TARGET_GROUPS_TEMPLATE)
return template.render()
def describe_auto_scaling_groups(self):
def describe_auto_scaling_groups(self) -> str:
names = self._get_multi_param("AutoScalingGroupNames.member")
token = self._get_param("NextToken")
all_groups = self.autoscaling_backend.describe_auto_scaling_groups(names)
@ -230,7 +229,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(DESCRIBE_AUTOSCALING_GROUPS_TEMPLATE)
return template.render(groups=groups, next_token=next_token)
def update_auto_scaling_group(self):
def update_auto_scaling_group(self) -> str:
self.autoscaling_backend.update_auto_scaling_group(
name=self._get_param("AutoScalingGroupName"),
availability_zones=self._get_multi_param("AvailabilityZones.member"),
@ -249,41 +248,41 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(UPDATE_AUTOSCALING_GROUP_TEMPLATE)
return template.render()
def delete_auto_scaling_group(self):
def delete_auto_scaling_group(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
self.autoscaling_backend.delete_auto_scaling_group(group_name)
template = self.response_template(DELETE_AUTOSCALING_GROUP_TEMPLATE)
return template.render()
def set_desired_capacity(self):
def set_desired_capacity(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
desired_capacity = self._get_int_param("DesiredCapacity")
self.autoscaling_backend.set_desired_capacity(group_name, desired_capacity)
template = self.response_template(SET_DESIRED_CAPACITY_TEMPLATE)
return template.render()
def create_or_update_tags(self):
def create_or_update_tags(self) -> str:
tags = self._get_list_prefix("Tags.member")
self.autoscaling_backend.create_or_update_tags(tags)
template = self.response_template(UPDATE_AUTOSCALING_GROUP_TEMPLATE)
return template.render()
def delete_tags(self):
def delete_tags(self) -> str:
tags = self._get_list_prefix("Tags.member")
self.autoscaling_backend.delete_tags(tags)
template = self.response_template(UPDATE_AUTOSCALING_GROUP_TEMPLATE)
return template.render()
def describe_auto_scaling_instances(self):
def describe_auto_scaling_instances(self) -> str:
instance_states = self.autoscaling_backend.describe_auto_scaling_instances(
instance_ids=self._get_multi_param("InstanceIds.member")
)
template = self.response_template(DESCRIBE_AUTOSCALING_INSTANCES_TEMPLATE)
return template.render(instance_states=instance_states)
def put_lifecycle_hook(self):
def put_lifecycle_hook(self) -> str:
lifecycle_hook = self.autoscaling_backend.create_lifecycle_hook(
name=self._get_param("LifecycleHookName"),
as_name=self._get_param("AutoScalingGroupName"),
@ -294,7 +293,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(CREATE_LIFECYLE_HOOK_TEMPLATE)
return template.render(lifecycle_hook=lifecycle_hook)
def describe_lifecycle_hooks(self):
def describe_lifecycle_hooks(self) -> str:
lifecycle_hooks = self.autoscaling_backend.describe_lifecycle_hooks(
as_name=self._get_param("AutoScalingGroupName"),
lifecycle_hook_names=self._get_multi_param("LifecycleHookNames.member"),
@ -302,14 +301,14 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(DESCRIBE_LIFECYCLE_HOOKS_TEMPLATE)
return template.render(lifecycle_hooks=lifecycle_hooks)
def delete_lifecycle_hook(self):
def delete_lifecycle_hook(self) -> str:
as_name = self._get_param("AutoScalingGroupName")
name = self._get_param("LifecycleHookName")
self.autoscaling_backend.delete_lifecycle_hook(as_name, name)
template = self.response_template(DELETE_LIFECYCLE_HOOK_TEMPLATE)
return template.render()
def put_scaling_policy(self):
def put_scaling_policy(self) -> str:
params = self._get_params()
policy = self.autoscaling_backend.put_scaling_policy(
name=params.get("PolicyName"),
@ -330,7 +329,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(CREATE_SCALING_POLICY_TEMPLATE)
return template.render(policy=policy)
def describe_policies(self):
def describe_policies(self) -> str:
policies = self.autoscaling_backend.describe_policies(
autoscaling_group_name=self._get_param("AutoScalingGroupName"),
policy_names=self._get_multi_param("PolicyNames.member"),
@ -339,13 +338,13 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(DESCRIBE_SCALING_POLICIES_TEMPLATE)
return template.render(policies=policies)
def delete_policy(self):
def delete_policy(self) -> str:
group_name = self._get_param("PolicyName")
self.autoscaling_backend.delete_policy(group_name)
template = self.response_template(DELETE_POLICY_TEMPLATE)
return template.render()
def execute_policy(self):
def execute_policy(self) -> str:
group_name = self._get_param("PolicyName")
self.autoscaling_backend.execute_policy(group_name)
template = self.response_template(EXECUTE_POLICY_TEMPLATE)
@ -353,7 +352,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def attach_load_balancers(self):
def attach_load_balancers(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
load_balancer_names = self._get_multi_param("LoadBalancerNames.member")
self.autoscaling_backend.attach_load_balancers(group_name, load_balancer_names)
@ -362,7 +361,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def describe_load_balancers(self):
def describe_load_balancers(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
load_balancers = self.autoscaling_backend.describe_load_balancers(group_name)
template = self.response_template(DESCRIBE_LOAD_BALANCERS_TEMPLATE)
@ -370,7 +369,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def detach_load_balancers(self):
def detach_load_balancers(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
load_balancer_names = self._get_multi_param("LoadBalancerNames.member")
self.autoscaling_backend.detach_load_balancers(group_name, load_balancer_names)
@ -379,7 +378,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def enter_standby(self):
def enter_standby(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
instance_ids = self._get_multi_param("InstanceIds.member")
should_decrement_string = self._get_param("ShouldDecrementDesiredCapacity")
@ -405,7 +404,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def exit_standby(self):
def exit_standby(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
instance_ids = self._get_multi_param("InstanceIds.member")
(
@ -421,7 +420,7 @@ class AutoScalingResponse(BaseResponse):
timestamp=iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()),
)
def suspend_processes(self):
def suspend_processes(self) -> str:
autoscaling_group_name = self._get_param("AutoScalingGroupName")
scaling_processes = self._get_multi_param("ScalingProcesses.member")
self.autoscaling_backend.suspend_processes(
@ -430,7 +429,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(SUSPEND_PROCESSES_TEMPLATE)
return template.render()
def resume_processes(self):
def resume_processes(self) -> str:
autoscaling_group_name = self._get_param("AutoScalingGroupName")
scaling_processes = self._get_multi_param("ScalingProcesses.member")
self.autoscaling_backend.resume_processes(
@ -439,7 +438,7 @@ class AutoScalingResponse(BaseResponse):
template = self.response_template(RESUME_PROCESSES_TEMPLATE)
return template.render()
def set_instance_protection(self):
def set_instance_protection(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
instance_ids = self._get_multi_param("InstanceIds.member")
protected_from_scale_in = self._get_bool_param("ProtectedFromScaleIn")
@ -451,7 +450,7 @@ class AutoScalingResponse(BaseResponse):
@amz_crc32
@amzn_request_id
def terminate_instance_in_auto_scaling_group(self):
def terminate_instance_in_auto_scaling_group(self) -> str:
instance_id = self._get_param("InstanceId")
should_decrement_string = self._get_param("ShouldDecrementDesiredCapacity")
if should_decrement_string == "true":
@ -472,13 +471,13 @@ class AutoScalingResponse(BaseResponse):
timestamp=iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()),
)
def describe_tags(self):
def describe_tags(self) -> str:
filters = self._get_params().get("Filters", [])
tags = self.autoscaling_backend.describe_tags(filters=filters)
template = self.response_template(DESCRIBE_TAGS_TEMPLATE)
return template.render(tags=tags, next_token=None)
def enable_metrics_collection(self):
def enable_metrics_collection(self) -> str:
group_name = self._get_param("AutoScalingGroupName")
metrics = self._get_params().get("Metrics")
self.autoscaling_backend.enable_metrics_collection(group_name, metrics)

View File

@ -50,7 +50,9 @@ class RESTError(HTTPException):
"error": ERROR_RESPONSE,
}
def __init__(self, error_type, message, template="error", **kwargs):
def __init__(
self, error_type: str, message: str, template: str = "error", **kwargs: Any
):
super().__init__()
self.error_type = error_type
self.message = message

View File

@ -14,8 +14,8 @@ from moto import settings
from moto.core.exceptions import DryRunClientError
from moto.core.utils import camelcase_to_underscores, method_names_from_class
from moto.utilities.utils import load_resource
from jinja2 import Environment, DictLoader
from typing import Dict, List, Union, Any, Optional, Tuple
from jinja2 import Environment, DictLoader, Template
from typing import Dict, Union, Any, Tuple, TypeVar
from urllib.parse import parse_qs, parse_qsl, urlparse
from werkzeug.exceptions import HTTPException
from xml.dom.minidom import parseString as parseXML
@ -26,6 +26,7 @@ log = logging.getLogger(__name__)
JINJA_ENVS = {}
TYPE_RESPONSE = Tuple[int, Dict[str, str], str]
TYPE_IF_NONE = TypeVar("TYPE_IF_NONE")
def _decode_dict(d):
@ -105,7 +106,7 @@ class _TemplateEnvironmentMixin(object):
"""
return str(id(source))
def response_template(self, source):
def response_template(self, source: str) -> Template:
template_id = self._make_template_id(source)
if not self.contains_template(template_id):
if settings.PRETTIFY_RESPONSES:
@ -501,13 +502,17 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
pass
return if_none
def _get_int_param(self, param_name, if_none: int = None) -> Optional[int]:
def _get_int_param(
self, param_name, if_none: TYPE_IF_NONE = None
) -> Union[int, TYPE_IF_NONE]:
val = self._get_param(param_name)
if val is not None:
return int(val)
return if_none
def _get_bool_param(self, param_name, if_none: bool = None) -> Optional[bool]:
def _get_bool_param(
self, param_name, if_none: TYPE_IF_NONE = None
) -> Union[bool, TYPE_IF_NONE]:
val = self._get_param(param_name)
if val is not None:
val = str(val)
@ -588,9 +593,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return value_dict
def _get_multi_param(
self, param_prefix, skip_result_conversion=False
) -> List[Union[str, Dict]]:
def _get_multi_param(self, param_prefix, skip_result_conversion=False) -> Any:
"""
Given a querystring of ?LaunchConfigurationNames.member.1=my-test-1&LaunchConfigurationNames.member.2=my-test-2
this will return ['my-test-1', 'my-test-2']
@ -635,7 +638,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
]
return params
def _get_params(self) -> Dict[str, Union[str, List, Dict]]:
def _get_params(self) -> Any:
"""
Given a querystring of
{
@ -710,7 +713,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
else:
obj[keylist[-1]] = value
def _get_list_prefix(self, param_prefix):
def _get_list_prefix(self, param_prefix: str) -> Any:
"""
Given a query dict like
{

View File

@ -12,7 +12,7 @@ from urllib.parse import urlparse
from uuid import uuid4
def camelcase_to_underscores(argument):
def camelcase_to_underscores(argument: Optional[str]) -> str:
"""Converts a camelcase param like theNewAttribute to the equivalent
python underscore variable like the_new_attribute"""
result = ""
@ -147,7 +147,7 @@ class convert_flask_to_responses_response(object):
return status, headers, response
def iso_8601_datetime_with_milliseconds(value):
def iso_8601_datetime_with_milliseconds(value: datetime) -> str:
return value.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

View File

@ -2,6 +2,7 @@ import copy
import warnings
from collections import OrderedDict
from datetime import datetime
from typing import Any, List, Tuple
from moto import settings
from moto.core import CloudFormationModel
@ -69,7 +70,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
def __init__(self, ec2_backend, image_id, user_data, security_groups, **kwargs):
super().__init__()
self.ec2_backend = ec2_backend
self.id = random_instance_id()
self.id: str = random_instance_id()
self.owner_id = ec2_backend.account_id
self.lifecycle = kwargs.get("lifecycle")
@ -79,9 +80,9 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
if launch_template_arg and not image_id:
# the image id from the template should be used
template_version = ec2_backend._get_template_from_args(launch_template_arg)
self.image_id = template_version.image_id
self.image_id: str = template_version.image_id
else:
self.image_id = image_id
self.image_id: str = image_id
# Check if we have tags to process
if launch_template_arg:
template_version = ec2_backend._get_template_from_args(launch_template_arg)
@ -95,7 +96,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
self._state_reason = StateReason()
self.user_data = user_data
self.security_groups = security_groups
self.instance_type = kwargs.get("instance_type", "m1.small")
self.instance_type: str = kwargs.get("instance_type", "m1.small")
self.region_name = kwargs.get("region_name", "us-east-1")
placement = kwargs.get("placement", None)
self.subnet_id = kwargs.get("subnet_id")
@ -159,7 +160,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
else:
self._placement.zone = ec2_backend.region_name + "a"
self.block_device_mapping = BlockDeviceMapping()
self.block_device_mapping: BlockDeviceMapping = BlockDeviceMapping()
self._private_ips = set()
self.prep_nics(
@ -592,7 +593,14 @@ class InstanceBackend:
return instance
raise InvalidInstanceIdError(instance_id)
def add_instances(self, image_id, count, user_data, security_group_names, **kwargs):
def add_instances(
self,
image_id: str,
count: int,
user_data: str,
security_group_names: List[str],
**kwargs: Any
) -> Reservation:
location_type = "availability-zone" if kwargs.get("placement") else "region"
default_region = "us-east-1"
if settings.ENABLE_KEYPAIR_VALIDATION:
@ -734,7 +742,7 @@ class InstanceBackend:
return stopped_instances
def terminate_instances(self, instance_ids):
def terminate_instances(self, instance_ids: List[str]) -> List[Tuple[str, str]]:
terminated_instances = []
if not instance_ids:
raise EC2ClientError(

View File

@ -179,10 +179,10 @@ class LaunchTemplateBackend:
self.launch_template_insert_order.append(template.id)
return template
def get_launch_template(self, template_id):
def get_launch_template(self, template_id: str) -> LaunchTemplate:
return self.launch_templates[template_id]
def get_launch_template_by_name(self, name):
def get_launch_template_by_name(self, name: str) -> LaunchTemplate:
if name not in self.launch_template_name_to_ids:
raise InvalidLaunchTemplateNameNotFoundWithNameError(name)
return self.get_launch_template(self.launch_template_name_to_ids[name])

View File

@ -1,6 +1,7 @@
import ipaddress
import itertools
from collections import defaultdict
from typing import Any, Iterable, List, Optional
from moto.core import CloudFormationModel
from ..exceptions import (
@ -344,7 +345,9 @@ class SubnetBackend:
self.subnets[availability_zone][subnet_id] = subnet
return subnet
def get_all_subnets(self, subnet_ids=None, filters=None):
def get_all_subnets(
self, subnet_ids: Optional[List[str]] = None, filters: Optional[Any] = None
) -> Iterable[Subnet]:
# Extract a list of all subnets
matches = itertools.chain(
*[x.copy().values() for x in self.subnets.copy().values()]

View File

@ -1,5 +1,6 @@
import re
from collections import defaultdict
from typing import Dict, List
from ..exceptions import (
InvalidParameterValueErrorTagNull,
@ -18,7 +19,7 @@ class TagBackend:
def __init__(self):
self.tags = defaultdict(dict)
def create_tags(self, resource_ids, tags):
def create_tags(self, resource_ids: List[str], tags: Dict[str, str]):
if None in set([tags[tag] for tag in tags]):
raise InvalidParameterValueErrorTagNull()
for resource_id in resource_ids:
@ -36,7 +37,7 @@ class TagBackend:
self.tags[resource_id][tag] = tags[tag]
return True
def delete_tags(self, resource_ids, tags):
def delete_tags(self, resource_ids: List[str], tags: Dict[str, str]):
for resource_id in resource_ids:
for tag in tags:
if tag in self.tags[resource_id]:

View File

@ -74,7 +74,7 @@ def random_ami_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["image"])
def random_instance_id():
def random_instance_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["instance"], size=17)

View File

@ -1,8 +1,8 @@
import datetime
import pytz
from collections import OrderedDict
from typing import List, Iterable
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import BackendDict
from moto.ec2.models import ec2_backends
@ -359,7 +359,7 @@ class ELBBackend(BaseBackend):
return balancer
def describe_load_balancers(self, names):
def describe_load_balancers(self, names: List[str]) -> List[FakeLoadBalancer]:
balancers = self.load_balancers.values()
if names:
matched_balancers = [
@ -465,8 +465,11 @@ class ELBBackend(BaseBackend):
return balancer
def register_instances(
self, load_balancer_name, instance_ids, from_autoscaling=False
):
self,
load_balancer_name: str,
instance_ids: Iterable[str],
from_autoscaling: bool = False,
) -> FakeLoadBalancer:
load_balancer = self.get_load_balancer(load_balancer_name)
attr_name = (
"instance_sparse_ids"
@ -479,8 +482,11 @@ class ELBBackend(BaseBackend):
return load_balancer
def deregister_instances(
self, load_balancer_name, instance_ids, from_autoscaling=False
):
self,
load_balancer_name: str,
instance_ids: Iterable[str],
from_autoscaling: bool = False,
) -> FakeLoadBalancer:
load_balancer = self.get_load_balancer(load_balancer_name)
attr_name = (
"instance_sparse_ids"

View File

@ -3,6 +3,7 @@ import re
from jinja2 import Template
from botocore.exceptions import ParamValidationError
from collections import OrderedDict
from typing import Any, List, Dict, Iterable, Optional
from moto.core.exceptions import RESTError
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import (
@ -1175,7 +1176,12 @@ Member must satisfy regular expression pattern: {}".format(
raise RuleNotFoundError("One or more rules not found")
return matched_rules
def describe_target_groups(self, load_balancer_arn, target_group_arns, names):
def describe_target_groups(
self,
load_balancer_arn: Optional[str],
target_group_arns: List[str],
names: Optional[List[str]],
) -> Iterable[FakeTargetGroup]:
if load_balancer_arn:
if load_balancer_arn not in self.load_balancers:
raise LoadBalancerNotFoundError()
@ -1284,13 +1290,15 @@ Member must satisfy regular expression pattern: {}".format(
rule.actions = actions
return rule
def register_targets(self, target_group_arn, instances):
def register_targets(self, target_group_arn: str, instances: List[Any]):
target_group = self.target_groups.get(target_group_arn)
if target_group is None:
raise TargetGroupNotFoundError()
target_group.register(instances)
def deregister_targets(self, target_group_arn, instances):
def deregister_targets(
self, target_group_arn: str, instances: List[Dict[str, Any]]
):
target_group = self.target_groups.get(target_group_arn)
if target_group is None:
raise TargetGroupNotFoundError()

View File

@ -20,6 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
from typing import Any, Dict, List, Optional, Union
class BlockDeviceType(object):
@ -29,18 +30,18 @@ class BlockDeviceType(object):
def __init__(
self,
connection=None,
ephemeral_name=None,
no_device=False,
volume_id=None,
snapshot_id=None,
status=None,
attach_time=None,
delete_on_termination=False,
size=None,
volume_type=None,
iops=None,
encrypted=None,
connection: Optional[str] = None,
ephemeral_name: Optional[str] = None,
no_device: Union[bool, str] = False,
volume_id: Optional[str] = None,
snapshot_id: Optional[str] = None,
status: Optional[str] = None,
attach_time: Optional[str] = None,
delete_on_termination: bool = False,
size: Optional[str] = None,
volume_type: Optional[str] = None,
iops: Optional[str] = None,
encrypted: Optional[str] = None,
):
self.connection = connection
self.ephemeral_name = ephemeral_name
@ -55,6 +56,7 @@ class BlockDeviceType(object):
self.iops = iops
self.encrypted = encrypted
self.kms_key_id = None
self.throughput = None
# for backwards compatibility
@ -73,17 +75,7 @@ class BlockDeviceMapping(dict):
reservation = image.run(..., block_device_map=bdm, ...)
"""
def __init__(self, connection=None):
"""
:type connection: :class:`boto.ec2.EC2Connection`
:param connection: Optional connection.
"""
dict.__init__(self)
self.connection = connection
self.current_name = None
self.current_value = None
def to_source_dict(self):
def to_source_dict(self) -> List[Dict[str, Any]]:
return [
{
"DeviceName": device_name,

View File

@ -1,10 +1,14 @@
from functools import wraps
from typing import Any, Callable, TypeVar
import binascii
import re
from moto.moto_api._internal import mock_random as random
TypeDec = TypeVar("TypeDec", bound=Callable[..., Any])
def gen_amz_crc32(response, headerdict=None):
if not isinstance(response, bytes):
response = response.encode("utf-8")
@ -26,9 +30,9 @@ def gen_amzn_requestid_long(headerdict=None):
return req_id
def amz_crc32(f):
def amz_crc32(f: TypeDec) -> TypeDec:
@wraps(f)
def _wrapper(*args, **kwargs):
def _wrapper(*args: Any, **kwargs: Any) -> Any:
response = f(*args, **kwargs)
headers = {}
@ -54,9 +58,9 @@ def amz_crc32(f):
return _wrapper
def amzn_request_id(f):
def amzn_request_id(f: TypeDec) -> TypeDec:
@wraps(f)
def _wrapper(*args, **kwargs):
def _wrapper(*args: Any, **kwargs: Any) -> Any:
response = f(*args, **kwargs)
headers = {}