From bb1b5f511aea916b7fcf89bf65b4ac8f11b5deaa Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 11 Feb 2023 10:48:22 -0100 Subject: [PATCH] Techdebt: MyPy EC2 (s-models) (#5917) --- moto/core/utils.py | 2 +- moto/ec2/exceptions.py | 30 +- moto/ec2/models/__init__.py | 11 +- moto/ec2/models/fleets.py | 3 +- moto/ec2/models/security_groups.py | 412 ++++++++++-------- moto/ec2/models/spot_requests.py | 398 +++++++++-------- moto/ec2/models/subnets.py | 248 ++++++----- moto/ec2/responses/security_groups.py | 4 +- moto/ec2/responses/spot_fleets.py | 7 +- moto/ec2/utils.py | 20 +- moto/packages/boto/ec2/launchspecification.py | 49 --- moto/packages/boto/ec2/spotinstancerequest.py | 85 ---- setup.cfg | 2 +- 13 files changed, 618 insertions(+), 653 deletions(-) delete mode 100644 moto/packages/boto/ec2/launchspecification.py delete mode 100644 moto/packages/boto/ec2/spotinstancerequest.py diff --git a/moto/core/utils.py b/moto/core/utils.py index 6fe69b822..9ed602910 100644 --- a/moto/core/utils.py +++ b/moto/core/utils.py @@ -277,7 +277,7 @@ def merge_dicts( dict1.pop(key) -def aws_api_matches(pattern: str, string: str) -> bool: +def aws_api_matches(pattern: str, string: Any) -> bool: """ AWS API can match a value based on a glob, or an exact match """ diff --git a/moto/ec2/exceptions.py b/moto/ec2/exceptions.py index bd813f581..7dd6f5e23 100644 --- a/moto/ec2/exceptions.py +++ b/moto/ec2/exceptions.py @@ -103,7 +103,7 @@ class InvalidVPCIdError(EC2ClientError): class InvalidSubnetIdError(EC2ClientError): - def __init__(self, subnet_id): + def __init__(self, subnet_id: str): super().__init__( "InvalidSubnetID.NotFound", f"The subnet ID '{subnet_id}' does not exist" ) @@ -182,14 +182,14 @@ class InvalidNetworkAttachmentIdError(EC2ClientError): class InvalidSecurityGroupDuplicateError(EC2ClientError): - def __init__(self, name): + def __init__(self, name: str): super().__init__( "InvalidGroup.Duplicate", f"The security group '{name}' already exists" ) class InvalidSecurityGroupNotFoundError(EC2ClientError): - def __init__(self, name: str): + def __init__(self, name: Any): super().__init__( "InvalidGroup.NotFound", f"The security group '{name}' does not exist", @@ -197,7 +197,7 @@ class InvalidSecurityGroupNotFoundError(EC2ClientError): class InvalidPermissionNotFoundError(EC2ClientError): - def __init__(self): + def __init__(self) -> None: super().__init__( "InvalidPermission.NotFound", "The specified rule does not exist in this security group", @@ -205,7 +205,7 @@ class InvalidPermissionNotFoundError(EC2ClientError): class InvalidPermissionDuplicateError(EC2ClientError): - def __init__(self): + def __init__(self) -> None: super().__init__( "InvalidPermission.Duplicate", "The specified rule already exists" ) @@ -503,14 +503,14 @@ class InvalidID(EC2ClientError): class InvalidCIDRSubnetError(EC2ClientError): - def __init__(self, cidr): + def __init__(self, cidr: Any): super().__init__( "InvalidParameterValue", f"invalid CIDR subnet specification: {cidr}" ) class RulesPerSecurityGroupLimitExceededError(EC2ClientError): - def __init__(self): + def __init__(self) -> None: super().__init__( "RulesPerSecurityGroupLimitExceeded", "The maximum number of rules per security group " "has been reached.", @@ -518,7 +518,7 @@ class RulesPerSecurityGroupLimitExceededError(EC2ClientError): class MotoNotImplementedError(NotImplementedError): - def __init__(self, blurb): + def __init__(self, blurb: str): super().__init__( f"{blurb} has not been implemented in Moto yet." " Feel free to open an issue at" @@ -555,7 +555,9 @@ class OperationNotPermitted(EC2ClientError): class InvalidAvailabilityZoneError(EC2ClientError): - def __init__(self, availability_zone_value, valid_availability_zones): + def __init__( + self, availability_zone_value: Optional[str], valid_availability_zones: str + ): super().__init__( "InvalidParameterValue", f"Value ({availability_zone_value}) for parameter availabilityZone is invalid. " @@ -580,12 +582,12 @@ class NetworkAclEntryAlreadyExistsError(EC2ClientError): class InvalidSubnetRangeError(EC2ClientError): - def __init__(self, cidr_block): + def __init__(self, cidr_block: str): super().__init__("InvalidSubnet.Range", f"The CIDR '{cidr_block}' is invalid.") class InvalidCIDRBlockParameterError(EC2ClientError): - def __init__(self, cidr_block): + def __init__(self, cidr_block: str): super().__init__( "InvalidParameterValue", f"Value ({cidr_block}) for parameter cidrBlock is invalid. This is not a valid CIDR block.", @@ -601,7 +603,7 @@ class InvalidDestinationCIDRBlockParameterError(EC2ClientError): class InvalidSubnetConflictError(EC2ClientError): - def __init__(self, cidr_block): + def __init__(self, cidr_block: str): super().__init__( "InvalidSubnet.Conflict", f"The CIDR '{cidr_block}' conflicts with another subnet", @@ -704,7 +706,7 @@ class InvalidTaggableResourceType(EC2ClientError): class GenericInvalidParameterValueError(EC2ClientError): - def __init__(self, attribute, value): + def __init__(self, attribute: str, value: str): super().__init__( "InvalidParameterValue", f"invalid value for parameter {attribute}: {value}", @@ -712,7 +714,7 @@ class GenericInvalidParameterValueError(EC2ClientError): class InvalidSubnetCidrBlockAssociationID(EC2ClientError): - def __init__(self, association_id): + def __init__(self, association_id: str): super().__init__( "InvalidSubnetCidrBlockAssociationID.NotFound", f"The subnet CIDR block with association ID '{association_id}' does not exist", diff --git a/moto/ec2/models/__init__.py b/moto/ec2/models/__init__.py index 6d6935e53..ade8411f4 100644 --- a/moto/ec2/models/__init__.py +++ b/moto/ec2/models/__init__.py @@ -30,12 +30,8 @@ from .network_acls import NetworkAclBackend from .availability_zones_and_regions import RegionsAndZonesBackend from .route_tables import RouteBackend from .security_groups import SecurityGroupBackend -from .spot_requests import ( - SpotRequestBackend, - SpotPriceBackend, - SpotFleetBackend, -) -from .subnets import SubnetBackend, SubnetRouteTableAssociationBackend +from .spot_requests import SpotRequestBackend +from .subnets import SubnetBackend from .tags import TagBackend from .transit_gateway import TransitGatewayBackend from .transit_gateway_route_tables import ( @@ -95,7 +91,6 @@ class EC2Backend( VPCBackend, ManagedPrefixListBackend, SubnetBackend, - SubnetRouteTableAssociationBackend, FlowLogsBackend, NetworkInterfaceBackend, VPNConnectionBackend, @@ -104,9 +99,7 @@ class EC2Backend( RouteBackend, InternetGatewayBackend, EgressOnlyInternetGatewayBackend, - SpotFleetBackend, SpotRequestBackend, - SpotPriceBackend, ElasticAddressBackend, KeyPairBackend, SettingsBackend, diff --git a/moto/ec2/models/fleets.py b/moto/ec2/models/fleets.py index 6ec9ca40b..1a95cda4a 100644 --- a/moto/ec2/models/fleets.py +++ b/moto/ec2/models/fleets.py @@ -154,7 +154,7 @@ class Fleet(TaggedEC2Resource): self.fulfilled_capacity += added_weight return self.spot_requests - def create_on_demand_requests(self, weight_to_add: float) -> List[Dict[str, Any]]: + def create_on_demand_requests(self, weight_to_add: float) -> None: weight_map, added_weight = self.get_launch_spec_counts(weight_to_add) for launch_spec, count in weight_map.items(): reservation = self.ec2_backend.add_instances( # type: ignore[attr-defined] @@ -183,7 +183,6 @@ class Fleet(TaggedEC2Resource): } ) self.fulfilled_capacity += added_weight - return self.on_demand_instances def get_launch_spec_counts( self, weight_to_add: float diff --git a/moto/ec2/models/security_groups.py b/moto/ec2/models/security_groups.py index e0ffa6cf8..5cd32ee58 100644 --- a/moto/ec2/models/security_groups.py +++ b/moto/ec2/models/security_groups.py @@ -2,11 +2,10 @@ import copy import itertools import json from collections import defaultdict -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Tuple from moto.core import CloudFormationModel from moto.core.utils import aws_api_matches from ..exceptions import ( - DependencyViolationError, InvalidCIDRSubnetError, InvalidPermissionNotFoundError, InvalidPermissionDuplicateError, @@ -27,16 +26,16 @@ from ..utils import ( ) -class SecurityRule(object): +class SecurityRule: def __init__( self, - account_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups, - prefix_list_ids=None, + account_id: str, + ip_protocol: str, + from_port: Optional[str], + to_port: Optional[str], + ip_ranges: Optional[List[Any]], + source_groups: List[Dict[str, Any]], + prefix_list_ids: Optional[List[Dict[str, str]]] = None, ): self.account_id = account_id self.id = random_security_group_rule_id() @@ -47,8 +46,8 @@ class SecurityRule(object): self.from_port = self.to_port = None if self.ip_protocol != "-1": - self.from_port = int(from_port) - self.to_port = int(to_port) + self.from_port = int(from_port) # type: ignore[arg-type] + self.to_port = int(to_port) # type: ignore[arg-type] ip_protocol_keywords = { "tcp": "tcp", @@ -70,10 +69,10 @@ class SecurityRule(object): self.ip_protocol = proto if proto else self.ip_protocol @property - def owner_id(self): + def owner_id(self) -> str: return self.account_id - def __eq__(self, other): + def __eq__(self, other: "SecurityRule") -> bool: # type: ignore[override] if self.ip_protocol != other.ip_protocol: return False ip_ranges = list( @@ -124,10 +123,9 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): self.name = name self.group_name = self.name self.description = description - self.ingress_rules = [] - self.egress_rules = [] - self.enis = {} - self.vpc_id = vpc_id + self.ingress_rules: List[SecurityRule] = [] + self.egress_rules: List[SecurityRule] = [] + self.vpc_id: Optional[str] = vpc_id self.owner_id = ec2_backend.account_id self.add_tags(tags or {}) self.is_default = is_default or False @@ -177,18 +175,23 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): } @staticmethod - def cloudformation_name_type(): + def cloudformation_name_type() -> str: return "GroupName" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-securitygroup.html return "AWS::EC2::SecurityGroup" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any + ) -> "SecurityGroup": from ..models import ec2_backends properties = cloudformation_json["Properties"] @@ -228,14 +231,14 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): return security_group @classmethod - def update_from_cloudformation_json( + def update_from_cloudformation_json( # type: ignore[misc] cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> "SecurityGroup": cls._delete_security_group_given_vpc_id( original_resource.name, original_resource.vpc_id, account_id, region_name ) @@ -244,9 +247,13 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): ) @classmethod - def delete_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name - ): + def delete_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> None: properties = cloudformation_json["Properties"] vpc_id = properties.get("VpcId") cls._delete_security_group_given_vpc_id( @@ -255,8 +262,8 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): @classmethod def _delete_security_group_given_vpc_id( - cls, resource_name, vpc_id, account_id, region_name - ): + cls, resource_name: str, vpc_id: str, account_id: str, region_name: str + ) -> None: from ..models import ec2_backends ec2_backend = ec2_backends[account_id][region_name] @@ -266,21 +273,23 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): if security_group: security_group.delete(account_id, region_name) - def delete(self, account_id, region_name): # pylint: disable=unused-argument + def delete( + self, account_id: str, region_name: str # pylint: disable=unused-argument + ) -> None: """Not exposed as part of the ELB API - used for CloudFormation.""" self.ec2_backend.delete_security_group(group_id=self.id) @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.id - def filter_description(self, values): + def filter_description(self, values: List[Any]) -> bool: for value in values: if aws_api_matches(value, self.description): return True return False - def filter_egress__ip_permission__cidr(self, values): + def filter_egress__ip_permission__cidr(self, values: List[Any]) -> bool: for value in values: for rule in self.egress_rules: for cidr in rule.ip_ranges: @@ -288,16 +297,16 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): return True return False - def filter_egress__ip_permission__from_port(self, values): + def filter_egress__ip_permission__from_port(self, values: List[Any]) -> bool: for value in values: for rule in self.egress_rules: - if rule.ip_protocol != -1 and aws_api_matches( + if rule.ip_protocol != "-1" and aws_api_matches( value, str(rule.from_port) ): return True return False - def filter_egress__ip_permission__group_id(self, values): + def filter_egress__ip_permission__group_id(self, values: List[Any]) -> bool: for value in values: for rule in self.egress_rules: for sg in rule.source_groups: @@ -305,7 +314,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): return True return False - def filter_egress__ip_permission__group_name(self, values): + def filter_egress__ip_permission__group_name(self, values: List[Any]) -> bool: for value in values: for rule in self.egress_rules: for group in rule.source_groups: @@ -313,46 +322,46 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): return True return False - def filter_egress__ip_permission__ipv6_cidr(self, values): + def filter_egress__ip_permission__ipv6_cidr(self, values: List[Any]) -> bool: raise MotoNotImplementedError("egress.ip-permission.ipv6-cidr filter") - def filter_egress__ip_permission__prefix_list_id(self, values): + def filter_egress__ip_permission__prefix_list_id(self, values: List[Any]) -> bool: raise MotoNotImplementedError("egress.ip-permission.prefix-list-id filter") - def filter_egress__ip_permission__protocol(self, values): + def filter_egress__ip_permission__protocol(self, values: List[Any]) -> bool: for value in values: for rule in self.egress_rules: if aws_api_matches(value, rule.ip_protocol): return True return False - def filter_egress__ip_permission__to_port(self, values): + def filter_egress__ip_permission__to_port(self, values: List[Any]) -> bool: for value in values: for rule in self.egress_rules: if aws_api_matches(value, rule.to_port): return True return False - def filter_egress__ip_permission__user_id(self, values): + def filter_egress__ip_permission__user_id(self, values: List[Any]) -> bool: for value in values: for rule in self.egress_rules: if aws_api_matches(value, rule.owner_id): return True return False - def filter_group_id(self, values): + def filter_group_id(self, values: List[Any]) -> bool: for value in values: if aws_api_matches(value, self.id): return True return False - def filter_group_name(self, values): + def filter_group_name(self, values: List[Any]) -> bool: for value in values: if aws_api_matches(value, self.group_name): return True return False - def filter_ip_permission__cidr(self, values): + def filter_ip_permission__cidr(self, values: List[Any]) -> bool: for value in values: for rule in self.ingress_rules: for cidr in rule.ip_ranges: @@ -360,14 +369,14 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): return True return False - def filter_ip_permission__from_port(self, values): + def filter_ip_permission__from_port(self, values: List[Any]) -> bool: for value in values: for rule in self.ingress_rules: if aws_api_matches(value, rule.from_port): return True return False - def filter_ip_permission__group_id(self, values): + def filter_ip_permission__group_id(self, values: List[Any]) -> bool: for value in values: for rule in self.ingress_rules: for group in rule.source_groups: @@ -375,7 +384,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): return True return False - def filter_ip_permission__group_name(self, values): + def filter_ip_permission__group_name(self, values: List[Any]) -> bool: for value in values: for rule in self.ingress_rules: for group in rule.source_groups: @@ -383,46 +392,46 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): return True return False - def filter_ip_permission__ipv6_cidr(self, values): + def filter_ip_permission__ipv6_cidr(self, values: List[Any]) -> None: raise MotoNotImplementedError("ip-permission.ipv6 filter") - def filter_ip_permission__prefix_list_id(self, values): + def filter_ip_permission__prefix_list_id(self, values: List[Any]) -> None: raise MotoNotImplementedError("ip-permission.prefix-list-id filter") - def filter_ip_permission__protocol(self, values): + def filter_ip_permission__protocol(self, values: List[Any]) -> bool: for value in values: for rule in self.ingress_rules: - if aws_api_matches(value, rule.protocol): + if aws_api_matches(value, rule.ip_protocol): return True return False - def filter_ip_permission__to_port(self, values): + def filter_ip_permission__to_port(self, values: List[Any]) -> bool: for value in values: for rule in self.ingress_rules: if aws_api_matches(value, rule.to_port): return True return False - def filter_ip_permission__user_id(self, values): + def filter_ip_permission__user_id(self, values: List[Any]) -> bool: for value in values: for rule in self.ingress_rules: if aws_api_matches(value, rule.owner_id): return True return False - def filter_owner_id(self, values): + def filter_owner_id(self, values: List[Any]) -> bool: for value in values: if aws_api_matches(value, self.owner_id): return True return False - def filter_vpc_id(self, values): + def filter_vpc_id(self, values: List[Any]) -> bool: for value in values: if aws_api_matches(value, self.vpc_id): return True return False - def matches_filter(self, key, filter_value): + def matches_filter(self, key: str, filter_value: Any) -> Any: if is_tag_filter(key): tag_value = self.get_filter_value(key) if isinstance(filter_value, list): @@ -431,56 +440,62 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): else: return self.filters[key](filter_value) - def matches_filters(self, filters): + def matches_filters(self, filters: Any) -> bool: for key, value in filters.items(): if not self.matches_filter(key, value): return False return True @classmethod - def has_cfn_attr(cls, attr): + def has_cfn_attr(cls, attr: str) -> bool: return attr in ["GroupId"] - def get_cfn_attribute(self, attribute_name): + def get_cfn_attribute(self, attribute_name: str) -> str: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "GroupId": return self.id raise UnformattedGetAttTemplateException() - def add_ingress_rule(self, rule): + def add_ingress_rule(self, rule: SecurityRule) -> None: if rule in self.ingress_rules: raise InvalidPermissionDuplicateError() self.ingress_rules.append(rule) - def add_egress_rule(self, rule): + def add_egress_rule(self, rule: SecurityRule) -> None: if rule in self.egress_rules: raise InvalidPermissionDuplicateError() self.egress_rules.append(rule) - def get_number_of_ingress_rules(self): + def get_number_of_ingress_rules(self) -> int: return sum( len(rule.ip_ranges) + len(rule.source_groups) for rule in self.ingress_rules ) - def get_number_of_egress_rules(self): + def get_number_of_egress_rules(self) -> int: return sum( len(rule.ip_ranges) + len(rule.source_groups) for rule in self.egress_rules ) class SecurityGroupBackend: - def __init__(self): + def __init__(self) -> None: # the key in the dict group is the vpc_id or None (non-vpc) - self.groups = defaultdict(dict) + self.groups: Dict[str, Dict[str, SecurityGroup]] = defaultdict(dict) # This will help us in RuleLimitExceed errors. - self.sg_old_ingress_ruls = {} - self.sg_old_egress_ruls = {} + self.sg_old_ingress_ruls: Dict[str, List[SecurityRule]] = {} + self.sg_old_egress_ruls: Dict[str, List[SecurityRule]] = {} def create_security_group( - self, name, description, vpc_id=None, tags=None, force=False, is_default=None - ): - vpc_id = vpc_id or self.default_vpc.id + self, + name: str, + description: str, + vpc_id: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + force: bool = False, + is_default: Optional[bool] = None, + ) -> SecurityGroup: + vpc_id = vpc_id or self.default_vpc.id # type: ignore[attr-defined] if not description: raise MissingParameterError("GroupDescription") @@ -502,25 +517,34 @@ class SecurityGroupBackend: self.groups[vpc_id][group_id] = group return group - def describe_security_groups(self, group_ids=None, groupnames=None, filters=None): + def describe_security_groups( + self, + group_ids: Optional[List[str]] = None, + groupnames: Optional[List[str]] = None, + filters: Any = None, + ) -> List[SecurityGroup]: all_groups = self.groups.copy() - matches = itertools.chain(*[x.copy().values() for x in all_groups.values()]) + matches = list( + itertools.chain(*[x.copy().values() for x in all_groups.values()]) + ) if group_ids: matches = [grp for grp in matches if grp.id in group_ids] if len(group_ids) > len(matches): - unknown_ids = set(group_ids) - set(matches) + unknown_ids = set(group_ids) - set(matches) # type: ignore[arg-type] raise InvalidSecurityGroupNotFoundError(unknown_ids) if groupnames: matches = [grp for grp in matches if grp.name in groupnames] if len(groupnames) > len(matches): - unknown_names = set(groupnames) - set(matches) + unknown_names = set(groupnames) - set(matches) # type: ignore[arg-type] raise InvalidSecurityGroupNotFoundError(unknown_names) if filters: matches = [grp for grp in matches if grp.matches_filters(filters)] return matches - def describe_security_group_rules(self, group_ids=None, filters=None): + def describe_security_group_rules( + self, group_ids: Optional[List[str]] = None, filters: Any = None + ) -> List[SecurityRule]: matches = self.describe_security_groups(group_ids=group_ids, filters=filters) if not matches: raise InvalidSecurityGroupNotFoundError( @@ -533,13 +557,13 @@ class SecurityGroupBackend: return rules - def _delete_security_group(self, vpc_id, group_id): - vpc_id = vpc_id or self.default_vpc.id - if self.groups[vpc_id][group_id].enis: - raise DependencyViolationError(f"{group_id} is being utilized by ENIs") - return self.groups[vpc_id].pop(group_id) + def _delete_security_group(self, vpc_id: Optional[str], group_id: str) -> None: + vpc_id = vpc_id or self.default_vpc.id # type: ignore[attr-defined] + self.groups[vpc_id].pop(group_id) - def delete_security_group(self, name=None, group_id=None): + def delete_security_group( + self, name: Optional[str] = None, group_id: Optional[str] = None + ) -> None: if group_id: # loop over all the SGs, find the right one for vpc_id, groups in self.groups.items(): @@ -562,8 +586,11 @@ class SecurityGroupBackend: for group in all_groups: if group.id == group_id: return group + return None - def get_security_group_from_name(self, name, vpc_id=None): + def get_security_group_from_name( + self, name: str, vpc_id: Optional[str] = None + ) -> Optional[SecurityGroup]: if vpc_id: for group in self.groups[vpc_id].values(): if group.name == name: @@ -573,8 +600,11 @@ class SecurityGroupBackend: for group in self.groups[vpc_id].values(): if group.name == name: return group + return None - def get_security_group_by_name_or_id(self, group_name_or_id, vpc_id=None): + def get_security_group_by_name_or_id( + self, group_name_or_id: str, vpc_id: Optional[str] = None + ) -> Optional[SecurityGroup]: # try searching by id, fallbacks to name search group = self.get_security_group_from_id(group_name_or_id) @@ -582,22 +612,25 @@ class SecurityGroupBackend: group = self.get_security_group_from_name(group_name_or_id, vpc_id) return group - def get_default_security_group(self, vpc_id=None): - for group in self.groups[vpc_id or self.default_vpc.id].values(): + def get_default_security_group( + self, vpc_id: Optional[str] = None + ) -> Optional[SecurityGroup]: + for group in self.groups[vpc_id or self.default_vpc.id].values(): # type: ignore[attr-defined] if group.is_default: return group + return None def authorize_security_group_ingress( self, - group_name_or_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups=None, - prefix_list_ids=None, - vpc_id=None, - ): + group_name_or_id: str, + ip_protocol: str, + from_port: str, + to_port: str, + ip_ranges: List[Any], + source_groups: Optional[List[Dict[str, str]]] = None, + prefix_list_ids: Optional[List[Dict[str, str]]] = None, + vpc_id: Optional[str] = None, + ) -> Tuple[SecurityRule, SecurityGroup]: group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) if group is None: raise InvalidSecurityGroupNotFoundError(group_name_or_id) @@ -629,7 +662,7 @@ class SecurityGroupBackend: _source_groups = self._add_source_group(source_groups, vpc_id) security_rule = SecurityRule( - self.account_id, + self.account_id, # type: ignore[attr-defined] ip_protocol, from_port, to_port, @@ -677,22 +710,22 @@ class SecurityGroupBackend: def revoke_security_group_ingress( self, - group_name_or_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups=None, - prefix_list_ids=None, - vpc_id=None, - ): + group_name_or_id: str, + ip_protocol: str, + from_port: str, + to_port: str, + ip_ranges: List[Any], + source_groups: Optional[List[Dict[str, Any]]] = None, + prefix_list_ids: Optional[List[Dict[str, str]]] = None, + vpc_id: Optional[str] = None, + ) -> SecurityRule: - group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) + group: SecurityGroup = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) # type: ignore[assignment] _source_groups = self._add_source_group(source_groups, vpc_id) security_rule = SecurityRule( - self.account_id, + self.account_id, # type: ignore[attr-defined] ip_protocol, from_port, to_port, @@ -718,7 +751,7 @@ class SecurityGroupBackend: security_rule.prefix_list_ids.extend( [ item - for item in prefix_list_ids + for item in prefix_list_ids # type: ignore[union-attr] if item not in rule.prefix_list_ids ] ) @@ -742,15 +775,15 @@ class SecurityGroupBackend: def authorize_security_group_egress( self, - group_name_or_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups=None, - prefix_list_ids=None, - vpc_id=None, - ): + group_name_or_id: str, + ip_protocol: str, + from_port: str, + to_port: str, + ip_ranges: List[Any], + source_groups: Optional[List[Dict[str, Any]]] = None, + prefix_list_ids: Optional[List[Dict[str, str]]] = None, + vpc_id: Optional[str] = None, + ) -> Tuple[SecurityRule, SecurityGroup]: group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) if group is None: raise InvalidSecurityGroupNotFoundError(group_name_or_id) @@ -786,7 +819,7 @@ class SecurityGroupBackend: _source_groups = self._add_source_group(source_groups, vpc_id) security_rule = SecurityRule( - self.account_id, + self.account_id, # type: ignore[attr-defined] ip_protocol, from_port, to_port, @@ -835,17 +868,17 @@ class SecurityGroupBackend: def revoke_security_group_egress( self, - group_name_or_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups=None, - prefix_list_ids=None, - vpc_id=None, - ): + group_name_or_id: str, + ip_protocol: str, + from_port: str, + to_port: str, + ip_ranges: List[Any], + source_groups: Optional[List[Dict[str, Any]]] = None, + prefix_list_ids: Optional[List[Dict[str, str]]] = None, + vpc_id: Optional[str] = None, + ) -> SecurityRule: - group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) + group: SecurityGroup = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) # type: ignore[assignment] _source_groups = self._add_source_group(source_groups, vpc_id) @@ -857,14 +890,14 @@ class SecurityGroupBackend: # ip_ranges = [ip.get("CidrIp") if ip.get("CidrIp") == "0.0.0.0/0" else ip] if group.vpc_id: - vpc = self.vpcs.get(group.vpc_id) + vpc = self.vpcs.get(group.vpc_id) # type: ignore[attr-defined] if vpc and not len(vpc.get_cidr_block_association_set(ipv6=True)) > 0: for item in ip_ranges.copy(): if "CidrIpv6" in item: ip_ranges.remove(item) security_rule = SecurityRule( - self.account_id, + self.account_id, # type: ignore[attr-defined] ip_protocol, from_port, to_port, @@ -891,7 +924,7 @@ class SecurityGroupBackend: security_rule.prefix_list_ids.extend( [ item - for item in prefix_list_ids + for item in prefix_list_ids # type: ignore[union-attr] if item not in rule.prefix_list_ids ] ) @@ -914,15 +947,15 @@ class SecurityGroupBackend: def update_security_group_rule_descriptions_ingress( self, - group_name_or_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups=None, - prefix_list_ids=None, - vpc_id=None, - ): + group_name_or_id: str, + ip_protocol: str, + from_port: str, + to_port: str, + ip_ranges: List[str], + source_groups: Optional[List[Dict[str, Any]]] = None, + prefix_list_ids: Optional[List[Dict[str, str]]] = None, + vpc_id: Optional[str] = None, + ) -> SecurityGroup: group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) if group is None: @@ -936,7 +969,7 @@ class SecurityGroupBackend: if ip_ranges: for cidr in ip_ranges: if ( - type(cidr) is dict + type(cidr) is dict # type: ignore and not any( [ is_valid_cidr(cidr.get("CidrIp", "")), @@ -951,7 +984,7 @@ class SecurityGroupBackend: _source_groups = self._add_source_group(source_groups, vpc_id) security_rule = SecurityRule( - self.account_id, + self.account_id, # type: ignore[attr-defined] ip_protocol, from_port, to_port, @@ -970,15 +1003,15 @@ class SecurityGroupBackend: def update_security_group_rule_descriptions_egress( self, - group_name_or_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups=None, - prefix_list_ids=None, - vpc_id=None, - ): + group_name_or_id: str, + ip_protocol: str, + from_port: str, + to_port: str, + ip_ranges: List[str], + source_groups: Optional[List[Dict[str, Any]]] = None, + prefix_list_ids: Optional[List[Dict[str, str]]] = None, + vpc_id: Optional[str] = None, + ) -> SecurityGroup: group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) if group is None: @@ -992,7 +1025,7 @@ class SecurityGroupBackend: if ip_ranges: for cidr in ip_ranges: if ( - type(cidr) is dict + type(cidr) is dict # type: ignore and not any( [ is_valid_cidr(cidr.get("CidrIp", "")), @@ -1007,11 +1040,11 @@ class SecurityGroupBackend: _source_groups = self._add_source_group(source_groups, vpc_id) security_rule = SecurityRule( - self.account_id, + self.account_id, # type: ignore[attr-defined] ip_protocol, from_port, to_port, - ip_ranges, + ip_ranges, # type: ignore[arg-type] _source_groups, prefix_list_ids, ) @@ -1024,7 +1057,9 @@ class SecurityGroupBackend: self._sg_update_description(security_rule, rule) return group - def _sg_update_description(self, security_rule, rule): + def _sg_update_description( + self, security_rule: SecurityRule, rule: SecurityRule + ) -> None: for item in security_rule.ip_ranges: for cidr_item in rule.ip_ranges: if cidr_item.get("CidrIp") == item.get("CidrIp"): @@ -1039,7 +1074,13 @@ class SecurityGroupBackend: ) or source_group.get("GroupName") == group.get("GroupName"): source_group["Description"] = group.get("Description") - def _remove_items_from_rule(self, ip_ranges, _source_groups, prefix_list_ids, rule): + def _remove_items_from_rule( + self, + ip_ranges: List[Any], + _source_groups: List[Any], + prefix_list_ids: Optional[List[Any]], + rule: SecurityRule, + ) -> None: for item in ip_ranges: if item not in rule.ip_ranges: raise InvalidPermissionNotFoundError() @@ -1052,30 +1093,29 @@ class SecurityGroupBackend: else: rule.source_groups.remove(item) - for item in prefix_list_ids: + for item in prefix_list_ids: # type: ignore[union-attr] if item not in rule.prefix_list_ids: raise InvalidPermissionNotFoundError() else: rule.prefix_list_ids.remove(item) - pass - def _add_source_group(self, source_groups, vpc_id): + def _add_source_group( + self, source_groups: Optional[List[Dict[str, Any]]], vpc_id: Optional[str] + ) -> List[Dict[str, Any]]: _source_groups = [] for item in source_groups or []: if "OwnerId" not in item: - item["OwnerId"] = self.account_id + item["OwnerId"] = self.account_id # type: ignore[attr-defined] # for VPCs if "GroupId" in item: - if not self.get_security_group_by_name_or_id( - item.get("GroupId"), vpc_id - ): - raise InvalidSecurityGroupNotFoundError(item.get("GroupId")) + if not self.get_security_group_by_name_or_id(item["GroupId"], vpc_id): + raise InvalidSecurityGroupNotFoundError(item["GroupId"]) if "GroupName" in item: source_group = self.get_security_group_by_name_or_id( - item.get("GroupName"), vpc_id + item["GroupName"], vpc_id ) if not source_group: - raise InvalidSecurityGroupNotFoundError(item.get("GroupName")) + raise InvalidSecurityGroupNotFoundError(item["GroupName"]) else: item["GroupId"] = source_group.id item.pop("GroupName") @@ -1084,8 +1124,13 @@ class SecurityGroupBackend: return _source_groups def _verify_group_will_respect_rule_count_limit( - self, group, current_rule_nb, ip_ranges, source_groups=None, egress=False - ): + self, + group: SecurityGroup, + current_rule_nb: int, + ip_ranges: List[str], + source_groups: Optional[List[Dict[str, str]]] = None, + egress: bool = False, + ) -> None: max_nb_rules = 60 if group.vpc_id else 100 future_group_nb_rules = current_rule_nb if ip_ranges: @@ -1101,23 +1146,28 @@ class SecurityGroupBackend: class SecurityGroupIngress(CloudFormationModel): - def __init__(self, security_group, properties): + def __init__(self, security_group: SecurityGroup, properties: Any): self.security_group = security_group self.properties = properties @staticmethod - def cloudformation_name_type(): - return None + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-securitygroupingress.html return "AWS::EC2::SecurityGroupIngress" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any + ) -> "SecurityGroupIngress": from ..models import ec2_backends properties = cloudformation_json["Properties"] diff --git a/moto/ec2/models/spot_requests.py b/moto/ec2/models/spot_requests.py index 3552e97bc..d9242931c 100644 --- a/moto/ec2/models/spot_requests.py +++ b/moto/ec2/models/spot_requests.py @@ -1,11 +1,11 @@ from collections import defaultdict -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING -from moto.core.common_models import CloudFormationModel -from moto.packages.boto.ec2.launchspecification import LaunchSpecification -from moto.packages.boto.ec2.spotinstancerequest import ( - SpotInstanceRequest as BotoSpotRequest, -) +from moto.core.common_models import BaseModel, CloudFormationModel + +if TYPE_CHECKING: + from moto.ec2.models.instances import Instance + from moto.ec2.models.security_groups import SecurityGroup from .core import TaggedEC2Resource from .instance_types import INSTANCE_TYPE_OFFERINGS from ..utils import ( @@ -16,43 +16,73 @@ from ..utils import ( ) -class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource): +class LaunchSpecification(BaseModel): def __init__( self, - ec2_backend, - spot_request_id, - price, - image_id, - spot_instance_type, - valid_from, - valid_until, - launch_group, - availability_zone_group, - key_name, - security_groups, - user_data, - instance_type, - placement, - kernel_id, - ramdisk_id, - monitoring_enabled, - subnet_id, - tags, - spot_fleet_id, - instance_interruption_behaviour, - **kwargs, + kernel_id: Optional[str], + ramdisk_id: Optional[str], + image_id: Optional[str], + key_name: Optional[str], + instance_type: str, + placement: Optional[str], + monitored: bool, + subnet_id: str, ): - super().__init__(**kwargs) - ls = LaunchSpecification() + self.key_name = key_name + self.instance_type = instance_type + self.image_id = image_id + self.groups: List[SecurityGroup] = [] + self.placement = placement + self.kernel = kernel_id + self.ramdisk = ramdisk_id + self.monitored = monitored + self.subnet_id = subnet_id + self.ebs_optimized = False + + +class SpotInstanceRequest(TaggedEC2Resource): + def __init__( + self, + ec2_backend: Any, + spot_request_id: str, + price: str, + image_id: str, + spot_instance_type: str, + valid_from: Optional[str], + valid_until: Optional[str], + launch_group: Optional[str], + availability_zone_group: Optional[str], + key_name: str, + security_groups: List[str], + user_data: Dict[str, Any], + instance_type: str, + placement: Optional[str], + kernel_id: Optional[str], + ramdisk_id: Optional[str], + monitoring_enabled: bool, + subnet_id: str, + tags: Dict[str, Dict[str, str]], + spot_fleet_id: Optional[str], + instance_interruption_behaviour: Optional[str], + ): + super().__init__() self.ec2_backend = ec2_backend - self.launch_specification = ls + self.launch_specification = LaunchSpecification( + kernel_id=kernel_id, + ramdisk_id=ramdisk_id, + image_id=image_id, + key_name=key_name, + instance_type=instance_type, + placement=placement, + monitored=monitoring_enabled, + subnet_id=subnet_id, + ) self.id = spot_request_id self.state = "open" self.status = "pending-evaluation" self.status_message = "Your Spot request has been submitted for review, and is pending evaluation." if price: - price = float(price) - price = f"{price:.6f}" # round up/down to 6 decimals + price = f"{float(price):.6f}" # round up/down to 6 decimals self.price = price self.type = spot_instance_type self.valid_from = valid_from @@ -63,14 +93,6 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource): instance_interruption_behaviour or "terminate" ) self.user_data = user_data # NOT - ls.kernel = kernel_id - ls.ramdisk = ramdisk_id - ls.image_id = image_id - ls.key_name = key_name - ls.instance_type = instance_type - ls.placement = placement - ls.monitored = monitoring_enabled - ls.subnet_id = subnet_id self.spot_fleet_id = spot_fleet_id tag_map = tags.get("spot-instances-request", {}) self.add_tags(tag_map) @@ -80,18 +102,20 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource): for group_name in security_groups: group = self.ec2_backend.get_security_group_by_name_or_id(group_name) if group: - ls.groups.append(group) + self.launch_specification.groups.append(group) else: # If not security groups, add the default default_group = self.ec2_backend.get_security_group_by_name_or_id("default") - ls.groups.append(default_group) + self.launch_specification.groups.append(default_group) self.instance = self.launch_instance() self.state = "active" self.status = "fulfilled" self.status_message = "" - def get_filter_value(self, filter_name): + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: if filter_name == "state": return self.state elif filter_name == "spot-instance-request-id": @@ -99,7 +123,7 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource): else: return super().get_filter_value(filter_name, "DescribeSpotInstanceRequests") - def launch_instance(self): + def launch_instance(self) -> "Instance": reservation = self.ec2_backend.add_instances( image_id=self.launch_specification.image_id, count=1, @@ -118,80 +142,7 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource): return instance -class SpotRequestBackend: - def __init__(self): - self.spot_instance_requests = {} - - def request_spot_instances( - self, - price, - image_id, - count, - spot_instance_type, - valid_from, - valid_until, - launch_group, - availability_zone_group, - key_name, - security_groups, - user_data, - instance_type, - placement, - kernel_id, - ramdisk_id, - monitoring_enabled, - subnet_id, - tags=None, - spot_fleet_id=None, - instance_interruption_behaviour=None, - ): - requests = [] - tags = tags or {} - for _ in range(count): - spot_request_id = random_spot_request_id() - request = SpotInstanceRequest( - self, - spot_request_id, - price, - image_id, - spot_instance_type, - valid_from, - valid_until, - launch_group, - availability_zone_group, - key_name, - security_groups, - user_data, - instance_type, - placement, - kernel_id, - ramdisk_id, - monitoring_enabled, - subnet_id, - tags, - spot_fleet_id, - instance_interruption_behaviour, - ) - self.spot_instance_requests[spot_request_id] = request - requests.append(request) - return requests - - def describe_spot_instance_requests(self, filters=None, spot_instance_ids=None): - requests = self.spot_instance_requests.copy().values() - - if spot_instance_ids: - requests = [i for i in requests if i.id in spot_instance_ids] - - return generic_filter(filters, requests) - - def cancel_spot_instance_requests(self, request_ids): - requests = [] - for request_id in request_ids: - requests.append(self.spot_instance_requests.pop(request_id)) - return requests - - -class SpotFleetLaunchSpec(object): +class SpotFleetLaunchSpec: def __init__( self, ebs_optimized: Any, @@ -224,18 +175,20 @@ class SpotFleetLaunchSpec(object): class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel): def __init__( self, - ec2_backend, - spot_fleet_request_id, - spot_price, - target_capacity, - iam_fleet_role, - allocation_strategy, - launch_specs, - launch_template_config, - instance_interruption_behaviour, + ec2_backend: Any, + spot_backend: "SpotRequestBackend", + spot_fleet_request_id: str, + spot_price: str, + target_capacity: str, + iam_fleet_role: str, + allocation_strategy: str, + launch_specs: List[Dict[str, Any]], + launch_template_config: Optional[List[Dict[str, Any]]], + instance_interruption_behaviour: Optional[str], ): self.ec2_backend = ec2_backend + self.spot_backend = spot_backend self.id = spot_fleet_request_id self.spot_price = spot_price self.target_capacity = int(target_capacity) @@ -289,26 +242,31 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel): ) ) - self.spot_requests = [] + self.spot_requests: List[SpotInstanceRequest] = [] self.create_spot_requests(self.target_capacity) @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.id @staticmethod - def cloudformation_name_type(): - return None + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-spotfleet.html return "AWS::EC2::SpotFleet" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "SpotFleetRequest": from ..models import ec2_backends properties = cloudformation_json["Properties"]["SpotFleetRequestConfigData"] @@ -330,10 +288,12 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel): return spot_fleet_request - def get_launch_spec_counts(self, weight_to_add): - weight_map = defaultdict(int) + def get_launch_spec_counts( + self, weight_to_add: float + ) -> Tuple[Dict[Any, int], float]: + weight_map: Dict[Any, int] = defaultdict(int) - weight_so_far = 0 + weight_so_far = 0.0 if self.allocation_strategy == "diversified": launch_spec_index = 0 while True: @@ -360,10 +320,10 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel): return weight_map, weight_so_far - def create_spot_requests(self, weight_to_add): + def create_spot_requests(self, weight_to_add: float) -> None: weight_map, added_weight = self.get_launch_spec_counts(weight_to_add) for launch_spec, count in weight_map.items(): - requests = self.ec2_backend.request_spot_instances( + requests = self.spot_backend.request_spot_instances( price=launch_spec.spot_price, image_id=launch_spec.image_id, count=count, @@ -386,9 +346,8 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel): ) self.spot_requests.extend(requests) self.fulfilled_capacity += added_weight - return self.spot_requests - def terminate_instances(self): + def terminate_instances(self) -> None: instance_ids = [] new_fulfilled_capacity = self.fulfilled_capacity for req in self.spot_requests: @@ -408,50 +367,130 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel): self.spot_requests = [ req for req in self.spot_requests if req.instance.id not in instance_ids ] - self.ec2_backend.terminate_instances(instance_ids) + self.ec2_backend.terminate_instances(instance_ids) # type: ignore[attr-defined] -class SpotFleetBackend: - def __init__(self): - self.spot_fleet_requests = {} +class SpotRequestBackend: + def __init__(self) -> None: + self.spot_instance_requests: Dict[str, SpotInstanceRequest] = {} + self.spot_fleet_requests: Dict[str, SpotFleetRequest] = {} + + def request_spot_instances( + self, + price: str, + image_id: str, + count: int, + spot_instance_type: str, + valid_from: Optional[str], + valid_until: Optional[str], + launch_group: Optional[str], + availability_zone_group: Optional[str], + key_name: str, + security_groups: List[str], + user_data: Dict[str, Any], + instance_type: str, + placement: Optional[str], + kernel_id: Optional[str], + ramdisk_id: Optional[str], + monitoring_enabled: bool, + subnet_id: str, + tags: Optional[Dict[str, Dict[str, str]]] = None, + spot_fleet_id: Optional[str] = None, + instance_interruption_behaviour: Optional[str] = None, + ) -> List[SpotInstanceRequest]: + requests = [] + tags = tags or {} + for _ in range(count): + spot_request_id = random_spot_request_id() + request = SpotInstanceRequest( + self, + spot_request_id, + price, + image_id, + spot_instance_type, + valid_from, + valid_until, + launch_group, + availability_zone_group, + key_name, + security_groups, + user_data, + instance_type, + placement, + kernel_id, + ramdisk_id, + monitoring_enabled, + subnet_id, + tags, + spot_fleet_id, + instance_interruption_behaviour, + ) + self.spot_instance_requests[spot_request_id] = request + requests.append(request) + return requests + + def describe_spot_instance_requests( + self, filters: Any = None, spot_instance_ids: Optional[List[str]] = None + ) -> List[SpotInstanceRequest]: + requests = list(self.spot_instance_requests.values()) + + if spot_instance_ids: + requests = [i for i in requests if i.id in spot_instance_ids] + + return generic_filter(filters, requests) + + def cancel_spot_instance_requests( + self, request_ids: List[str] + ) -> List[SpotInstanceRequest]: + requests = [] + for request_id in request_ids: + requests.append(self.spot_instance_requests.pop(request_id)) + return requests def request_spot_fleet( self, - spot_price, - target_capacity, - iam_fleet_role, - allocation_strategy, - launch_specs, - launch_template_config=None, - instance_interruption_behaviour=None, - ): + spot_price: str, + target_capacity: str, + iam_fleet_role: str, + allocation_strategy: str, + launch_specs: List[Dict[str, Any]], + launch_template_config: Optional[List[Dict[str, Any]]] = None, + instance_interruption_behaviour: Optional[str] = None, + ) -> SpotFleetRequest: spot_fleet_request_id = random_spot_fleet_request_id() request = SpotFleetRequest( - self, - spot_fleet_request_id, - spot_price, - target_capacity, - iam_fleet_role, - allocation_strategy, - launch_specs, - launch_template_config, - instance_interruption_behaviour, + ec2_backend=self, + spot_backend=self, + spot_fleet_request_id=spot_fleet_request_id, + spot_price=spot_price, + target_capacity=target_capacity, + iam_fleet_role=iam_fleet_role, + allocation_strategy=allocation_strategy, + launch_specs=launch_specs, + launch_template_config=launch_template_config, + instance_interruption_behaviour=instance_interruption_behaviour, ) self.spot_fleet_requests[spot_fleet_request_id] = request return request - def get_spot_fleet_request(self, spot_fleet_request_id): + def get_spot_fleet_request( + self, spot_fleet_request_id: str + ) -> Optional[SpotFleetRequest]: return self.spot_fleet_requests.get(spot_fleet_request_id) - def describe_spot_fleet_instances(self, spot_fleet_request_id): + def describe_spot_fleet_instances( + self, spot_fleet_request_id: str + ) -> List[SpotInstanceRequest]: spot_fleet = self.get_spot_fleet_request(spot_fleet_request_id) if not spot_fleet: return [] return spot_fleet.spot_requests - def describe_spot_fleet_requests(self, spot_fleet_request_ids): - requests = self.spot_fleet_requests.values() + def describe_spot_fleet_requests( + self, spot_fleet_request_ids: List[str] + ) -> List[SpotFleetRequest]: + requests = list(self.spot_fleet_requests.values()) if spot_fleet_request_ids: requests = [ @@ -460,7 +499,9 @@ class SpotFleetBackend: return requests - def cancel_spot_fleet_requests(self, spot_fleet_request_ids, terminate_instances): + def cancel_spot_fleet_requests( + self, spot_fleet_request_ids: List[str], terminate_instances: bool + ) -> List[SpotFleetRequest]: spot_requests = [] for spot_fleet_request_id in spot_fleet_request_ids: spot_fleet = self.spot_fleet_requests[spot_fleet_request_id] @@ -474,8 +515,8 @@ class SpotFleetBackend: return spot_requests def modify_spot_fleet_request( - self, spot_fleet_request_id, target_capacity, terminate_instances - ): + self, spot_fleet_request_id: str, target_capacity: int, terminate_instances: str + ) -> None: if target_capacity < 0: raise ValueError("Cannot reduce spot fleet capacity below 0") spot_fleet_request = self.spot_fleet_requests[spot_fleet_request_id] @@ -485,16 +526,15 @@ class SpotFleetBackend: spot_fleet_request.create_spot_requests(delta) elif delta < 0 and terminate_instances == "Default": spot_fleet_request.terminate_instances() - return True - -class SpotPriceBackend: - def describe_spot_price_history(self, instance_types=None, filters=None): + def describe_spot_price_history( + self, instance_types: Optional[List[str]] = None, filters: Any = None + ) -> List[Dict[str, str]]: matches = INSTANCE_TYPE_OFFERINGS["availability-zone"] - matches = matches.get(self.region_name, []) + matches = matches.get(self.region_name, []) # type: ignore[attr-defined] - def matches_filters(offering, filters): - def matches_filter(key, values): + def matches_filters(offering: Dict[str, Any], filters: Any) -> bool: + def matches_filter(key: str, values: List[str]) -> bool: if key == "availability-zone": return offering.get("Location") in values elif key == "instance-type": diff --git a/moto/ec2/models/subnets.py b/moto/ec2/models/subnets.py index 4877d8abf..1921eca80 100644 --- a/moto/ec2/models/subnets.py +++ b/moto/ec2/models/subnets.py @@ -1,9 +1,13 @@ import ipaddress import itertools from collections import defaultdict -from typing import Any, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Tuple, TYPE_CHECKING, Set from moto.core import CloudFormationModel + +if TYPE_CHECKING: + from moto.ec2.models.instances import Instance +from moto.ec2.models.availability_zones_and_regions import Zone from ..exceptions import ( GenericInvalidParameterValueError, InvalidAvailabilityZoneError, @@ -26,15 +30,15 @@ from ..utils import ( class Subnet(TaggedEC2Resource, CloudFormationModel): def __init__( self, - ec2_backend, - subnet_id, - vpc_id, - cidr_block, - ipv6_cidr_block, - availability_zone, - default_for_az, - map_public_ip_on_launch, - assign_ipv6_address_on_creation=False, + ec2_backend: Any, + subnet_id: str, + vpc_id: str, + cidr_block: str, + ipv6_cidr_block: Optional[str], + availability_zone: Zone, + default_for_az: str, + map_public_ip_on_launch: str, + assign_ipv6_address_on_creation: bool = False, ): self.ec2_backend = ec2_backend self.id = subnet_id @@ -46,7 +50,7 @@ class Subnet(TaggedEC2Resource, CloudFormationModel): self.default_for_az = default_for_az self.map_public_ip_on_launch = map_public_ip_on_launch self.assign_ipv6_address_on_creation = assign_ipv6_address_on_creation - self.ipv6_cidr_block_associations = {} + self.ipv6_cidr_block_associations: Dict[str, Dict[str, Any]] = {} if ipv6_cidr_block: self.attach_ipv6_cidr_block_associations(ipv6_cidr_block) @@ -55,30 +59,37 @@ class Subnet(TaggedEC2Resource, CloudFormationModel): self.reserved_ips = [ next(self._subnet_ip_generator) for _ in range(0, 3) ] # Reserved by AWS - self._unused_ips = set() # if instance is destroyed hold IP here for reuse - self._subnet_ips = {} # has IP: instance + self._unused_ips: Set[ + str + ] = set() # if instance is destroyed hold IP here for reuse + self._subnet_ips: Dict[str, "Instance"] = {} self.state = "available" # Placeholder for response templates until Ipv6 support implemented. self.ipv6_native = False @property - def owner_id(self): + def owner_id(self) -> str: return self.ec2_backend.account_id @staticmethod - def cloudformation_name_type(): - return None + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnet.html return "AWS::EC2::Subnet" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "Subnet": from ..models import ec2_backends properties = cloudformation_json["Properties"] @@ -98,7 +109,7 @@ class Subnet(TaggedEC2Resource, CloudFormationModel): return subnet @property - def available_ip_addresses(self): + def available_ip_addresses(self) -> str: enis = [ eni for eni in self.ec2_backend.get_all_network_interfaces() @@ -111,18 +122,20 @@ class Subnet(TaggedEC2Resource, CloudFormationModel): return str(self._available_ip_addresses - len(addresses_taken)) @property - def availability_zone(self): + def availability_zone(self) -> str: return self._availability_zone.name @property - def availability_zone_id(self): + def availability_zone_id(self) -> str: return self._availability_zone.zone_id @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.id - def get_filter_value(self, filter_name): + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: """ API Version 2014-10-01 defines the following filters for DescribeSubnets: @@ -155,36 +168,36 @@ class Subnet(TaggedEC2Resource, CloudFormationModel): return super().get_filter_value(filter_name, "DescribeSubnets") @classmethod - def has_cfn_attr(cls, attr): + def has_cfn_attr(cls, attr: str) -> bool: return attr in ["AvailabilityZone"] - def get_cfn_attribute(self, attribute_name): + def get_cfn_attribute(self, attribute_name: str) -> None: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "AvailabilityZone": raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "AvailabilityZone" ]"') raise UnformattedGetAttTemplateException() - def get_available_subnet_ip(self, instance): + def get_available_subnet_ip(self, instance: "Instance") -> str: try: - new_ip = self._unused_ips.pop() + new_ip = str(self._unused_ips.pop()) except KeyError: - new_ip = next(self._subnet_ip_generator) + new_ip_v4 = next(self._subnet_ip_generator) # Skips any IP's if they've been manually specified - while str(new_ip) in self._subnet_ips: - new_ip = next(self._subnet_ip_generator) + while str(new_ip_v4) in self._subnet_ips: + new_ip_v4 = next(self._subnet_ip_generator) - if new_ip == self.cidr.broadcast_address: + if new_ip_v4 == self.cidr.broadcast_address: raise StopIteration() # Broadcast address cant be used obviously + new_ip = str(new_ip_v4) # TODO StopIteration will be raised if no ip's available, not sure how aws handles this. - new_ip = str(new_ip) self._subnet_ips[new_ip] = instance return new_ip - def request_ip(self, ip, instance): + def request_ip(self, ip: str, instance: "Instance") -> None: if ipaddress.ip_address(ip) not in self.cidr: raise Exception(f"IP does not fall in the subnet CIDR of {self.cidr}") @@ -196,7 +209,6 @@ class Subnet(TaggedEC2Resource, CloudFormationModel): pass self._subnet_ips[ip] = instance - return ip def del_subnet_ip(self, ip: str) -> None: try: @@ -205,35 +217,77 @@ class Subnet(TaggedEC2Resource, CloudFormationModel): except KeyError: pass # Unknown IP - def attach_ipv6_cidr_block_associations(self, ipv6_cidr_block): + def attach_ipv6_cidr_block_associations( + self, ipv6_cidr_block: str + ) -> Dict[str, str]: association = { "associationId": random_subnet_ipv6_cidr_block_association_id(), "ipv6CidrBlock": ipv6_cidr_block, "ipv6CidrBlockState": "associated", } - self.ipv6_cidr_block_associations[ - association.get("associationId") - ] = association + self.ipv6_cidr_block_associations[association["associationId"]] = association return association - def detach_subnet_cidr_block(self, association_id): + def detach_subnet_cidr_block(self, association_id: str) -> Dict[str, Any]: association = self.ipv6_cidr_block_associations.get(association_id) - association["ipv6CidrBlockState"] = "disassociated" - return association + association["ipv6CidrBlockState"] = "disassociated" # type: ignore[index] + return association # type: ignore[return-value] + + +class SubnetRouteTableAssociation(CloudFormationModel): + def __init__(self, route_table_id: str, subnet_id: str): + self.route_table_id = route_table_id + self.subnet_id = subnet_id + + @property + def physical_resource_id(self) -> str: + return self.route_table_id + + @staticmethod + def cloudformation_name_type() -> str: + return "" + + @staticmethod + def cloudformation_type() -> str: + # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnetroutetableassociation.html + return "AWS::EC2::SubnetRouteTableAssociation" + + @classmethod + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "SubnetRouteTableAssociation": + from ..models import ec2_backends + + properties = cloudformation_json["Properties"] + + route_table_id = properties["RouteTableId"] + subnet_id = properties["SubnetId"] + + ec2_backend = ec2_backends[account_id][region_name] + subnet_association = ec2_backend.create_subnet_association( + route_table_id=route_table_id, subnet_id=subnet_id + ) + return subnet_association class SubnetBackend: - def __init__(self): + def __init__(self) -> None: # maps availability zone to dict of (subnet_id, subnet) - self.subnets = defaultdict(dict) + self.subnets: Dict[str, Dict[str, Subnet]] = defaultdict(dict) + self.subnet_associations: Dict[str, SubnetRouteTableAssociation] = {} def get_subnet(self, subnet_id: str) -> Subnet: - for subnets in self.subnets.values(): - if subnet_id in subnets: - return subnets[subnet_id] + for subnets_per_zone in self.subnets.values(): + if subnet_id in subnets_per_zone: + return subnets_per_zone[subnet_id] raise InvalidSubnetIdError(subnet_id) - def get_default_subnet(self, availability_zone): + def get_default_subnet(self, availability_zone: str) -> Subnet: return [ subnet for subnet in self.get_all_subnets( @@ -244,17 +298,16 @@ class SubnetBackend: def create_subnet( self, - vpc_id, - cidr_block, - ipv6_cidr_block=None, - availability_zone=None, - availability_zone_id=None, - tags=None, - ): + vpc_id: str, + cidr_block: str, + ipv6_cidr_block: Optional[str] = None, + availability_zone: Optional[str] = None, + availability_zone_id: Optional[str] = None, + tags: Optional[List[Dict[str, str]]] = None, + ) -> Subnet: subnet_id = random_subnet_id() - vpc = self.get_vpc( - vpc_id - ) # Validate VPC exists and the supplied CIDR block is a subnet of the VPC's + # Validate VPC exists and the supplied CIDR block is a subnet of the VPC's + vpc = self.get_vpc(vpc_id) # type: ignore[attr-defined] vpc_cidr_blocks = [ ipaddress.IPv4Network( str(cidr_block_association["cidr_block"]), strict=False @@ -334,108 +387,73 @@ class SubnetBackend: ) for tag in tags or []: - tag_key = tag.get("Key") - tag_value = tag.get("Value") - subnet.add_tag(tag_key, tag_value) + subnet.add_tag(tag["Key"], tag["Value"]) # AWS associates a new subnet with the default Network ACL - self.associate_default_network_acl_with_subnet(subnet_id, vpc_id) - self.subnets[availability_zone][subnet_id] = subnet + self.associate_default_network_acl_with_subnet(subnet_id, vpc_id) # type: ignore[attr-defined] + self.subnets[availability_zone][subnet_id] = subnet # type: ignore[index] return subnet def get_all_subnets( self, subnet_ids: Optional[List[str]] = None, filters: Optional[Any] = None ) -> Iterable[Subnet]: # Extract a list of all subnets - matches = itertools.chain( - *[x.copy().values() for x in self.subnets.copy().values()] + matches = list( + itertools.chain(*[x.copy().values() for x in self.subnets.copy().values()]) ) if subnet_ids: matches = [sn for sn in matches if sn.id in subnet_ids] if len(subnet_ids) > len(matches): - unknown_ids = set(subnet_ids) - set(matches) + unknown_ids = set(subnet_ids) - set(matches) # type: ignore[arg-type] raise InvalidSubnetIdError(list(unknown_ids)[0]) if filters: matches = generic_filter(filters, matches) return matches - def delete_subnet(self, subnet_id): + def delete_subnet(self, subnet_id: str) -> Subnet: for subnets in self.subnets.values(): if subnet_id in subnets: - return subnets.pop(subnet_id, None) + return subnets.pop(subnet_id) raise InvalidSubnetIdError(subnet_id) - def modify_subnet_attribute(self, subnet_id, attr_name, attr_value): + def modify_subnet_attribute( + self, subnet_id: str, attr_name: str, attr_value: str + ) -> None: subnet = self.get_subnet(subnet_id) if attr_name in ("map_public_ip_on_launch", "assign_ipv6_address_on_creation"): setattr(subnet, attr_name, attr_value) else: raise InvalidParameterValueError(attr_name) - def get_subnet_from_ipv6_association(self, association_id): + def get_subnet_from_ipv6_association(self, association_id: str) -> Optional[Subnet]: subnet = None for s in self.get_all_subnets(): if association_id in s.ipv6_cidr_block_associations: subnet = s return subnet - def associate_subnet_cidr_block(self, subnet_id, ipv6_cidr_block): + def associate_subnet_cidr_block( + self, subnet_id: str, ipv6_cidr_block: str + ) -> Dict[str, str]: subnet = self.get_subnet(subnet_id) if not subnet: raise InvalidSubnetIdError(subnet_id) association = subnet.attach_ipv6_cidr_block_associations(ipv6_cidr_block) return association - def disassociate_subnet_cidr_block(self, association_id): + def disassociate_subnet_cidr_block( + self, association_id: str + ) -> Tuple[str, Dict[str, str]]: subnet = self.get_subnet_from_ipv6_association(association_id) if not subnet: raise InvalidSubnetCidrBlockAssociationID(association_id) association = subnet.detach_subnet_cidr_block(association_id) return subnet.id, association - -class SubnetRouteTableAssociation(CloudFormationModel): - def __init__(self, route_table_id, subnet_id): - self.route_table_id = route_table_id - self.subnet_id = subnet_id - - @property - def physical_resource_id(self): - return self.route_table_id - - @staticmethod - def cloudformation_name_type(): - return None - - @staticmethod - def cloudformation_type(): - # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnetroutetableassociation.html - return "AWS::EC2::SubnetRouteTableAssociation" - - @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): - from ..models import ec2_backends - - properties = cloudformation_json["Properties"] - - route_table_id = properties["RouteTableId"] - subnet_id = properties["SubnetId"] - - ec2_backend = ec2_backends[account_id][region_name] - subnet_association = ec2_backend.create_subnet_association( - route_table_id=route_table_id, subnet_id=subnet_id - ) - return subnet_association - - -class SubnetRouteTableAssociationBackend: - def __init__(self): - self.subnet_associations = {} - - def create_subnet_association(self, route_table_id, subnet_id): + def create_subnet_association( + self, route_table_id: str, subnet_id: str + ) -> SubnetRouteTableAssociation: subnet_association = SubnetRouteTableAssociation(route_table_id, subnet_id) self.subnet_associations[f"{route_table_id}:{subnet_id}"] = subnet_association return subnet_association diff --git a/moto/ec2/responses/security_groups.py b/moto/ec2/responses/security_groups.py index a1b0bdb23..5b60660c5 100644 --- a/moto/ec2/responses/security_groups.py +++ b/moto/ec2/responses/security_groups.py @@ -205,9 +205,7 @@ class SecurityGroups(EC2BaseResponse): def revoke_security_group_egress(self): if self.is_not_dryrun("RevokeSecurityGroupEgress"): for args in self._process_rules_from_querystring(): - success = self.ec2_backend.revoke_security_group_egress(*args) - if not success: - return "Could not find a matching egress rule", dict(status=404) + self.ec2_backend.revoke_security_group_egress(*args) return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE def revoke_security_group_ingress(self): diff --git a/moto/ec2/responses/spot_fleets.py b/moto/ec2/responses/spot_fleets.py index 504611054..e4306c330 100644 --- a/moto/ec2/responses/spot_fleets.py +++ b/moto/ec2/responses/spot_fleets.py @@ -35,11 +35,10 @@ class SpotFleets(BaseResponse): terminate_instances = self._get_param( "ExcessCapacityTerminationPolicy", if_none="Default" ) - successful = self.ec2_backend.modify_spot_fleet_request( + self.ec2_backend.modify_spot_fleet_request( spot_fleet_request_id, target_capacity, terminate_instances ) - template = self.response_template(MODIFY_SPOT_FLEET_REQUEST_TEMPLATE) - return template.render(successful=successful) + return self.response_template(MODIFY_SPOT_FLEET_REQUEST_TEMPLATE).render() def request_spot_fleet(self): spot_config = self._get_multi_param_dict("SpotFleetRequestConfig") @@ -80,7 +79,7 @@ REQUEST_SPOT_FLEET_TEMPLATE = """ 21681fea-9987-aef3-2121-example - {{ 'true' if successful else 'false' }} + true """ DESCRIBE_SPOT_FLEET_TEMPLATE = """ diff --git a/moto/ec2/utils.py b/moto/ec2/utils.py index 6624e989d..22eb15015 100644 --- a/moto/ec2/utils.py +++ b/moto/ec2/utils.py @@ -84,11 +84,11 @@ def random_reservation_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["reservation"]) -def random_security_group_id(): +def random_security_group_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group"], size=17) -def random_security_group_rule_id(): +def random_security_group_rule_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group-rule"], size=17) @@ -104,19 +104,19 @@ def random_snapshot_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["snapshot"]) -def random_spot_request_id(): +def random_spot_request_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["spot-instance-request"]) -def random_spot_fleet_request_id(): +def random_spot_fleet_request_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["spot-fleet-request"]) -def random_subnet_id(): +def random_subnet_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["subnet"]) -def random_subnet_ipv6_cidr_block_association_id(): +def random_subnet_ipv6_cidr_block_association_id() -> str: return random_id( prefix=EC2_RESOURCE_TO_PREFIX["subnet-ipv6-cidr-block-association"] ) @@ -347,7 +347,7 @@ def get_object_value(obj, attr): return val -def is_tag_filter(filter_name): +def is_tag_filter(filter_name: str) -> bool: return ( filter_name.startswith("tag:") or filter_name.startswith("tag-value") @@ -380,7 +380,7 @@ def add_tag_specification(tags): return tags -def tag_filter_matches(obj, filter_name, filter_values): +def tag_filter_matches(obj: Any, filter_name: str, filter_values: List[str]) -> bool: regex_filters = [re.compile(simple_aws_filter_to_re(f)) for f in filter_values] if filter_name == "tag-key": tag_values = get_obj_tag_names(obj) @@ -598,13 +598,13 @@ def is_valid_resource_id(resource_id): return resource_pattern_re.match(resource_id) is not None -def is_valid_cidr(cird): +def is_valid_cidr(cird: str) -> bool: cidr_pattern = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/(\d|[1-2]\d|3[0-2]))$" cidr_pattern_re = re.compile(cidr_pattern) return cidr_pattern_re.match(cird) is not None -def is_valid_ipv6_cidr(cird): +def is_valid_ipv6_cidr(cird: str) -> bool: cidr_pattern = r"^s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:)))(%.+)?s*(\/([0-9]|[1-9][0-9]|1[0-1][0-9]|12[0-8]))?$" cidr_pattern_re = re.compile(cidr_pattern) return cidr_pattern_re.match(cird) is not None diff --git a/moto/packages/boto/ec2/launchspecification.py b/moto/packages/boto/ec2/launchspecification.py deleted file mode 100644 index a667063bb..000000000 --- a/moto/packages/boto/ec2/launchspecification.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright (c) 2006-2012 Mitch Garnaat http://garnaat.org/ -# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -""" -Represents a launch specification for Spot instances. -""" - -from moto.packages.boto.ec2.ec2object import EC2Object - - -class LaunchSpecification(EC2Object): - def __init__(self, connection=None): - super(LaunchSpecification, self).__init__(connection) - self.key_name = None - self.instance_type = None - self.image_id = None - self.groups = [] - self.placement = None - self.kernel = None - self.ramdisk = None - self.monitored = False - self.subnet_id = None - self.lifecycle = None - self._in_monitoring_element = False - self.block_device_mapping = None - self.instance_profile = None - self.ebs_optimized = False - - def __repr__(self): - return "LaunchSpecification(%s)" % self.image_id diff --git a/moto/packages/boto/ec2/spotinstancerequest.py b/moto/packages/boto/ec2/spotinstancerequest.py deleted file mode 100644 index c8630e74a..000000000 --- a/moto/packages/boto/ec2/spotinstancerequest.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/ -# Copyright (c) 2010, Eucalyptus Systems, Inc. -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -""" -Represents an EC2 Spot Instance Request -""" - -from moto.packages.boto.ec2.ec2object import TaggedEC2Object - - -class SpotInstanceRequest(TaggedEC2Object): - """ - - :ivar id: The ID of the Spot Instance Request. - :ivar price: The maximum hourly price for any Spot Instance launched to - fulfill the request. - :ivar type: The Spot Instance request type. - :ivar state: The state of the Spot Instance request. - :ivar fault: The fault codes for the Spot Instance request, if any. - :ivar valid_from: The start date of the request. If this is a one-time - request, the request becomes active at this date and time and remains - active until all instances launch, the request expires, or the request is - canceled. If the request is persistent, the request becomes active at this - date and time and remains active until it expires or is canceled. - :ivar valid_until: The end date of the request. If this is a one-time - request, the request remains active until all instances launch, the request - is canceled, or this date is reached. If the request is persistent, it - remains active until it is canceled or this date is reached. - :ivar launch_group: The instance launch group. Launch groups are Spot - Instances that launch together and terminate together. - :ivar launched_availability_zone: foo - :ivar product_description: The Availability Zone in which the bid is - launched. - :ivar availability_zone_group: The Availability Zone group. If you specify - the same Availability Zone group for all Spot Instance requests, all Spot - Instances are launched in the same Availability Zone. - :ivar create_time: The time stamp when the Spot Instance request was - created. - :ivar launch_specification: Additional information for launching instances. - :ivar instance_id: The instance ID, if an instance has been launched to - fulfill the Spot Instance request. - :ivar status: The status code and status message describing the Spot - Instance request. - - """ - - def __init__(self, connection=None): - super(SpotInstanceRequest, self).__init__(connection) - self.id = None - self.price = None - self.type = None - self.state = None - self.fault = None - self.valid_from = None - self.valid_until = None - self.launch_group = None - self.launched_availability_zone = None - self.product_description = None - self.availability_zone_group = None - self.create_time = None - self.launch_specification = None - self.instance_id = None - self.status = None - - def __repr__(self): - return "SpotInstanceRequest:%s" % self.id diff --git a/setup.cfg b/setup.cfg index 671d5df1c..289e84d00 100644 --- a/setup.cfg +++ b/setup.cfg @@ -229,7 +229,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/ec2/models/i*,moto/ec2/models/k*,moto/ec2/models/l*,moto/ec2/models/m*,moto/ec2/models/n*,moto/ec2/models/r*,moto/moto_api +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/ec2/models/i*,moto/ec2/models/k*,moto/ec2/models/l*,moto/ec2/models/m*,moto/ec2/models/n*,moto/ec2/models/r*,moto/ec2/models/s*,moto/moto_api show_column_numbers=True show_error_codes = True disable_error_code=abstract