diff --git a/moto/autoscaling/exceptions.py b/moto/autoscaling/exceptions.py index c408e16b2..5248109e9 100644 --- a/moto/autoscaling/exceptions.py +++ b/moto/autoscaling/exceptions.py @@ -8,7 +8,7 @@ class AutoscalingClientError(RESTError): class ResourceContentionError(RESTError): code = 500 - def __init__(self): + def __init__(self) -> None: super().__init__( "ResourceContentionError", "You already have a pending update to an Auto Scaling resource (for example, a group, instance, or load balancer).", @@ -16,12 +16,12 @@ class ResourceContentionError(RESTError): class InvalidInstanceError(AutoscalingClientError): - def __init__(self, instance_id): + def __init__(self, instance_id: str): super().__init__( "ValidationError", "Instance [{0}] is invalid.".format(instance_id) ) class ValidationError(AutoscalingClientError): - def __init__(self, message): + def __init__(self, message: str): super().__init__("ValidationError", message) diff --git a/moto/autoscaling/models.py b/moto/autoscaling/models.py index 1a2d4af41..1827131a5 100644 --- a/moto/autoscaling/models.py +++ b/moto/autoscaling/models.py @@ -1,4 +1,5 @@ import itertools +from typing import Any, Dict, List, Optional, Tuple, Union from moto.packages.boto.ec2.blockdevicemapping import ( BlockDeviceType, @@ -10,8 +11,10 @@ from collections import OrderedDict from moto.core import BaseBackend, BaseModel, CloudFormationModel from moto.core.utils import camelcase_to_underscores, BackendDict from moto.ec2 import ec2_backends -from moto.elb import elb_backends -from moto.elbv2 import elbv2_backends +from moto.ec2.models import EC2Backend +from moto.ec2.models.instances import Instance +from moto.elb.models import elb_backends, ELBBackend +from moto.elbv2.models import elbv2_backends, ELBv2Backend from moto.elb.exceptions import LoadBalancerNotFoundError from moto.moto_api._internal import mock_random as random from .exceptions import ( @@ -30,22 +33,29 @@ ASG_NAME_TAG = "aws:autoscaling:groupName" class InstanceState(object): def __init__( self, - instance, - lifecycle_state="InService", - health_status="Healthy", - protected_from_scale_in=False, - autoscaling_group=None, + instance: "Instance", + lifecycle_state: str = "InService", + health_status: str = "Healthy", + protected_from_scale_in: Optional[bool] = False, + autoscaling_group: Optional["FakeAutoScalingGroup"] = None, ): self.instance = instance self.lifecycle_state = lifecycle_state self.health_status = health_status self.protected_from_scale_in = protected_from_scale_in if not hasattr(self.instance, "autoscaling_group"): - self.instance.autoscaling_group = autoscaling_group + self.instance.autoscaling_group = autoscaling_group # type: ignore[attr-defined] class FakeLifeCycleHook(BaseModel): - def __init__(self, name, as_name, transition, timeout, result): + def __init__( + self, + name: str, + as_name: str, + transition: Optional[str], + timeout: Optional[int], + result: Optional[str], + ): self.name = name self.as_name = as_name if transition: @@ -63,19 +73,19 @@ class FakeLifeCycleHook(BaseModel): class FakeScalingPolicy(BaseModel): def __init__( self, - name, - policy_type, - metric_aggregation_type, - adjustment_type, - as_name, - min_adjustment_magnitude, - scaling_adjustment, - cooldown, - target_tracking_config, - step_adjustments, - estimated_instance_warmup, - predictive_scaling_configuration, - autoscaling_backend, + name: str, + policy_type: str, + metric_aggregation_type: str, + adjustment_type: str, + as_name: str, + min_adjustment_magnitude: str, + 
scaling_adjustment: Optional[int], + cooldown: Optional[int], + target_tracking_config: str, + step_adjustments: str, + estimated_instance_warmup: str, + predictive_scaling_configuration: str, + autoscaling_backend: "AutoScalingBackend", ): self.name = name self.policy_type = policy_type @@ -95,10 +105,10 @@ class FakeScalingPolicy(BaseModel): self.autoscaling_backend = autoscaling_backend @property - def arn(self): + def arn(self) -> str: return f"arn:aws:autoscaling:{self.autoscaling_backend.region_name}:{self.autoscaling_backend.account_id}:scalingPolicy:c322761b-3172-4d56-9a21-0ed9d6161d67:autoScalingGroupName/{self.as_name}:policyName/{self.name}" - def execute(self): + def execute(self) -> None: if self.adjustment_type == "ExactCapacity": self.autoscaling_backend.set_desired_capacity( self.as_name, self.scaling_adjustment @@ -116,25 +126,25 @@ class FakeScalingPolicy(BaseModel): class FakeLaunchConfiguration(CloudFormationModel): def __init__( self, - name, - image_id, - key_name, - ramdisk_id, - kernel_id, - security_groups, - user_data, - instance_type, - instance_monitoring, - instance_profile_name, - spot_price, - ebs_optimized, - associate_public_ip_address, - block_device_mapping_dict, - account_id, - region_name, - metadata_options, - classic_link_vpc_id, - classic_link_vpc_security_groups, + name: str, + image_id: str, + key_name: Optional[str], + ramdisk_id: str, + kernel_id: str, + security_groups: List[str], + user_data: str, + instance_type: str, + instance_monitoring: bool, + instance_profile_name: Optional[str], + spot_price: Optional[str], + ebs_optimized: str, + associate_public_ip_address: Union[str, bool], + block_device_mapping_dict: List[Dict[str, Any]], + account_id: str, + region_name: str, + metadata_options: Optional[str], + classic_link_vpc_id: Optional[str], + classic_link_vpc_security_groups: Optional[str], ): self.name = name self.image_id = image_id @@ -148,11 +158,12 @@ class FakeLaunchConfiguration(CloudFormationModel): self.instance_profile_name = instance_profile_name self.spot_price = spot_price self.ebs_optimized = ebs_optimized - self.associate_public_ip_address = associate_public_ip_address if isinstance(associate_public_ip_address, str): self.associate_public_ip_address = ( associate_public_ip_address.lower() == "true" ) + else: + self.associate_public_ip_address = associate_public_ip_address self.block_device_mapping_dict = block_device_mapping_dict self.metadata_options = metadata_options self.classic_link_vpc_id = classic_link_vpc_id @@ -160,7 +171,9 @@ class FakeLaunchConfiguration(CloudFormationModel): self.arn = f"arn:aws:autoscaling:{region_name}:{account_id}:launchConfiguration:9dbbbf87-6141-428a-a409-0752edbe6cad:launchConfigurationName/{self.name}" @classmethod - def create_from_instance(cls, name, instance, backend): + def create_from_instance( + cls, name: str, instance: Instance, backend: "AutoScalingBackend" + ) -> "FakeLaunchConfiguration": config = backend.create_launch_configuration( name=name, image_id=instance.image_id, @@ -181,18 +194,23 @@ class FakeLaunchConfiguration(CloudFormationModel): return config @staticmethod - def cloudformation_name_type(): + def cloudformation_name_type() -> str: return "LaunchConfigurationName" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-autoscaling-launchconfiguration.html return "AWS::AutoScaling::LaunchConfiguration" @classmethod - def create_from_cloudformation_json( - cls, 
resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "FakeLaunchConfiguration": properties = cloudformation_json["Properties"] instance_profile_name = properties.get("IamInstanceProfile") @@ -217,14 +235,14 @@ class FakeLaunchConfiguration(CloudFormationModel): return config @classmethod - def update_from_cloudformation_json( + def update_from_cloudformation_json( # type: ignore[misc] cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> "FakeLaunchConfiguration": cls.delete_from_cloudformation_json( original_resource.name, cloudformation_json, account_id, region_name ) @@ -233,42 +251,46 @@ class FakeLaunchConfiguration(CloudFormationModel): ) @classmethod - def delete_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name - ): + def delete_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> None: backend = autoscaling_backends[account_id][region_name] try: backend.delete_launch_configuration(resource_name) except KeyError: pass - def delete(self, account_id, region_name): + def delete(self, account_id: str, region_name: str) -> None: backend = autoscaling_backends[account_id][region_name] backend.delete_launch_configuration(self.name) @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.name @property - def block_device_mappings(self): + def block_device_mappings(self) -> Optional[BlockDeviceMapping]: if not self.block_device_mapping_dict: return None else: return self._parse_block_device_mappings() @property - def instance_monitoring_enabled(self): + def instance_monitoring_enabled(self) -> str: if self.instance_monitoring: return "true" return "false" - def _parse_block_device_mappings(self): + def _parse_block_device_mappings(self) -> BlockDeviceMapping: block_device_map = BlockDeviceMapping() for mapping in self.block_device_mapping_dict: block_type = BlockDeviceType() mount_point = mapping.get("DeviceName") - if mapping.get("VirtualName") and "ephemeral" in mapping.get("VirtualName"): + if mapping.get("VirtualName") and "ephemeral" in mapping.get("VirtualName"): # type: ignore[operator] block_type.ephemeral_name = mapping.get("VirtualName") elif mapping.get("NoDevice", "false") == "true": block_type.no_device = "true" @@ -288,14 +310,14 @@ class FakeLaunchConfiguration(CloudFormationModel): class FakeScheduledAction(CloudFormationModel): def __init__( self, - name, - desired_capacity, - max_size, - min_size, - scheduled_action_name=None, - start_time=None, - end_time=None, - recurrence=None, + name: str, + desired_capacity: Optional[int], + max_size: Optional[int], + min_size: Optional[int], + scheduled_action_name: str, + start_time: str, + end_time: str, + recurrence: str, ): self.name = name @@ -308,20 +330,23 @@ class FakeScheduledAction(CloudFormationModel): self.scheduled_action_name = scheduled_action_name @staticmethod - def cloudformation_name_type(): - + def cloudformation_name_type() -> str: return "ScheduledActionName" @staticmethod - def cloudformation_type(): - + def cloudformation_type() -> str: # 
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-as-scheduledaction.html return "AWS::AutoScaling::ScheduledAction" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "FakeScheduledAction": properties = cloudformation_json["Properties"] @@ -346,30 +371,40 @@ class FakeScheduledAction(CloudFormationModel): return scheduled_action +def set_string_propagate_at_launch_booleans_on_tags( + tags: List[Dict[str, Any]] +) -> List[Dict[str, Any]]: + bool_to_string = {True: "true", False: "false"} + for tag in tags: + if "PropagateAtLaunch" in tag: + tag["PropagateAtLaunch"] = bool_to_string[tag["PropagateAtLaunch"]] + return tags + + class FakeAutoScalingGroup(CloudFormationModel): def __init__( self, - name, - availability_zones, - desired_capacity, - max_size, - min_size, - launch_config_name, - launch_template, - vpc_zone_identifier, - default_cooldown, - health_check_period, - health_check_type, - load_balancers, - target_group_arns, - placement_group, - termination_policies, - autoscaling_backend, - ec2_backend, - tags, - mixed_instance_policy, - capacity_rebalance, - new_instances_protected_from_scale_in=False, + name: str, + availability_zones: List[str], + desired_capacity: Optional[int], + max_size: Optional[int], + min_size: Optional[int], + launch_config_name: str, + launch_template: Dict[str, Any], + vpc_zone_identifier: str, + default_cooldown: Optional[int], + health_check_period: Optional[int], + health_check_type: Optional[str], + load_balancers: List[str], + target_group_arns: List[str], + placement_group: str, + termination_policies: str, + autoscaling_backend: "AutoScalingBackend", + ec2_backend: EC2Backend, + tags: List[Dict[str, str]], + mixed_instance_policy: Optional[Dict[str, Any]], + capacity_rebalance: bool, + new_instances_protected_from_scale_in: bool = False, ): self.autoscaling_backend = autoscaling_backend self.ec2_backend = ec2_backend @@ -379,6 +414,7 @@ class FakeAutoScalingGroup(CloudFormationModel): self.account_id = self.autoscaling_backend.account_id self.service_linked_role = f"arn:aws:iam::{self.account_id}:role/aws-service-role/autoscaling.amazonaws.com/AWSServiceRoleForAutoScaling" + self.vpc_zone_identifier: Optional[str] = None self._set_azs_and_vpcs(availability_zones, vpc_zone_identifier) self.max_size = max_size @@ -406,19 +442,19 @@ class FakeAutoScalingGroup(CloudFormationModel): new_instances_protected_from_scale_in ) - self.suspended_processes = [] - self.instance_states = [] - self.tags = tags or [] + self.suspended_processes: List[str] = [] + self.instance_states: List[InstanceState] = [] + self.tags: List[Dict[str, str]] = tags or [] self.set_desired_capacity(desired_capacity) - self.metrics = [] + self.metrics: List[str] = [] @property - def tags(self): + def tags(self) -> List[Dict[str, str]]: return self._tags @tags.setter - def tags(self, tags): + def tags(self, tags: List[Dict[str, str]]) -> None: for tag in tags: if "resource_id" not in tag or not tag["resource_id"]: tag["resource_id"] = self.name @@ -427,13 +463,18 @@ class FakeAutoScalingGroup(CloudFormationModel): self._tags = tags @property - def arn(self): + def arn(self) -> str: return 
f"arn:aws:autoscaling:{self.region}:{self.account_id}:autoScalingGroup:{self._id}:autoScalingGroupName/{self.name}" - def active_instances(self): + def active_instances(self) -> List[InstanceState]: return [x for x in self.instance_states if x.lifecycle_state == "InService"] - def _set_azs_and_vpcs(self, availability_zones, vpc_zone_identifier, update=False): + def _set_azs_and_vpcs( + self, + availability_zones: List[str], + vpc_zone_identifier: Optional[str], + update: bool = False, + ) -> None: # for updates, if only AZs are provided, they must not clash with # the AZs of existing VPCs if update and availability_zones and not vpc_zone_identifier: @@ -465,8 +506,11 @@ class FakeAutoScalingGroup(CloudFormationModel): self.vpc_zone_identifier = vpc_zone_identifier def _set_launch_configuration( - self, launch_config_name, launch_template, mixed_instance_policy - ): + self, + launch_config_name: str, + launch_template: Dict[str, Any], + mixed_instance_policy: Optional[Dict[str, Any]], + ) -> None: if launch_config_name: self.launch_config = self.autoscaling_backend.launch_configurations[ launch_config_name @@ -480,7 +524,7 @@ class FakeAutoScalingGroup(CloudFormationModel): self.launch_template_version = ( launch_template.get("version") or "$Default" ) - else: + elif mixed_instance_policy: spec = mixed_instance_policy["LaunchTemplate"][ "LaunchTemplateSpecification" ] @@ -505,26 +549,23 @@ class FakeAutoScalingGroup(CloudFormationModel): ) @staticmethod - def __set_string_propagate_at_launch_booleans_on_tags(tags): - bool_to_string = {True: "true", False: "false"} - for tag in tags: - if "PropagateAtLaunch" in tag: - tag["PropagateAtLaunch"] = bool_to_string[tag["PropagateAtLaunch"]] - return tags - - @staticmethod - def cloudformation_name_type(): + def cloudformation_name_type() -> str: return "AutoScalingGroupName" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-autoscaling-autoscalinggroup.html return "AWS::AutoScaling::AutoScalingGroup" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "FakeAutoScalingGroup": properties = cloudformation_json["Properties"] launch_config_name = properties.get("LaunchConfigurationName") @@ -554,7 +595,7 @@ class FakeAutoScalingGroup(CloudFormationModel): target_group_arns=target_group_arns, placement_group=None, termination_policies=properties.get("TerminationPolicies", []), - tags=cls.__set_string_propagate_at_launch_booleans_on_tags( + tags=set_string_propagate_at_launch_booleans_on_tags( properties.get("Tags", []) ), new_instances_protected_from_scale_in=properties.get( @@ -564,14 +605,14 @@ class FakeAutoScalingGroup(CloudFormationModel): return group @classmethod - def update_from_cloudformation_json( + def update_from_cloudformation_json( # type: ignore[misc] cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + ) -> "FakeAutoScalingGroup": cls.delete_from_cloudformation_json( original_resource.name, cloudformation_json, account_id, region_name ) @@ -580,68 +621,72 @@ class 
FakeAutoScalingGroup(CloudFormationModel): ) @classmethod - def delete_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name - ): + def delete_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + ) -> None: backend = autoscaling_backends[account_id][region_name] try: backend.delete_auto_scaling_group(resource_name) except KeyError: pass - def delete(self, account_id, region_name): + def delete(self, account_id: str, region_name: str) -> None: backend = autoscaling_backends[account_id][region_name] backend.delete_auto_scaling_group(self.name) @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.name @property - def image_id(self): + def image_id(self) -> str: if self.launch_template: version = self.launch_template.get_version(self.launch_template_version) return version.image_id - return self.launch_config.image_id + return self.launch_config.image_id # type: ignore[union-attr] @property - def instance_type(self): + def instance_type(self) -> str: if self.launch_template: version = self.launch_template.get_version(self.launch_template_version) return version.instance_type - return self.launch_config.instance_type + return self.launch_config.instance_type # type: ignore[union-attr] @property - def user_data(self): + def user_data(self) -> str: if self.launch_template: version = self.launch_template.get_version(self.launch_template_version) return version.user_data - return self.launch_config.user_data + return self.launch_config.user_data # type: ignore[union-attr] @property - def security_groups(self): + def security_groups(self) -> List[str]: if self.launch_template: version = self.launch_template.get_version(self.launch_template_version) return version.security_groups - return self.launch_config.security_groups + return self.launch_config.security_groups # type: ignore[union-attr] def update( self, - availability_zones, - desired_capacity, - max_size, - min_size, - launch_config_name, - launch_template, - vpc_zone_identifier, - health_check_period, - health_check_type, - new_instances_protected_from_scale_in=None, - ): + availability_zones: List[str], + desired_capacity: Optional[int], + max_size: Optional[int], + min_size: Optional[int], + launch_config_name: str, + launch_template: Dict[str, Any], + vpc_zone_identifier: str, + health_check_period: int, + health_check_type: str, + new_instances_protected_from_scale_in: Optional[bool] = None, + ) -> None: self._set_azs_and_vpcs(availability_zones, vpc_zone_identifier, update=True) if max_size is not None: @@ -671,7 +716,7 @@ class FakeAutoScalingGroup(CloudFormationModel): if desired_capacity is not None: self.set_desired_capacity(desired_capacity) - def set_desired_capacity(self, new_capacity): + def set_desired_capacity(self, new_capacity: Optional[int]) -> None: if new_capacity is None: self.desired_capacity = self.min_size else: @@ -681,15 +726,15 @@ class FakeAutoScalingGroup(CloudFormationModel): if self.desired_capacity == curr_instance_count: pass # Nothing to do here - elif self.desired_capacity > curr_instance_count: + elif self.desired_capacity > curr_instance_count: # type: ignore[operator] # Need more instances - count_needed = int(self.desired_capacity) - int(curr_instance_count) + count_needed = int(self.desired_capacity) - int(curr_instance_count) # type: ignore[arg-type] propagated_tags = self.get_propagated_tags() 
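# --- Illustrative sketch (not part of the patch): the scale-out branch of
# set_desired_capacity() above creates the missing instances immediately, so it
# can be exercised end-to-end through moto's public mocks. Assumes moto >= 4
# with boto3 installed; the region, AZ, AMI id and resource names below are
# placeholders.
import boto3
from moto import mock_autoscaling, mock_ec2


@mock_ec2
@mock_autoscaling
def scale_out_sketch() -> None:
    client = boto3.client("autoscaling", region_name="us-east-1")
    client.create_launch_configuration(
        LaunchConfigurationName="sketch-lc",
        ImageId="ami-12345678",
        InstanceType="t2.micro",
    )
    client.create_auto_scaling_group(
        AutoScalingGroupName="sketch-asg",
        LaunchConfigurationName="sketch-lc",
        MinSize=0,
        MaxSize=5,
        DesiredCapacity=1,
        AvailabilityZones=["us-east-1a"],
    )
    # Raising the desired capacity drives replace_autoscaling_group_instances()
    # for the two missing instances.
    client.set_desired_capacity(AutoScalingGroupName="sketch-asg", DesiredCapacity=3)
    group = client.describe_auto_scaling_groups(
        AutoScalingGroupNames=["sketch-asg"]
    )["AutoScalingGroups"][0]
    assert len(group["Instances"]) == 3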
self.replace_autoscaling_group_instances(count_needed, propagated_tags) else: # Need to remove some instances - count_to_remove = curr_instance_count - self.desired_capacity + count_to_remove = curr_instance_count - self.desired_capacity # type: ignore[operator] instances_to_remove = [ # only remove unprotected state for state in self.instance_states @@ -709,7 +754,7 @@ class FakeAutoScalingGroup(CloudFormationModel): self.autoscaling_backend.update_attached_elbs(self.name) self.autoscaling_backend.update_attached_target_groups(self.name) - def get_propagated_tags(self): + def get_propagated_tags(self) -> Dict[str, str]: propagated_tags = {} for tag in self.tags: # boto uses 'propagate_at_launch @@ -720,7 +765,9 @@ class FakeAutoScalingGroup(CloudFormationModel): propagated_tags[tag["Key"]] = tag["Value"] return propagated_tags - def replace_autoscaling_group_instances(self, count_needed, propagated_tags): + def replace_autoscaling_group_instances( + self, count_needed: int, propagated_tags: Dict[str, str] + ) -> None: propagated_tags[ASG_NAME_TAG] = self.name # VPCZoneIdentifier: @@ -756,28 +803,28 @@ class FakeAutoScalingGroup(CloudFormationModel): ) ) - def append_target_groups(self, target_group_arns): + def append_target_groups(self, target_group_arns: List[str]) -> None: append = [x for x in target_group_arns if x not in self.target_group_arns] self.target_group_arns.extend(append) - def enable_metrics_collection(self, metrics): + def enable_metrics_collection(self, metrics: List[str]) -> None: self.metrics = metrics or [] class AutoScalingBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.autoscaling_groups = OrderedDict() - self.launch_configurations = OrderedDict() - self.scheduled_actions = OrderedDict() - self.policies = {} - self.lifecycle_hooks = {} - self.ec2_backend = ec2_backends[self.account_id][region_name] - self.elb_backend = elb_backends[self.account_id][region_name] - self.elbv2_backend = elbv2_backends[self.account_id][region_name] + self.autoscaling_groups: Dict[str, FakeAutoScalingGroup] = OrderedDict() + self.launch_configurations: Dict[str, FakeLaunchConfiguration] = OrderedDict() + self.scheduled_actions: Dict[str, FakeScheduledAction] = OrderedDict() + self.policies: Dict[str, FakeScalingPolicy] = {} + self.lifecycle_hooks: Dict[str, FakeLifeCycleHook] = {} + self.ec2_backend: EC2Backend = ec2_backends[self.account_id][region_name] + self.elb_backend: ELBBackend = elb_backends[self.account_id][region_name] + self.elbv2_backend: ELBv2Backend = elbv2_backends[self.account_id][region_name] @staticmethod - def default_vpc_endpoint_service(service_region, zones): + def default_vpc_endpoint_service(service_region: str, zones: List[Dict[str, Any]]) -> List[Dict[str, Any]]: # type: ignore[misc] """Default VPC endpoint service.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "autoscaling" @@ -787,25 +834,25 @@ class AutoScalingBackend(BaseBackend): def create_launch_configuration( self, - name, - image_id, - key_name, - kernel_id, - ramdisk_id, - security_groups, - user_data, - instance_type, - instance_monitoring, - instance_profile_name, - spot_price, - ebs_optimized, - associate_public_ip_address, - block_device_mappings, - instance_id=None, - metadata_options=None, - classic_link_vpc_id=None, - classic_link_vpc_security_groups=None, - ): + name: str, + image_id: str, + key_name: Optional[str], + kernel_id: 
str, + ramdisk_id: str, + security_groups: List[str], + user_data: str, + instance_type: str, + instance_monitoring: bool, + instance_profile_name: Optional[str], + spot_price: Optional[str], + ebs_optimized: str, + associate_public_ip_address: str, + block_device_mappings: List[Dict[str, Any]], + instance_id: Optional[str] = None, + metadata_options: Optional[str] = None, + classic_link_vpc_id: Optional[str] = None, + classic_link_vpc_security_groups: Optional[str] = None, + ) -> FakeLaunchConfiguration: valid_requests = [ instance_id is not None, image_id is not None and instance_type is not None, @@ -841,7 +888,9 @@ class AutoScalingBackend(BaseBackend): self.launch_configurations[name] = launch_configuration return launch_configuration - def describe_launch_configurations(self, names): + def describe_launch_configurations( + self, names: Optional[List[str]] + ) -> List[FakeLaunchConfiguration]: configurations = self.launch_configurations.values() if names: return [ @@ -852,24 +901,23 @@ class AutoScalingBackend(BaseBackend): else: return list(configurations) - def delete_launch_configuration(self, launch_configuration_name): + def delete_launch_configuration(self, launch_configuration_name: str) -> None: self.launch_configurations.pop(launch_configuration_name, None) - def make_int(self, value): + def make_int(self, value: Union[None, str, int]) -> Optional[int]: return int(value) if value is not None else value def put_scheduled_update_group_action( self, - name, - desired_capacity, - max_size, - min_size, - scheduled_action_name=None, - start_time=None, - end_time=None, - recurrence=None, - ): - + name: str, + desired_capacity: Union[None, str, int], + max_size: Union[None, str, int], + min_size: Union[None, str, int], + scheduled_action_name: str, + start_time: str, + end_time: str, + recurrence: str, + ) -> FakeScheduledAction: max_size = self.make_int(max_size) min_size = self.make_int(min_size) desired_capacity = self.make_int(desired_capacity) @@ -889,59 +937,60 @@ class AutoScalingBackend(BaseBackend): return scheduled_action def describe_scheduled_actions( - self, autoscaling_group_name=None, scheduled_action_names=None - ): + self, + autoscaling_group_name: Optional[str] = None, + scheduled_action_names: Optional[List[str]] = None, + ) -> List[FakeScheduledAction]: scheduled_actions = [] for scheduled_action in self.scheduled_actions.values(): if ( not autoscaling_group_name or scheduled_action.name == autoscaling_group_name ): - if scheduled_action.scheduled_action_name in scheduled_action_names: - scheduled_actions.append(scheduled_action) - elif not scheduled_action_names: + if ( + not scheduled_action_names + or scheduled_action.scheduled_action_name in scheduled_action_names + ): scheduled_actions.append(scheduled_action) return scheduled_actions - def delete_scheduled_action(self, auto_scaling_group_name, scheduled_action_name): + def delete_scheduled_action( + self, auto_scaling_group_name: str, scheduled_action_name: str + ) -> None: scheduled_action = self.describe_scheduled_actions( - auto_scaling_group_name, scheduled_action_name + auto_scaling_group_name, [scheduled_action_name] ) if scheduled_action: self.scheduled_actions.pop(scheduled_action_name, None) def create_auto_scaling_group( self, - name, - availability_zones, - desired_capacity, - max_size, - min_size, - launch_config_name, - launch_template, - vpc_zone_identifier, - default_cooldown, - health_check_period, - health_check_type, - load_balancers, - target_group_arns, - placement_group, - 
termination_policies, - tags, - capacity_rebalance=False, - new_instances_protected_from_scale_in=False, - instance_id=None, - mixed_instance_policy=None, - ): + name: str, + availability_zones: List[str], + desired_capacity: Union[None, str, int], + max_size: Union[None, str, int], + min_size: Union[None, str, int], + launch_config_name: str, + launch_template: Dict[str, Any], + vpc_zone_identifier: str, + default_cooldown: Optional[int], + health_check_period: Union[None, str, int], + health_check_type: Optional[str], + load_balancers: List[str], + target_group_arns: List[str], + placement_group: str, + termination_policies: str, + tags: List[Dict[str, str]], + capacity_rebalance: bool = False, + new_instances_protected_from_scale_in: bool = False, + instance_id: Optional[str] = None, + mixed_instance_policy: Optional[Dict[str, Any]] = None, + ) -> FakeAutoScalingGroup: max_size = self.make_int(max_size) min_size = self.make_int(min_size) desired_capacity = self.make_int(desired_capacity) default_cooldown = self.make_int(default_cooldown) - if health_check_period is None: - health_check_period = 300 - else: - health_check_period = self.make_int(health_check_period) # Verify only a single launch config-like parameter is provided. params = [ @@ -978,7 +1027,9 @@ class AutoScalingBackend(BaseBackend): launch_template=launch_template, vpc_zone_identifier=vpc_zone_identifier, default_cooldown=default_cooldown, - health_check_period=health_check_period, + health_check_period=self.make_int(health_check_period) + if health_check_period + else 300, health_check_type=health_check_type, load_balancers=load_balancers, target_group_arns=target_group_arns, @@ -999,18 +1050,18 @@ class AutoScalingBackend(BaseBackend): def update_auto_scaling_group( self, - name, - availability_zones, - desired_capacity, - max_size, - min_size, - launch_config_name, - launch_template, - vpc_zone_identifier, - health_check_period, - health_check_type, - new_instances_protected_from_scale_in=None, - ): + name: str, + availability_zones: List[str], + desired_capacity: Optional[int], + max_size: Optional[int], + min_size: Optional[int], + launch_config_name: str, + launch_template: Dict[str, Any], + vpc_zone_identifier: str, + health_check_period: int, + health_check_type: str, + new_instances_protected_from_scale_in: Optional[bool] = None, + ) -> FakeAutoScalingGroup: """ The parameter DefaultCooldown, PlacementGroup, TerminationPolicies are not yet implemented """ @@ -1037,18 +1088,22 @@ class AutoScalingBackend(BaseBackend): ) return group - def describe_auto_scaling_groups(self, names) -> [FakeAutoScalingGroup]: + def describe_auto_scaling_groups( + self, names: List[str] + ) -> List[FakeAutoScalingGroup]: groups = self.autoscaling_groups.values() if names: return [group for group in groups if group.name in names] else: return list(groups) - def delete_auto_scaling_group(self, group_name): + def delete_auto_scaling_group(self, group_name: str) -> None: self.set_desired_capacity(group_name, 0) self.autoscaling_groups.pop(group_name, None) - def describe_auto_scaling_instances(self, instance_ids): + def describe_auto_scaling_instances( + self, instance_ids: List[str] + ) -> List[InstanceState]: instance_states = [] for group in self.autoscaling_groups.values(): instance_states.extend( @@ -1060,11 +1115,11 @@ class AutoScalingBackend(BaseBackend): ) return instance_states - def attach_instances(self, group_name, instance_ids): + def attach_instances(self, group_name: str, instance_ids: List[str]) -> None: group = 
self.autoscaling_groups[group_name] original_size = len(group.instance_states) - if (original_size + len(instance_ids)) > group.max_size: + if (original_size + len(instance_ids)) > group.max_size: # type: ignore[operator] raise ResourceContentionError else: group.desired_capacity = original_size + len(instance_ids) @@ -1084,7 +1139,7 @@ class AutoScalingBackend(BaseBackend): self.update_attached_elbs(group.name) self.update_attached_target_groups(group.name) - def set_instance_health(self, instance_id, health_status): + def set_instance_health(self, instance_id: str, health_status: str) -> None: """ The ShouldRespectGracePeriod-parameter is not yet implemented """ @@ -1097,7 +1152,9 @@ class AutoScalingBackend(BaseBackend): ) instance_state.health_status = health_status - def detach_instances(self, group_name, instance_ids, should_decrement): + def detach_instances( + self, group_name: str, instance_ids: List[str], should_decrement: bool + ) -> List[InstanceState]: group = self.autoscaling_groups[group_name] original_size = group.desired_capacity @@ -1115,22 +1172,28 @@ class AutoScalingBackend(BaseBackend): group.instance_states = new_instance_state if should_decrement: - group.desired_capacity = original_size - len(instance_ids) + group.desired_capacity = original_size - len(instance_ids) # type: ignore[operator] group.set_desired_capacity(group.desired_capacity) return detached_instances - def set_desired_capacity(self, group_name, desired_capacity): + def set_desired_capacity( + self, group_name: str, desired_capacity: Optional[int] + ) -> None: group = self.autoscaling_groups[group_name] group.set_desired_capacity(desired_capacity) self.update_attached_elbs(group_name) - def change_capacity(self, group_name, scaling_adjustment): + def change_capacity( + self, group_name: str, scaling_adjustment: Optional[int] + ) -> None: group = self.autoscaling_groups[group_name] - desired_capacity = group.desired_capacity + scaling_adjustment + desired_capacity = group.desired_capacity + scaling_adjustment # type: ignore[operator] self.set_desired_capacity(group_name, desired_capacity) - def change_capacity_percent(self, group_name, scaling_adjustment): + def change_capacity_percent( + self, group_name: str, scaling_adjustment: Optional[int] + ) -> None: """http://docs.aws.amazon.com/AutoScaling/latest/DeveloperGuide/as-scale-based-on-demand.html If PercentChangeInCapacity returns a value between 0 and 1, Auto Scaling will round it off to 1. If the PercentChangeInCapacity @@ -1138,21 +1201,30 @@ class AutoScalingBackend(BaseBackend): lower value. 
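# Illustrative restatement of the PercentChangeInCapacity rounding rule that
# change_capacity_percent() implements (hypothetical helper, not from the moto
# source): an increase of less than one whole instance still adds one
# instance; everything else is rounded down.
def _apply_percent_change(current: int, percent: float) -> int:
    new = current * (1 + percent / 100.0)
    if current < new < current + 1:
        return current + 1
    return int(new)


assert _apply_percent_change(1, 25) == 2    # 1.25 -> one extra instance
assert _apply_percent_change(10, 12) == 11  # 11.2 -> rounded down to 11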
For example, if PercentChangeInCapacity returns 12.5, then Auto Scaling will round it off to 12.""" group = self.autoscaling_groups[group_name] - percent_change = 1 + (scaling_adjustment / 100.0) - desired_capacity = group.desired_capacity * percent_change - if group.desired_capacity < desired_capacity < group.desired_capacity + 1: - desired_capacity = group.desired_capacity + 1 + percent_change = 1 + (scaling_adjustment / 100.0) # type: ignore[operator] + desired_capacity = group.desired_capacity * percent_change # type: ignore[operator] + if group.desired_capacity < desired_capacity < group.desired_capacity + 1: # type: ignore[operator] + desired_capacity = group.desired_capacity + 1 # type: ignore[operator] else: desired_capacity = int(desired_capacity) self.set_desired_capacity(group_name, desired_capacity) - def create_lifecycle_hook(self, name, as_name, transition, timeout, result): + def create_lifecycle_hook( + self, + name: str, + as_name: str, + transition: str, + timeout: Optional[int], + result: str, + ) -> FakeLifeCycleHook: lifecycle_hook = FakeLifeCycleHook(name, as_name, transition, timeout, result) self.lifecycle_hooks["%s_%s" % (as_name, name)] = lifecycle_hook return lifecycle_hook - def describe_lifecycle_hooks(self, as_name, lifecycle_hook_names=None): + def describe_lifecycle_hooks( + self, as_name: str, lifecycle_hook_names: Optional[str] = None + ) -> List[FakeLifeCycleHook]: return [ lifecycle_hook for lifecycle_hook in self.lifecycle_hooks.values() @@ -1162,24 +1234,24 @@ class AutoScalingBackend(BaseBackend): ) ] - def delete_lifecycle_hook(self, as_name, name): + def delete_lifecycle_hook(self, as_name: str, name: str) -> None: self.lifecycle_hooks.pop("%s_%s" % (as_name, name), None) def put_scaling_policy( self, - name, - policy_type, - metric_aggregation_type, - adjustment_type, - as_name, - min_adjustment_magnitude, - scaling_adjustment, - cooldown, - target_tracking_config, - step_adjustments, - estimated_instance_warmup, - predictive_scaling_configuration, - ): + name: str, + policy_type: str, + metric_aggregation_type: str, + adjustment_type: str, + as_name: str, + min_adjustment_magnitude: str, + scaling_adjustment: Optional[int], + cooldown: Optional[int], + target_tracking_config: str, + step_adjustments: str, + estimated_instance_warmup: str, + predictive_scaling_configuration: str, + ) -> FakeScalingPolicy: policy = FakeScalingPolicy( name, policy_type, @@ -1200,8 +1272,11 @@ class AutoScalingBackend(BaseBackend): return policy def describe_policies( - self, autoscaling_group_name=None, policy_names=None, policy_types=None - ): + self, + autoscaling_group_name: Optional[str] = None, + policy_names: Optional[List[str]] = None, + policy_types: Optional[List[str]] = None, + ) -> List[FakeScalingPolicy]: return [ policy for policy in self.policies.values() @@ -1210,14 +1285,14 @@ class AutoScalingBackend(BaseBackend): and (not policy_types or policy.policy_type in policy_types) ] - def delete_policy(self, group_name): + def delete_policy(self, group_name: str) -> None: self.policies.pop(group_name, None) - def execute_policy(self, group_name): + def execute_policy(self, group_name: str) -> None: policy = self.policies[group_name] policy.execute() - def update_attached_elbs(self, group_name): + def update_attached_elbs(self, group_name: str) -> None: group = self.autoscaling_groups[group_name] group_instance_ids = set( state.instance.id for state in group.active_instances() @@ -1242,7 +1317,7 @@ class AutoScalingBackend(BaseBackend): elb.name, 
elb_instace_ids - group_instance_ids, from_autoscaling=True ) - def update_attached_target_groups(self, group_name): + def update_attached_target_groups(self, group_name: str) -> None: group = self.autoscaling_groups[group_name] group_instance_ids = set(state.instance.id for state in group.instance_states) @@ -1262,7 +1337,7 @@ class AutoScalingBackend(BaseBackend): ] self.elbv2_backend.register_targets(target_group.arn, (asg_targets)) - def create_or_update_tags(self, tags): + def create_or_update_tags(self, tags: List[Dict[str, str]]) -> None: for tag in tags: group_name = tag["resource_id"] group = self.autoscaling_groups[group_name] @@ -1282,7 +1357,7 @@ class AutoScalingBackend(BaseBackend): group.tags = new_tags - def delete_tags(self, tags): + def delete_tags(self, tags: List[Dict[str, str]]) -> None: for tag_to_delete in tags: group_name = tag_to_delete["resource_id"] key_to_delete = tag_to_delete["key"] @@ -1290,17 +1365,21 @@ class AutoScalingBackend(BaseBackend): old_tags = group.tags group.tags = [x for x in old_tags if x["key"] != key_to_delete] - def attach_load_balancers(self, group_name, load_balancer_names): + def attach_load_balancers( + self, group_name: str, load_balancer_names: List[str] + ) -> None: group = self.autoscaling_groups[group_name] group.load_balancers.extend( [x for x in load_balancer_names if x not in group.load_balancers] ) self.update_attached_elbs(group_name) - def describe_load_balancers(self, group_name): + def describe_load_balancers(self, group_name: str) -> List[str]: return self.autoscaling_groups[group_name].load_balancers - def detach_load_balancers(self, group_name, load_balancer_names): + def detach_load_balancers( + self, group_name: str, load_balancer_names: List[str] + ) -> None: group = self.autoscaling_groups[group_name] group_instance_ids = set(state.instance.id for state in group.instance_states) elbs = self.elb_backend.describe_load_balancers(names=group.load_balancers) @@ -1312,15 +1391,19 @@ class AutoScalingBackend(BaseBackend): x for x in group.load_balancers if x not in load_balancer_names ] - def attach_load_balancer_target_groups(self, group_name, target_group_arns): + def attach_load_balancer_target_groups( + self, group_name: str, target_group_arns: List[str] + ) -> None: group = self.autoscaling_groups[group_name] group.append_target_groups(target_group_arns) self.update_attached_target_groups(group_name) - def describe_load_balancer_target_groups(self, group_name): + def describe_load_balancer_target_groups(self, group_name: str) -> List[str]: return self.autoscaling_groups[group_name].target_group_arns - def detach_load_balancer_target_groups(self, group_name, target_group_arns): + def detach_load_balancer_target_groups( + self, group_name: str, target_group_arns: List[str] + ) -> None: group = self.autoscaling_groups[group_name] group.target_group_arns = [ x for x in group.target_group_arns if x not in target_group_arns @@ -1329,7 +1412,7 @@ class AutoScalingBackend(BaseBackend): asg_targets = [{"id": x.instance.id} for x in group.instance_states] self.elbv2_backend.deregister_targets(target_group, (asg_targets)) - def suspend_processes(self, group_name, scaling_processes): + def suspend_processes(self, group_name: str, scaling_processes: List[str]) -> None: all_proc_names = [ "Launch", "Terminate", @@ -1347,7 +1430,7 @@ class AutoScalingBackend(BaseBackend): set(group.suspended_processes).union(set_to_add) ) - def resume_processes(self, group_name, scaling_processes): + def resume_processes(self, group_name: str, 
scaling_processes: List[str]) -> None: group = self.autoscaling_groups[group_name] if scaling_processes: group.suspended_processes = list( @@ -1357,8 +1440,11 @@ class AutoScalingBackend(BaseBackend): group.suspended_processes = [] def set_instance_protection( - self, group_name, instance_ids, protected_from_scale_in - ): + self, + group_name: str, + instance_ids: List[str], + protected_from_scale_in: Optional[bool], + ) -> None: group = self.autoscaling_groups[group_name] protected_instances = [ x for x in group.instance_states if x.instance.id in instance_ids @@ -1366,7 +1452,7 @@ class AutoScalingBackend(BaseBackend): for instance in protected_instances: instance.protected_from_scale_in = protected_from_scale_in - def notify_terminate_instances(self, instance_ids): + def notify_terminate_instances(self, instance_ids: List[str]) -> None: for ( autoscaling_group_name, autoscaling_group, @@ -1387,7 +1473,9 @@ class AutoScalingBackend(BaseBackend): ) self.update_attached_elbs(autoscaling_group_name) - def enter_standby_instances(self, group_name, instance_ids, should_decrement): + def enter_standby_instances( + self, group_name: str, instance_ids: List[str], should_decrement: bool + ) -> Tuple[List[InstanceState], Optional[int], Optional[int]]: group = self.autoscaling_groups[group_name] original_size = group.desired_capacity standby_instances = [] @@ -1396,11 +1484,13 @@ class AutoScalingBackend(BaseBackend): instance_state.lifecycle_state = "Standby" standby_instances.append(instance_state) if should_decrement: - group.desired_capacity = group.desired_capacity - len(instance_ids) + group.desired_capacity = group.desired_capacity - len(instance_ids) # type: ignore[operator] group.set_desired_capacity(group.desired_capacity) return standby_instances, original_size, group.desired_capacity - def exit_standby_instances(self, group_name, instance_ids): + def exit_standby_instances( + self, group_name: str, instance_ids: List[str] + ) -> Tuple[List[InstanceState], Optional[int], int]: group = self.autoscaling_groups[group_name] original_size = group.desired_capacity standby_instances = [] @@ -1408,11 +1498,13 @@ class AutoScalingBackend(BaseBackend): if instance_state.instance.id in instance_ids: instance_state.lifecycle_state = "InService" standby_instances.append(instance_state) - group.desired_capacity = group.desired_capacity + len(instance_ids) + group.desired_capacity = group.desired_capacity + len(instance_ids) # type: ignore[operator] group.set_desired_capacity(group.desired_capacity) return standby_instances, original_size, group.desired_capacity - def terminate_instance(self, instance_id, should_decrement): + def terminate_instance( + self, instance_id: str, should_decrement: bool + ) -> Tuple[InstanceState, Any, Any]: instance = self.ec2_backend.get_instance(instance_id) instance_state = next( instance_state @@ -1420,13 +1512,13 @@ class AutoScalingBackend(BaseBackend): for instance_state in group.instance_states if instance_state.instance.id == instance.id ) - group = instance.autoscaling_group + group = instance.autoscaling_group # type: ignore[attr-defined] original_size = group.desired_capacity self.detach_instances(group.name, [instance.id], should_decrement) self.ec2_backend.terminate_instances([instance.id]) return instance_state, original_size, group.desired_capacity - def describe_tags(self, filters): + def describe_tags(self, filters: List[Dict[str, str]]) -> List[Dict[str, str]]: """ Pagination is not yet implemented. 
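# Sketch of the describe_tags() filters documented in this docstring,
# continuing in the mocked session from the earlier scale-out sketch and
# assuming the group was created with
#   Tags=[{"Key": "env", "Value": "dev", "PropagateAtLaunch": True}]
# (a "propagate-at-launch" filter with Values ["true"] or ["false"] is the
# other supported filter).
tags = client.describe_tags(
    Filters=[{"Name": "auto-scaling-group", "Values": ["sketch-asg"]}]
)["Tags"]
assert tags and tags[0]["ResourceId"] == "sketch-asg"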
Only the `auto-scaling-group` and `propagate-at-launch` filters are implemented. @@ -1445,7 +1537,7 @@ class AutoScalingBackend(BaseBackend): ] return tags - def enable_metrics_collection(self, group_name, metrics): + def enable_metrics_collection(self, group_name: str, metrics: List[str]) -> None: group = self.describe_auto_scaling_groups([group_name])[0] group.enable_metrics_collection(metrics) diff --git a/moto/autoscaling/responses.py b/moto/autoscaling/responses.py index 608c29c15..f3731272c 100644 --- a/moto/autoscaling/responses.py +++ b/moto/autoscaling/responses.py @@ -3,18 +3,18 @@ import datetime from moto.core.responses import BaseResponse from moto.core.utils import iso_8601_datetime_with_milliseconds from moto.utilities.aws_headers import amz_crc32, amzn_request_id -from .models import autoscaling_backends +from .models import autoscaling_backends, AutoScalingBackend class AutoScalingResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="autoscaling") @property - def autoscaling_backend(self): + def autoscaling_backend(self) -> AutoScalingBackend: return autoscaling_backends[self.current_account][self.region] - def create_launch_configuration(self): + def create_launch_configuration(self) -> str: instance_monitoring_string = self._get_param("InstanceMonitoring.Enabled") if instance_monitoring_string == "true": instance_monitoring = True @@ -44,7 +44,7 @@ class AutoScalingResponse(BaseResponse): template = self.response_template(CREATE_LAUNCH_CONFIGURATION_TEMPLATE) return template.render() - def describe_launch_configurations(self): + def describe_launch_configurations(self) -> str: names = self._get_multi_param("LaunchConfigurationNames.member") all_launch_configurations = ( self.autoscaling_backend.describe_launch_configurations(names) @@ -55,9 +55,8 @@ class AutoScalingResponse(BaseResponse): start = all_names.index(marker) + 1 else: start = 0 - max_records = self._get_int_param( - "MaxRecords", 50 - ) # the default is 100, but using 50 to make testing easier + # the default is 100, but using 50 to make testing easier + max_records = self._get_int_param("MaxRecords") or 50 launch_configurations_resp = all_launch_configurations[ start : start + max_records ] @@ -70,13 +69,13 @@ class AutoScalingResponse(BaseResponse): launch_configurations=launch_configurations_resp, next_token=next_token ) - def delete_launch_configuration(self): - launch_configurations_name = self.querystring.get("LaunchConfigurationName")[0] + def delete_launch_configuration(self) -> str: + launch_configurations_name = self.querystring.get("LaunchConfigurationName")[0] # type: ignore[index] self.autoscaling_backend.delete_launch_configuration(launch_configurations_name) template = self.response_template(DELETE_LAUNCH_CONFIGURATION_TEMPLATE) return template.render() - def create_auto_scaling_group(self): + def create_auto_scaling_group(self) -> str: params = self._get_params() self.autoscaling_backend.create_auto_scaling_group( name=self._get_param("AutoScalingGroupName"), @@ -97,7 +96,7 @@ class AutoScalingResponse(BaseResponse): placement_group=self._get_param("PlacementGroup"), termination_policies=self._get_multi_param("TerminationPolicies.member"), tags=self._get_list_prefix("Tags.member"), - capacity_rebalance=self._get_bool_param("CapacityRebalance"), + capacity_rebalance=self._get_bool_param("CapacityRebalance", False), new_instances_protected_from_scale_in=self._get_bool_param( "NewInstancesProtectedFromScaleIn", False ), @@ -105,7 +104,7 @@ 
class AutoScalingResponse(BaseResponse): template = self.response_template(CREATE_AUTOSCALING_GROUP_TEMPLATE) return template.render() - def put_scheduled_update_group_action(self): + def put_scheduled_update_group_action(self) -> str: self.autoscaling_backend.put_scheduled_update_group_action( name=self._get_param("AutoScalingGroupName"), desired_capacity=self._get_int_param("DesiredCapacity"), @@ -119,7 +118,7 @@ class AutoScalingResponse(BaseResponse): template = self.response_template(PUT_SCHEDULED_UPDATE_GROUP_ACTION_TEMPLATE) return template.render() - def describe_scheduled_actions(self): + def describe_scheduled_actions(self) -> str: scheduled_actions = self.autoscaling_backend.describe_scheduled_actions( autoscaling_group_name=self._get_param("AutoScalingGroupName"), scheduled_action_names=self._get_multi_param("ScheduledActionNames.member"), @@ -127,7 +126,7 @@ class AutoScalingResponse(BaseResponse): template = self.response_template(DESCRIBE_SCHEDULED_ACTIONS) return template.render(scheduled_actions=scheduled_actions) - def delete_scheduled_action(self): + def delete_scheduled_action(self) -> str: auto_scaling_group_name = self._get_param("AutoScalingGroupName") scheduled_action_name = self._get_param("ScheduledActionName") self.autoscaling_backend.delete_scheduled_action( @@ -137,13 +136,13 @@ class AutoScalingResponse(BaseResponse): template = self.response_template(DELETE_SCHEDULED_ACTION_TEMPLATE) return template.render() - def describe_scaling_activities(self): + def describe_scaling_activities(self) -> str: template = self.response_template(DESCRIBE_SCALING_ACTIVITIES_TEMPLATE) return template.render() @amz_crc32 @amzn_request_id - def attach_instances(self): + def attach_instances(self) -> str: group_name = self._get_param("AutoScalingGroupName") instance_ids = self._get_multi_param("InstanceIds.member") self.autoscaling_backend.attach_instances(group_name, instance_ids) @@ -152,7 +151,7 @@ class AutoScalingResponse(BaseResponse): @amz_crc32 @amzn_request_id - def set_instance_health(self): + def set_instance_health(self) -> str: instance_id = self._get_param("InstanceId") health_status = self._get_param("HealthStatus") if health_status not in ["Healthy", "Unhealthy"]: @@ -163,7 +162,7 @@ class AutoScalingResponse(BaseResponse): @amz_crc32 @amzn_request_id - def detach_instances(self): + def detach_instances(self) -> str: group_name = self._get_param("AutoScalingGroupName") instance_ids = self._get_multi_param("InstanceIds.member") should_decrement_string = self._get_param("ShouldDecrementDesiredCapacity") @@ -179,7 +178,7 @@ class AutoScalingResponse(BaseResponse): @amz_crc32 @amzn_request_id - def attach_load_balancer_target_groups(self): + def attach_load_balancer_target_groups(self) -> str: group_name = self._get_param("AutoScalingGroupName") target_group_arns = self._get_multi_param("TargetGroupARNs.member") @@ -191,7 +190,7 @@ class AutoScalingResponse(BaseResponse): @amz_crc32 @amzn_request_id - def describe_load_balancer_target_groups(self): + def describe_load_balancer_target_groups(self) -> str: group_name = self._get_param("AutoScalingGroupName") target_group_arns = ( self.autoscaling_backend.describe_load_balancer_target_groups(group_name) @@ -201,7 +200,7 @@ class AutoScalingResponse(BaseResponse): @amz_crc32 @amzn_request_id - def detach_load_balancer_target_groups(self): + def detach_load_balancer_target_groups(self) -> str: group_name = self._get_param("AutoScalingGroupName") target_group_arns = self._get_multi_param("TargetGroupARNs.member") @@ -211,7 
+210,7 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(DETACH_LOAD_BALANCER_TARGET_GROUPS_TEMPLATE)
         return template.render()
 
-    def describe_auto_scaling_groups(self):
+    def describe_auto_scaling_groups(self) -> str:
         names = self._get_multi_param("AutoScalingGroupNames.member")
         token = self._get_param("NextToken")
         all_groups = self.autoscaling_backend.describe_auto_scaling_groups(names)
@@ -230,7 +229,7 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(DESCRIBE_AUTOSCALING_GROUPS_TEMPLATE)
         return template.render(groups=groups, next_token=next_token)
 
-    def update_auto_scaling_group(self):
+    def update_auto_scaling_group(self) -> str:
         self.autoscaling_backend.update_auto_scaling_group(
             name=self._get_param("AutoScalingGroupName"),
             availability_zones=self._get_multi_param("AvailabilityZones.member"),
@@ -249,41 +248,41 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(UPDATE_AUTOSCALING_GROUP_TEMPLATE)
         return template.render()
 
-    def delete_auto_scaling_group(self):
+    def delete_auto_scaling_group(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         self.autoscaling_backend.delete_auto_scaling_group(group_name)
         template = self.response_template(DELETE_AUTOSCALING_GROUP_TEMPLATE)
         return template.render()
 
-    def set_desired_capacity(self):
+    def set_desired_capacity(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         desired_capacity = self._get_int_param("DesiredCapacity")
         self.autoscaling_backend.set_desired_capacity(group_name, desired_capacity)
         template = self.response_template(SET_DESIRED_CAPACITY_TEMPLATE)
         return template.render()
 
-    def create_or_update_tags(self):
+    def create_or_update_tags(self) -> str:
         tags = self._get_list_prefix("Tags.member")
         self.autoscaling_backend.create_or_update_tags(tags)
         template = self.response_template(UPDATE_AUTOSCALING_GROUP_TEMPLATE)
         return template.render()
 
-    def delete_tags(self):
+    def delete_tags(self) -> str:
         tags = self._get_list_prefix("Tags.member")
         self.autoscaling_backend.delete_tags(tags)
         template = self.response_template(UPDATE_AUTOSCALING_GROUP_TEMPLATE)
         return template.render()
 
-    def describe_auto_scaling_instances(self):
+    def describe_auto_scaling_instances(self) -> str:
         instance_states = self.autoscaling_backend.describe_auto_scaling_instances(
             instance_ids=self._get_multi_param("InstanceIds.member")
         )
         template = self.response_template(DESCRIBE_AUTOSCALING_INSTANCES_TEMPLATE)
         return template.render(instance_states=instance_states)
 
-    def put_lifecycle_hook(self):
+    def put_lifecycle_hook(self) -> str:
         lifecycle_hook = self.autoscaling_backend.create_lifecycle_hook(
             name=self._get_param("LifecycleHookName"),
             as_name=self._get_param("AutoScalingGroupName"),
@@ -294,7 +293,7 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(CREATE_LIFECYLE_HOOK_TEMPLATE)
         return template.render(lifecycle_hook=lifecycle_hook)
 
-    def describe_lifecycle_hooks(self):
+    def describe_lifecycle_hooks(self) -> str:
         lifecycle_hooks = self.autoscaling_backend.describe_lifecycle_hooks(
             as_name=self._get_param("AutoScalingGroupName"),
             lifecycle_hook_names=self._get_multi_param("LifecycleHookNames.member"),
@@ -302,14 +301,14 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(DESCRIBE_LIFECYCLE_HOOKS_TEMPLATE)
         return template.render(lifecycle_hooks=lifecycle_hooks)
 
-    def delete_lifecycle_hook(self):
+    def delete_lifecycle_hook(self) -> str:
         as_name = self._get_param("AutoScalingGroupName")
         name = self._get_param("LifecycleHookName")
         self.autoscaling_backend.delete_lifecycle_hook(as_name, name)
         template = self.response_template(DELETE_LIFECYCLE_HOOK_TEMPLATE)
         return template.render()
 
-    def put_scaling_policy(self):
+    def put_scaling_policy(self) -> str:
         params = self._get_params()
         policy = self.autoscaling_backend.put_scaling_policy(
             name=params.get("PolicyName"),
@@ -330,7 +329,7 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(CREATE_SCALING_POLICY_TEMPLATE)
         return template.render(policy=policy)
 
-    def describe_policies(self):
+    def describe_policies(self) -> str:
         policies = self.autoscaling_backend.describe_policies(
             autoscaling_group_name=self._get_param("AutoScalingGroupName"),
             policy_names=self._get_multi_param("PolicyNames.member"),
@@ -339,13 +338,13 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(DESCRIBE_SCALING_POLICIES_TEMPLATE)
         return template.render(policies=policies)
 
-    def delete_policy(self):
+    def delete_policy(self) -> str:
         group_name = self._get_param("PolicyName")
         self.autoscaling_backend.delete_policy(group_name)
         template = self.response_template(DELETE_POLICY_TEMPLATE)
         return template.render()
 
-    def execute_policy(self):
+    def execute_policy(self) -> str:
         group_name = self._get_param("PolicyName")
         self.autoscaling_backend.execute_policy(group_name)
         template = self.response_template(EXECUTE_POLICY_TEMPLATE)
@@ -353,7 +352,7 @@ class AutoScalingResponse(BaseResponse):
 
     @amz_crc32
     @amzn_request_id
-    def attach_load_balancers(self):
+    def attach_load_balancers(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         load_balancer_names = self._get_multi_param("LoadBalancerNames.member")
         self.autoscaling_backend.attach_load_balancers(group_name, load_balancer_names)
@@ -362,7 +361,7 @@ class AutoScalingResponse(BaseResponse):
 
     @amz_crc32
     @amzn_request_id
-    def describe_load_balancers(self):
+    def describe_load_balancers(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         load_balancers = self.autoscaling_backend.describe_load_balancers(group_name)
         template = self.response_template(DESCRIBE_LOAD_BALANCERS_TEMPLATE)
@@ -370,7 +369,7 @@ class AutoScalingResponse(BaseResponse):
 
     @amz_crc32
     @amzn_request_id
-    def detach_load_balancers(self):
+    def detach_load_balancers(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         load_balancer_names = self._get_multi_param("LoadBalancerNames.member")
         self.autoscaling_backend.detach_load_balancers(group_name, load_balancer_names)
@@ -379,7 +378,7 @@ class AutoScalingResponse(BaseResponse):
 
     @amz_crc32
     @amzn_request_id
-    def enter_standby(self):
+    def enter_standby(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         instance_ids = self._get_multi_param("InstanceIds.member")
         should_decrement_string = self._get_param("ShouldDecrementDesiredCapacity")
@@ -405,7 +404,7 @@ class AutoScalingResponse(BaseResponse):
 
     @amz_crc32
     @amzn_request_id
-    def exit_standby(self):
+    def exit_standby(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         instance_ids = self._get_multi_param("InstanceIds.member")
         (
@@ -421,7 +420,7 @@ class AutoScalingResponse(BaseResponse):
             timestamp=iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()),
         )
 
-    def suspend_processes(self):
+    def suspend_processes(self) -> str:
         autoscaling_group_name = self._get_param("AutoScalingGroupName")
         scaling_processes = self._get_multi_param("ScalingProcesses.member")
         self.autoscaling_backend.suspend_processes(
@@ -430,7 +429,7 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(SUSPEND_PROCESSES_TEMPLATE)
         return template.render()
 
-    def resume_processes(self):
+    def resume_processes(self) -> str:
         autoscaling_group_name = self._get_param("AutoScalingGroupName")
         scaling_processes = self._get_multi_param("ScalingProcesses.member")
         self.autoscaling_backend.resume_processes(
@@ -439,7 +438,7 @@ class AutoScalingResponse(BaseResponse):
         template = self.response_template(RESUME_PROCESSES_TEMPLATE)
         return template.render()
 
-    def set_instance_protection(self):
+    def set_instance_protection(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         instance_ids = self._get_multi_param("InstanceIds.member")
         protected_from_scale_in = self._get_bool_param("ProtectedFromScaleIn")
@@ -451,7 +450,7 @@ class AutoScalingResponse(BaseResponse):
 
     @amz_crc32
     @amzn_request_id
-    def terminate_instance_in_auto_scaling_group(self):
+    def terminate_instance_in_auto_scaling_group(self) -> str:
         instance_id = self._get_param("InstanceId")
         should_decrement_string = self._get_param("ShouldDecrementDesiredCapacity")
         if should_decrement_string == "true":
@@ -472,13 +471,13 @@ class AutoScalingResponse(BaseResponse):
             timestamp=iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()),
         )
 
-    def describe_tags(self):
+    def describe_tags(self) -> str:
         filters = self._get_params().get("Filters", [])
         tags = self.autoscaling_backend.describe_tags(filters=filters)
         template = self.response_template(DESCRIBE_TAGS_TEMPLATE)
         return template.render(tags=tags, next_token=None)
 
-    def enable_metrics_collection(self):
+    def enable_metrics_collection(self) -> str:
         group_name = self._get_param("AutoScalingGroupName")
         metrics = self._get_params().get("Metrics")
         self.autoscaling_backend.enable_metrics_collection(group_name, metrics)
diff --git a/moto/core/exceptions.py b/moto/core/exceptions.py
index 7edd796b7..a5b0630f3 100644
--- a/moto/core/exceptions.py
+++ b/moto/core/exceptions.py
@@ -50,7 +50,9 @@ class RESTError(HTTPException):
         "error": ERROR_RESPONSE,
     }
 
-    def __init__(self, error_type, message, template="error", **kwargs):
+    def __init__(
+        self, error_type: str, message: str, template: str = "error", **kwargs: Any
+    ):
         super().__init__()
         self.error_type = error_type
         self.message = message
diff --git a/moto/core/responses.py b/moto/core/responses.py
index cce796499..0b6a504c7 100644
--- a/moto/core/responses.py
+++ b/moto/core/responses.py
@@ -14,8 +14,8 @@ from moto import settings
 from moto.core.exceptions import DryRunClientError
 from moto.core.utils import camelcase_to_underscores, method_names_from_class
 from moto.utilities.utils import load_resource
-from jinja2 import Environment, DictLoader
-from typing import Dict, List, Union, Any, Optional, Tuple
+from jinja2 import Environment, DictLoader, Template
+from typing import Dict, Union, Any, Tuple, TypeVar
 from urllib.parse import parse_qs, parse_qsl, urlparse
 from werkzeug.exceptions import HTTPException
 from xml.dom.minidom import parseString as parseXML
@@ -26,6 +26,7 @@ log = logging.getLogger(__name__)
 JINJA_ENVS = {}
 
 TYPE_RESPONSE = Tuple[int, Dict[str, str], str]
+TYPE_IF_NONE = TypeVar("TYPE_IF_NONE")
 
 
 def _decode_dict(d):
@@ -105,7 +106,7 @@ class _TemplateEnvironmentMixin(object):
         """
         return str(id(source))
 
-    def response_template(self, source):
+    def response_template(self, source: str) -> Template:
         template_id = self._make_template_id(source)
         if not self.contains_template(template_id):
             if settings.PRETTIFY_RESPONSES:
@@ -501,13 +502,17 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
                 pass
         return if_none
 
-    def _get_int_param(self, param_name, if_none: int = None) -> Optional[int]:
+    def _get_int_param(
+        self, param_name, if_none: TYPE_IF_NONE = None
+    ) -> Union[int, TYPE_IF_NONE]:
         val = self._get_param(param_name)
         if val is not None:
             return int(val)
         return if_none
 
-    def _get_bool_param(self, param_name, if_none: bool = None) -> Optional[bool]:
+    def _get_bool_param(
+        self, param_name, if_none: TYPE_IF_NONE = None
+    ) -> Union[bool, TYPE_IF_NONE]:
         val = self._get_param(param_name)
         if val is not None:
             val = str(val)
@@ -588,9 +593,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
 
         return value_dict
 
-    def _get_multi_param(
-        self, param_prefix, skip_result_conversion=False
-    ) -> List[Union[str, Dict]]:
+    def _get_multi_param(self, param_prefix, skip_result_conversion=False) -> Any:
         """
         Given a querystring of ?LaunchConfigurationNames.member.1=my-test-1&LaunchConfigurationNames.member.2=my-test-2
         this will return ['my-test-1', 'my-test-2']
@@ -635,7 +638,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
             ]
         return params
 
-    def _get_params(self) -> Dict[str, Union[str, List, Dict]]:
+    def _get_params(self) -> Any:
         """
         Given a querystring of
         {
@@ -710,7 +713,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
         else:
             obj[keylist[-1]] = value
 
-    def _get_list_prefix(self, param_prefix):
+    def _get_list_prefix(self, param_prefix: str) -> Any:
         """
         Given a query dict like
         {
diff --git a/moto/core/utils.py b/moto/core/utils.py
index 115310ff5..4fa4416df 100644
--- a/moto/core/utils.py
+++ b/moto/core/utils.py
@@ -12,7 +12,7 @@ from urllib.parse import urlparse
 from uuid import uuid4
 
 
-def camelcase_to_underscores(argument):
+def camelcase_to_underscores(argument: Optional[str]) -> str:
     """Converts a camelcase param like theNewAttribute to the equivalent
     python underscore variable like the_new_attribute"""
     result = ""
@@ -147,7 +147,7 @@ class convert_flask_to_responses_response(object):
         return status, headers, response
 
 
-def iso_8601_datetime_with_milliseconds(value):
+def iso_8601_datetime_with_milliseconds(value: datetime) -> str:
     return value.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
 
 
diff --git a/moto/ec2/models/instances.py b/moto/ec2/models/instances.py
index 417335c37..c159055cf 100644
--- a/moto/ec2/models/instances.py
+++ b/moto/ec2/models/instances.py
@@ -2,6 +2,7 @@ import copy
 import warnings
 from collections import OrderedDict
 from datetime import datetime
+from typing import Any, List, Tuple
 
 from moto import settings
 from moto.core import CloudFormationModel
@@ -69,7 +70,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
     def __init__(self, ec2_backend, image_id, user_data, security_groups, **kwargs):
         super().__init__()
         self.ec2_backend = ec2_backend
-        self.id = random_instance_id()
+        self.id: str = random_instance_id()
         self.owner_id = ec2_backend.account_id
         self.lifecycle = kwargs.get("lifecycle")
 
@@ -79,9 +80,9 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
         if launch_template_arg and not image_id:
             # the image id from the template should be used
             template_version = ec2_backend._get_template_from_args(launch_template_arg)
-            self.image_id = template_version.image_id
+            self.image_id: str = template_version.image_id
         else:
-            self.image_id = image_id
+            self.image_id: str = image_id
         # Check if we have tags to process
         if launch_template_arg:
             template_version = ec2_backend._get_template_from_args(launch_template_arg)
@@ -95,7 +96,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
         self._state_reason = StateReason()
         self.user_data = user_data
         self.security_groups = security_groups
-        self.instance_type = kwargs.get("instance_type", "m1.small")
+        self.instance_type: str = kwargs.get("instance_type", "m1.small")
         self.region_name = kwargs.get("region_name", "us-east-1")
         placement = kwargs.get("placement", None)
         self.subnet_id = kwargs.get("subnet_id")
@@ -159,7 +160,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
         else:
             self._placement.zone = ec2_backend.region_name + "a"
 
-        self.block_device_mapping = BlockDeviceMapping()
+        self.block_device_mapping: BlockDeviceMapping = BlockDeviceMapping()
 
         self._private_ips = set()
         self.prep_nics(
@@ -592,7 +593,14 @@ class InstanceBackend:
                 return instance
         raise InvalidInstanceIdError(instance_id)
 
-    def add_instances(self, image_id, count, user_data, security_group_names, **kwargs):
+    def add_instances(
+        self,
+        image_id: str,
+        count: int,
+        user_data: str,
+        security_group_names: List[str],
+        **kwargs: Any
+    ) -> Reservation:
         location_type = "availability-zone" if kwargs.get("placement") else "region"
         default_region = "us-east-1"
         if settings.ENABLE_KEYPAIR_VALIDATION:
@@ -734,7 +742,7 @@ class InstanceBackend:
 
         return stopped_instances
 
-    def terminate_instances(self, instance_ids):
+    def terminate_instances(self, instance_ids: List[str]) -> List[Tuple[str, str]]:
         terminated_instances = []
         if not instance_ids:
             raise EC2ClientError(
diff --git a/moto/ec2/models/launch_templates.py b/moto/ec2/models/launch_templates.py
index 1a24bca40..69ecf9def 100644
--- a/moto/ec2/models/launch_templates.py
+++ b/moto/ec2/models/launch_templates.py
@@ -179,10 +179,10 @@ class LaunchTemplateBackend:
         self.launch_template_insert_order.append(template.id)
         return template
 
-    def get_launch_template(self, template_id):
+    def get_launch_template(self, template_id: str) -> LaunchTemplate:
         return self.launch_templates[template_id]
 
-    def get_launch_template_by_name(self, name):
+    def get_launch_template_by_name(self, name: str) -> LaunchTemplate:
         if name not in self.launch_template_name_to_ids:
             raise InvalidLaunchTemplateNameNotFoundWithNameError(name)
         return self.get_launch_template(self.launch_template_name_to_ids[name])
diff --git a/moto/ec2/models/subnets.py b/moto/ec2/models/subnets.py
index e8b3bbcc0..f25f7db75 100644
--- a/moto/ec2/models/subnets.py
+++ b/moto/ec2/models/subnets.py
@@ -1,6 +1,7 @@
 import ipaddress
 import itertools
 from collections import defaultdict
+from typing import Any, Iterable, List, Optional
 
 from moto.core import CloudFormationModel
 from ..exceptions import (
@@ -344,7 +345,9 @@ class SubnetBackend:
         self.subnets[availability_zone][subnet_id] = subnet
         return subnet
 
-    def get_all_subnets(self, subnet_ids=None, filters=None):
+    def get_all_subnets(
+        self, subnet_ids: Optional[List[str]] = None, filters: Optional[Any] = None
+    ) -> Iterable[Subnet]:
         # Extract a list of all subnets
         matches = itertools.chain(
             *[x.copy().values() for x in self.subnets.copy().values()]
diff --git a/moto/ec2/models/tags.py b/moto/ec2/models/tags.py
index b56778a4f..6f8c9843d 100644
--- a/moto/ec2/models/tags.py
+++ b/moto/ec2/models/tags.py
@@ -1,5 +1,6 @@
 import re
 from collections import defaultdict
+from typing import Dict, List
 
 from ..exceptions import (
     InvalidParameterValueErrorTagNull,
@@ -18,7 +19,7 @@ class TagBackend:
     def __init__(self):
         self.tags = defaultdict(dict)
 
-    def create_tags(self, resource_ids, tags):
+    def create_tags(self, resource_ids: List[str], tags: Dict[str, str]):
         if None in set([tags[tag] for tag in tags]):
             raise InvalidParameterValueErrorTagNull()
         for resource_id in resource_ids:
@@ -36,7 +37,7 @@ class TagBackend:
                 self.tags[resource_id][tag] = tags[tag]
         return True
 
-    def delete_tags(self, resource_ids, tags):
+    def delete_tags(self, resource_ids: List[str], tags: Dict[str, str]):
         for resource_id in resource_ids:
             for tag in tags:
                 if tag in self.tags[resource_id]:
diff --git a/moto/ec2/utils.py b/moto/ec2/utils.py
index 74e180b50..d235a3102 100644
--- a/moto/ec2/utils.py
+++ b/moto/ec2/utils.py
@@ -74,7 +74,7 @@ def random_ami_id():
     return random_id(prefix=EC2_RESOURCE_TO_PREFIX["image"])
 
 
-def random_instance_id():
+def random_instance_id() -> str:
     return random_id(prefix=EC2_RESOURCE_TO_PREFIX["instance"], size=17)
 
 
diff --git a/moto/elb/models.py b/moto/elb/models.py
index aa287fcd1..48b080de8 100644
--- a/moto/elb/models.py
+++ b/moto/elb/models.py
@@ -1,8 +1,8 @@
 import datetime
-
 import pytz
-
 from collections import OrderedDict
+from typing import List, Iterable
+
 from moto.core import BaseBackend, BaseModel, CloudFormationModel
 from moto.core.utils import BackendDict
 from moto.ec2.models import ec2_backends
@@ -359,7 +359,7 @@ class ELBBackend(BaseBackend):
 
         return balancer
 
-    def describe_load_balancers(self, names):
+    def describe_load_balancers(self, names: List[str]) -> List[FakeLoadBalancer]:
         balancers = self.load_balancers.values()
         if names:
             matched_balancers = [
@@ -465,8 +465,11 @@ class ELBBackend(BaseBackend):
         return balancer
 
     def register_instances(
-        self, load_balancer_name, instance_ids, from_autoscaling=False
-    ):
+        self,
+        load_balancer_name: str,
+        instance_ids: Iterable[str],
+        from_autoscaling: bool = False,
+    ) -> FakeLoadBalancer:
         load_balancer = self.get_load_balancer(load_balancer_name)
         attr_name = (
             "instance_sparse_ids"
@@ -479,8 +482,11 @@ class ELBBackend(BaseBackend):
         return load_balancer
 
     def deregister_instances(
-        self, load_balancer_name, instance_ids, from_autoscaling=False
-    ):
+        self,
+        load_balancer_name: str,
+        instance_ids: Iterable[str],
+        from_autoscaling: bool = False,
+    ) -> FakeLoadBalancer:
         load_balancer = self.get_load_balancer(load_balancer_name)
         attr_name = (
             "instance_sparse_ids"
diff --git a/moto/elbv2/models.py b/moto/elbv2/models.py
index bd7256a7c..683a317fa 100644
--- a/moto/elbv2/models.py
+++ b/moto/elbv2/models.py
@@ -3,6 +3,7 @@ import re
 from jinja2 import Template
 from botocore.exceptions import ParamValidationError
 from collections import OrderedDict
+from typing import Any, List, Dict, Iterable, Optional
 from moto.core.exceptions import RESTError
 from moto.core import BaseBackend, BaseModel, CloudFormationModel
 from moto.core.utils import (
@@ -1175,7 +1176,12 @@ Member must satisfy regular expression pattern: {}".format(
             raise RuleNotFoundError("One or more rules not found")
         return matched_rules
 
-    def describe_target_groups(self, load_balancer_arn, target_group_arns, names):
+    def describe_target_groups(
+        self,
+        load_balancer_arn: Optional[str],
+        target_group_arns: List[str],
+        names: Optional[List[str]],
+    ) -> Iterable[FakeTargetGroup]:
         if load_balancer_arn:
             if load_balancer_arn not in self.load_balancers:
                 raise LoadBalancerNotFoundError()
@@ -1284,13 +1290,15 @@ Member must satisfy regular expression pattern: {}".format(
             rule.actions = actions
         return rule
 
-    def register_targets(self, target_group_arn, instances):
+    def register_targets(self, target_group_arn: str, instances: List[Any]):
         target_group = self.target_groups.get(target_group_arn)
         if target_group is None:
             raise TargetGroupNotFoundError()
         target_group.register(instances)
 
-    def deregister_targets(self, target_group_arn, instances):
+    def deregister_targets(
+        self, target_group_arn: str, instances: List[Dict[str, Any]]
+    ):
         target_group = self.target_groups.get(target_group_arn)
         if target_group is None:
             raise TargetGroupNotFoundError()
diff --git a/moto/packages/boto/ec2/blockdevicemapping.py b/moto/packages/boto/ec2/blockdevicemapping.py
index db85210dd..775d29460 100644
--- a/moto/packages/boto/ec2/blockdevicemapping.py
+++ b/moto/packages/boto/ec2/blockdevicemapping.py
@@ -20,6 +20,7 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 # IN THE SOFTWARE.
 #
+from typing import Any, Dict, List, Optional, Union
 
 
 class BlockDeviceType(object):
@@ -29,18 +30,18 @@ class BlockDeviceType(object):
 
     def __init__(
         self,
-        connection=None,
-        ephemeral_name=None,
-        no_device=False,
-        volume_id=None,
-        snapshot_id=None,
-        status=None,
-        attach_time=None,
-        delete_on_termination=False,
-        size=None,
-        volume_type=None,
-        iops=None,
-        encrypted=None,
+        connection: Optional[str] = None,
+        ephemeral_name: Optional[str] = None,
+        no_device: Union[bool, str] = False,
+        volume_id: Optional[str] = None,
+        snapshot_id: Optional[str] = None,
+        status: Optional[str] = None,
+        attach_time: Optional[str] = None,
+        delete_on_termination: bool = False,
+        size: Optional[str] = None,
+        volume_type: Optional[str] = None,
+        iops: Optional[str] = None,
+        encrypted: Optional[str] = None,
     ):
         self.connection = connection
         self.ephemeral_name = ephemeral_name
@@ -55,6 +56,7 @@ class BlockDeviceType(object):
         self.iops = iops
         self.encrypted = encrypted
         self.kms_key_id = None
+        self.throughput = None
 
 
 # for backwards compatibility
@@ -73,17 +75,7 @@ class BlockDeviceMapping(dict):
     reservation = image.run(..., block_device_map=bdm, ...)
     """
 
-    def __init__(self, connection=None):
-        """
-        :type connection: :class:`boto.ec2.EC2Connection`
-        :param connection: Optional connection.
-        """
-        dict.__init__(self)
-        self.connection = connection
-        self.current_name = None
-        self.current_value = None
-
-    def to_source_dict(self):
+    def to_source_dict(self) -> List[Dict[str, Any]]:
         return [
             {
                 "DeviceName": device_name,
diff --git a/moto/utilities/aws_headers.py b/moto/utilities/aws_headers.py
index 29b817fac..f3366e499 100644
--- a/moto/utilities/aws_headers.py
+++ b/moto/utilities/aws_headers.py
@@ -1,10 +1,14 @@
 from functools import wraps
+from typing import Any, Callable, TypeVar
 import binascii
 import re
 
 from moto.moto_api._internal import mock_random as random
 
 
+TypeDec = TypeVar("TypeDec", bound=Callable[..., Any])
+
+
 def gen_amz_crc32(response, headerdict=None):
     if not isinstance(response, bytes):
         response = response.encode("utf-8")
@@ -26,9 +30,9 @@ def gen_amzn_requestid_long(headerdict=None):
     return req_id
 
 
-def amz_crc32(f):
+def amz_crc32(f: TypeDec) -> TypeDec:
     @wraps(f)
-    def _wrapper(*args, **kwargs):
+    def _wrapper(*args: Any, **kwargs: Any) -> Any:
         response = f(*args, **kwargs)
 
         headers = {}
@@ -54,9 +58,9 @@ def amz_crc32(f):
     return _wrapper
 
 
-def amzn_request_id(f):
+def amzn_request_id(f: TypeDec) -> TypeDec:
     @wraps(f)
-    def _wrapper(*args, **kwargs):
+    def _wrapper(*args: Any, **kwargs: Any) -> Any:
         response = f(*args, **kwargs)
 
         headers = {}