Techdebt: MyPy EC2 (s-models) (#5917)

This commit is contained in:
Bert Blommers 2023-02-11 10:48:22 -01:00 committed by GitHub
parent c7416c3414
commit bb1b5f511a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 618 additions and 653 deletions

View File

@ -277,7 +277,7 @@ def merge_dicts(
dict1.pop(key)
def aws_api_matches(pattern: str, string: str) -> bool:
def aws_api_matches(pattern: str, string: Any) -> bool:
"""
AWS API can match a value based on a glob, or an exact match
"""

View File

@ -103,7 +103,7 @@ class InvalidVPCIdError(EC2ClientError):
class InvalidSubnetIdError(EC2ClientError):
def __init__(self, subnet_id):
def __init__(self, subnet_id: str):
super().__init__(
"InvalidSubnetID.NotFound", f"The subnet ID '{subnet_id}' does not exist"
)
@ -182,14 +182,14 @@ class InvalidNetworkAttachmentIdError(EC2ClientError):
class InvalidSecurityGroupDuplicateError(EC2ClientError):
def __init__(self, name):
def __init__(self, name: str):
super().__init__(
"InvalidGroup.Duplicate", f"The security group '{name}' already exists"
)
class InvalidSecurityGroupNotFoundError(EC2ClientError):
def __init__(self, name: str):
def __init__(self, name: Any):
super().__init__(
"InvalidGroup.NotFound",
f"The security group '{name}' does not exist",
@ -197,7 +197,7 @@ class InvalidSecurityGroupNotFoundError(EC2ClientError):
class InvalidPermissionNotFoundError(EC2ClientError):
def __init__(self):
def __init__(self) -> None:
super().__init__(
"InvalidPermission.NotFound",
"The specified rule does not exist in this security group",
@ -205,7 +205,7 @@ class InvalidPermissionNotFoundError(EC2ClientError):
class InvalidPermissionDuplicateError(EC2ClientError):
def __init__(self):
def __init__(self) -> None:
super().__init__(
"InvalidPermission.Duplicate", "The specified rule already exists"
)
@ -503,14 +503,14 @@ class InvalidID(EC2ClientError):
class InvalidCIDRSubnetError(EC2ClientError):
def __init__(self, cidr):
def __init__(self, cidr: Any):
super().__init__(
"InvalidParameterValue", f"invalid CIDR subnet specification: {cidr}"
)
class RulesPerSecurityGroupLimitExceededError(EC2ClientError):
def __init__(self):
def __init__(self) -> None:
super().__init__(
"RulesPerSecurityGroupLimitExceeded",
"The maximum number of rules per security group " "has been reached.",
@ -518,7 +518,7 @@ class RulesPerSecurityGroupLimitExceededError(EC2ClientError):
class MotoNotImplementedError(NotImplementedError):
def __init__(self, blurb):
def __init__(self, blurb: str):
super().__init__(
f"{blurb} has not been implemented in Moto yet."
" Feel free to open an issue at"
@ -555,7 +555,9 @@ class OperationNotPermitted(EC2ClientError):
class InvalidAvailabilityZoneError(EC2ClientError):
def __init__(self, availability_zone_value, valid_availability_zones):
def __init__(
self, availability_zone_value: Optional[str], valid_availability_zones: str
):
super().__init__(
"InvalidParameterValue",
f"Value ({availability_zone_value}) for parameter availabilityZone is invalid. "
@ -580,12 +582,12 @@ class NetworkAclEntryAlreadyExistsError(EC2ClientError):
class InvalidSubnetRangeError(EC2ClientError):
def __init__(self, cidr_block):
def __init__(self, cidr_block: str):
super().__init__("InvalidSubnet.Range", f"The CIDR '{cidr_block}' is invalid.")
class InvalidCIDRBlockParameterError(EC2ClientError):
def __init__(self, cidr_block):
def __init__(self, cidr_block: str):
super().__init__(
"InvalidParameterValue",
f"Value ({cidr_block}) for parameter cidrBlock is invalid. This is not a valid CIDR block.",
@ -601,7 +603,7 @@ class InvalidDestinationCIDRBlockParameterError(EC2ClientError):
class InvalidSubnetConflictError(EC2ClientError):
def __init__(self, cidr_block):
def __init__(self, cidr_block: str):
super().__init__(
"InvalidSubnet.Conflict",
f"The CIDR '{cidr_block}' conflicts with another subnet",
@ -704,7 +706,7 @@ class InvalidTaggableResourceType(EC2ClientError):
class GenericInvalidParameterValueError(EC2ClientError):
def __init__(self, attribute, value):
def __init__(self, attribute: str, value: str):
super().__init__(
"InvalidParameterValue",
f"invalid value for parameter {attribute}: {value}",
@ -712,7 +714,7 @@ class GenericInvalidParameterValueError(EC2ClientError):
class InvalidSubnetCidrBlockAssociationID(EC2ClientError):
def __init__(self, association_id):
def __init__(self, association_id: str):
super().__init__(
"InvalidSubnetCidrBlockAssociationID.NotFound",
f"The subnet CIDR block with association ID '{association_id}' does not exist",

View File

@ -30,12 +30,8 @@ from .network_acls import NetworkAclBackend
from .availability_zones_and_regions import RegionsAndZonesBackend
from .route_tables import RouteBackend
from .security_groups import SecurityGroupBackend
from .spot_requests import (
SpotRequestBackend,
SpotPriceBackend,
SpotFleetBackend,
)
from .subnets import SubnetBackend, SubnetRouteTableAssociationBackend
from .spot_requests import SpotRequestBackend
from .subnets import SubnetBackend
from .tags import TagBackend
from .transit_gateway import TransitGatewayBackend
from .transit_gateway_route_tables import (
@ -95,7 +91,6 @@ class EC2Backend(
VPCBackend,
ManagedPrefixListBackend,
SubnetBackend,
SubnetRouteTableAssociationBackend,
FlowLogsBackend,
NetworkInterfaceBackend,
VPNConnectionBackend,
@ -104,9 +99,7 @@ class EC2Backend(
RouteBackend,
InternetGatewayBackend,
EgressOnlyInternetGatewayBackend,
SpotFleetBackend,
SpotRequestBackend,
SpotPriceBackend,
ElasticAddressBackend,
KeyPairBackend,
SettingsBackend,

View File

@ -154,7 +154,7 @@ class Fleet(TaggedEC2Resource):
self.fulfilled_capacity += added_weight
return self.spot_requests
def create_on_demand_requests(self, weight_to_add: float) -> List[Dict[str, Any]]:
def create_on_demand_requests(self, weight_to_add: float) -> None:
weight_map, added_weight = self.get_launch_spec_counts(weight_to_add)
for launch_spec, count in weight_map.items():
reservation = self.ec2_backend.add_instances( # type: ignore[attr-defined]
@ -183,7 +183,6 @@ class Fleet(TaggedEC2Resource):
}
)
self.fulfilled_capacity += added_weight
return self.on_demand_instances
def get_launch_spec_counts(
self, weight_to_add: float

View File

@ -2,11 +2,10 @@ import copy
import itertools
import json
from collections import defaultdict
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional, Tuple
from moto.core import CloudFormationModel
from moto.core.utils import aws_api_matches
from ..exceptions import (
DependencyViolationError,
InvalidCIDRSubnetError,
InvalidPermissionNotFoundError,
InvalidPermissionDuplicateError,
@ -27,16 +26,16 @@ from ..utils import (
)
class SecurityRule(object):
class SecurityRule:
def __init__(
self,
account_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups,
prefix_list_ids=None,
account_id: str,
ip_protocol: str,
from_port: Optional[str],
to_port: Optional[str],
ip_ranges: Optional[List[Any]],
source_groups: List[Dict[str, Any]],
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
):
self.account_id = account_id
self.id = random_security_group_rule_id()
@ -47,8 +46,8 @@ class SecurityRule(object):
self.from_port = self.to_port = None
if self.ip_protocol != "-1":
self.from_port = int(from_port)
self.to_port = int(to_port)
self.from_port = int(from_port) # type: ignore[arg-type]
self.to_port = int(to_port) # type: ignore[arg-type]
ip_protocol_keywords = {
"tcp": "tcp",
@ -70,10 +69,10 @@ class SecurityRule(object):
self.ip_protocol = proto if proto else self.ip_protocol
@property
def owner_id(self):
def owner_id(self) -> str:
return self.account_id
def __eq__(self, other):
def __eq__(self, other: "SecurityRule") -> bool: # type: ignore[override]
if self.ip_protocol != other.ip_protocol:
return False
ip_ranges = list(
@ -124,10 +123,9 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
self.name = name
self.group_name = self.name
self.description = description
self.ingress_rules = []
self.egress_rules = []
self.enis = {}
self.vpc_id = vpc_id
self.ingress_rules: List[SecurityRule] = []
self.egress_rules: List[SecurityRule] = []
self.vpc_id: Optional[str] = vpc_id
self.owner_id = ec2_backend.account_id
self.add_tags(tags or {})
self.is_default = is_default or False
@ -177,18 +175,23 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
}
@staticmethod
def cloudformation_name_type():
def cloudformation_name_type() -> str:
return "GroupName"
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-securitygroup.html
return "AWS::EC2::SecurityGroup"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any
) -> "SecurityGroup":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
@ -228,14 +231,14 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
return security_group
@classmethod
def update_from_cloudformation_json(
def update_from_cloudformation_json( # type: ignore[misc]
cls,
original_resource,
new_resource_name,
cloudformation_json,
account_id,
region_name,
):
original_resource: Any,
new_resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> "SecurityGroup":
cls._delete_security_group_given_vpc_id(
original_resource.name, original_resource.vpc_id, account_id, region_name
)
@ -244,9 +247,13 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
)
@classmethod
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name
):
def delete_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
properties = cloudformation_json["Properties"]
vpc_id = properties.get("VpcId")
cls._delete_security_group_given_vpc_id(
@ -255,8 +262,8 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
@classmethod
def _delete_security_group_given_vpc_id(
cls, resource_name, vpc_id, account_id, region_name
):
cls, resource_name: str, vpc_id: str, account_id: str, region_name: str
) -> None:
from ..models import ec2_backends
ec2_backend = ec2_backends[account_id][region_name]
@ -266,21 +273,23 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
if security_group:
security_group.delete(account_id, region_name)
def delete(self, account_id, region_name): # pylint: disable=unused-argument
def delete(
self, account_id: str, region_name: str # pylint: disable=unused-argument
) -> None:
"""Not exposed as part of the ELB API - used for CloudFormation."""
self.ec2_backend.delete_security_group(group_id=self.id)
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.id
def filter_description(self, values):
def filter_description(self, values: List[Any]) -> bool:
for value in values:
if aws_api_matches(value, self.description):
return True
return False
def filter_egress__ip_permission__cidr(self, values):
def filter_egress__ip_permission__cidr(self, values: List[Any]) -> bool:
for value in values:
for rule in self.egress_rules:
for cidr in rule.ip_ranges:
@ -288,16 +297,16 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
return True
return False
def filter_egress__ip_permission__from_port(self, values):
def filter_egress__ip_permission__from_port(self, values: List[Any]) -> bool:
for value in values:
for rule in self.egress_rules:
if rule.ip_protocol != -1 and aws_api_matches(
if rule.ip_protocol != "-1" and aws_api_matches(
value, str(rule.from_port)
):
return True
return False
def filter_egress__ip_permission__group_id(self, values):
def filter_egress__ip_permission__group_id(self, values: List[Any]) -> bool:
for value in values:
for rule in self.egress_rules:
for sg in rule.source_groups:
@ -305,7 +314,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
return True
return False
def filter_egress__ip_permission__group_name(self, values):
def filter_egress__ip_permission__group_name(self, values: List[Any]) -> bool:
for value in values:
for rule in self.egress_rules:
for group in rule.source_groups:
@ -313,46 +322,46 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
return True
return False
def filter_egress__ip_permission__ipv6_cidr(self, values):
def filter_egress__ip_permission__ipv6_cidr(self, values: List[Any]) -> bool:
raise MotoNotImplementedError("egress.ip-permission.ipv6-cidr filter")
def filter_egress__ip_permission__prefix_list_id(self, values):
def filter_egress__ip_permission__prefix_list_id(self, values: List[Any]) -> bool:
raise MotoNotImplementedError("egress.ip-permission.prefix-list-id filter")
def filter_egress__ip_permission__protocol(self, values):
def filter_egress__ip_permission__protocol(self, values: List[Any]) -> bool:
for value in values:
for rule in self.egress_rules:
if aws_api_matches(value, rule.ip_protocol):
return True
return False
def filter_egress__ip_permission__to_port(self, values):
def filter_egress__ip_permission__to_port(self, values: List[Any]) -> bool:
for value in values:
for rule in self.egress_rules:
if aws_api_matches(value, rule.to_port):
return True
return False
def filter_egress__ip_permission__user_id(self, values):
def filter_egress__ip_permission__user_id(self, values: List[Any]) -> bool:
for value in values:
for rule in self.egress_rules:
if aws_api_matches(value, rule.owner_id):
return True
return False
def filter_group_id(self, values):
def filter_group_id(self, values: List[Any]) -> bool:
for value in values:
if aws_api_matches(value, self.id):
return True
return False
def filter_group_name(self, values):
def filter_group_name(self, values: List[Any]) -> bool:
for value in values:
if aws_api_matches(value, self.group_name):
return True
return False
def filter_ip_permission__cidr(self, values):
def filter_ip_permission__cidr(self, values: List[Any]) -> bool:
for value in values:
for rule in self.ingress_rules:
for cidr in rule.ip_ranges:
@ -360,14 +369,14 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
return True
return False
def filter_ip_permission__from_port(self, values):
def filter_ip_permission__from_port(self, values: List[Any]) -> bool:
for value in values:
for rule in self.ingress_rules:
if aws_api_matches(value, rule.from_port):
return True
return False
def filter_ip_permission__group_id(self, values):
def filter_ip_permission__group_id(self, values: List[Any]) -> bool:
for value in values:
for rule in self.ingress_rules:
for group in rule.source_groups:
@ -375,7 +384,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
return True
return False
def filter_ip_permission__group_name(self, values):
def filter_ip_permission__group_name(self, values: List[Any]) -> bool:
for value in values:
for rule in self.ingress_rules:
for group in rule.source_groups:
@ -383,46 +392,46 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
return True
return False
def filter_ip_permission__ipv6_cidr(self, values):
def filter_ip_permission__ipv6_cidr(self, values: List[Any]) -> None:
raise MotoNotImplementedError("ip-permission.ipv6 filter")
def filter_ip_permission__prefix_list_id(self, values):
def filter_ip_permission__prefix_list_id(self, values: List[Any]) -> None:
raise MotoNotImplementedError("ip-permission.prefix-list-id filter")
def filter_ip_permission__protocol(self, values):
def filter_ip_permission__protocol(self, values: List[Any]) -> bool:
for value in values:
for rule in self.ingress_rules:
if aws_api_matches(value, rule.protocol):
if aws_api_matches(value, rule.ip_protocol):
return True
return False
def filter_ip_permission__to_port(self, values):
def filter_ip_permission__to_port(self, values: List[Any]) -> bool:
for value in values:
for rule in self.ingress_rules:
if aws_api_matches(value, rule.to_port):
return True
return False
def filter_ip_permission__user_id(self, values):
def filter_ip_permission__user_id(self, values: List[Any]) -> bool:
for value in values:
for rule in self.ingress_rules:
if aws_api_matches(value, rule.owner_id):
return True
return False
def filter_owner_id(self, values):
def filter_owner_id(self, values: List[Any]) -> bool:
for value in values:
if aws_api_matches(value, self.owner_id):
return True
return False
def filter_vpc_id(self, values):
def filter_vpc_id(self, values: List[Any]) -> bool:
for value in values:
if aws_api_matches(value, self.vpc_id):
return True
return False
def matches_filter(self, key, filter_value):
def matches_filter(self, key: str, filter_value: Any) -> Any:
if is_tag_filter(key):
tag_value = self.get_filter_value(key)
if isinstance(filter_value, list):
@ -431,56 +440,62 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
else:
return self.filters[key](filter_value)
def matches_filters(self, filters):
def matches_filters(self, filters: Any) -> bool:
for key, value in filters.items():
if not self.matches_filter(key, value):
return False
return True
@classmethod
def has_cfn_attr(cls, attr):
def has_cfn_attr(cls, attr: str) -> bool:
return attr in ["GroupId"]
def get_cfn_attribute(self, attribute_name):
def get_cfn_attribute(self, attribute_name: str) -> str:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "GroupId":
return self.id
raise UnformattedGetAttTemplateException()
def add_ingress_rule(self, rule):
def add_ingress_rule(self, rule: SecurityRule) -> None:
if rule in self.ingress_rules:
raise InvalidPermissionDuplicateError()
self.ingress_rules.append(rule)
def add_egress_rule(self, rule):
def add_egress_rule(self, rule: SecurityRule) -> None:
if rule in self.egress_rules:
raise InvalidPermissionDuplicateError()
self.egress_rules.append(rule)
def get_number_of_ingress_rules(self):
def get_number_of_ingress_rules(self) -> int:
return sum(
len(rule.ip_ranges) + len(rule.source_groups) for rule in self.ingress_rules
)
def get_number_of_egress_rules(self):
def get_number_of_egress_rules(self) -> int:
return sum(
len(rule.ip_ranges) + len(rule.source_groups) for rule in self.egress_rules
)
class SecurityGroupBackend:
def __init__(self):
def __init__(self) -> None:
# the key in the dict group is the vpc_id or None (non-vpc)
self.groups = defaultdict(dict)
self.groups: Dict[str, Dict[str, SecurityGroup]] = defaultdict(dict)
# This will help us in RuleLimitExceed errors.
self.sg_old_ingress_ruls = {}
self.sg_old_egress_ruls = {}
self.sg_old_ingress_ruls: Dict[str, List[SecurityRule]] = {}
self.sg_old_egress_ruls: Dict[str, List[SecurityRule]] = {}
def create_security_group(
self, name, description, vpc_id=None, tags=None, force=False, is_default=None
):
vpc_id = vpc_id or self.default_vpc.id
self,
name: str,
description: str,
vpc_id: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
force: bool = False,
is_default: Optional[bool] = None,
) -> SecurityGroup:
vpc_id = vpc_id or self.default_vpc.id # type: ignore[attr-defined]
if not description:
raise MissingParameterError("GroupDescription")
@ -502,25 +517,34 @@ class SecurityGroupBackend:
self.groups[vpc_id][group_id] = group
return group
def describe_security_groups(self, group_ids=None, groupnames=None, filters=None):
def describe_security_groups(
self,
group_ids: Optional[List[str]] = None,
groupnames: Optional[List[str]] = None,
filters: Any = None,
) -> List[SecurityGroup]:
all_groups = self.groups.copy()
matches = itertools.chain(*[x.copy().values() for x in all_groups.values()])
matches = list(
itertools.chain(*[x.copy().values() for x in all_groups.values()])
)
if group_ids:
matches = [grp for grp in matches if grp.id in group_ids]
if len(group_ids) > len(matches):
unknown_ids = set(group_ids) - set(matches)
unknown_ids = set(group_ids) - set(matches) # type: ignore[arg-type]
raise InvalidSecurityGroupNotFoundError(unknown_ids)
if groupnames:
matches = [grp for grp in matches if grp.name in groupnames]
if len(groupnames) > len(matches):
unknown_names = set(groupnames) - set(matches)
unknown_names = set(groupnames) - set(matches) # type: ignore[arg-type]
raise InvalidSecurityGroupNotFoundError(unknown_names)
if filters:
matches = [grp for grp in matches if grp.matches_filters(filters)]
return matches
def describe_security_group_rules(self, group_ids=None, filters=None):
def describe_security_group_rules(
self, group_ids: Optional[List[str]] = None, filters: Any = None
) -> List[SecurityRule]:
matches = self.describe_security_groups(group_ids=group_ids, filters=filters)
if not matches:
raise InvalidSecurityGroupNotFoundError(
@ -533,13 +557,13 @@ class SecurityGroupBackend:
return rules
def _delete_security_group(self, vpc_id, group_id):
vpc_id = vpc_id or self.default_vpc.id
if self.groups[vpc_id][group_id].enis:
raise DependencyViolationError(f"{group_id} is being utilized by ENIs")
return self.groups[vpc_id].pop(group_id)
def _delete_security_group(self, vpc_id: Optional[str], group_id: str) -> None:
vpc_id = vpc_id or self.default_vpc.id # type: ignore[attr-defined]
self.groups[vpc_id].pop(group_id)
def delete_security_group(self, name=None, group_id=None):
def delete_security_group(
self, name: Optional[str] = None, group_id: Optional[str] = None
) -> None:
if group_id:
# loop over all the SGs, find the right one
for vpc_id, groups in self.groups.items():
@ -562,8 +586,11 @@ class SecurityGroupBackend:
for group in all_groups:
if group.id == group_id:
return group
return None
def get_security_group_from_name(self, name, vpc_id=None):
def get_security_group_from_name(
self, name: str, vpc_id: Optional[str] = None
) -> Optional[SecurityGroup]:
if vpc_id:
for group in self.groups[vpc_id].values():
if group.name == name:
@ -573,8 +600,11 @@ class SecurityGroupBackend:
for group in self.groups[vpc_id].values():
if group.name == name:
return group
return None
def get_security_group_by_name_or_id(self, group_name_or_id, vpc_id=None):
def get_security_group_by_name_or_id(
self, group_name_or_id: str, vpc_id: Optional[str] = None
) -> Optional[SecurityGroup]:
# try searching by id, fallbacks to name search
group = self.get_security_group_from_id(group_name_or_id)
@ -582,22 +612,25 @@ class SecurityGroupBackend:
group = self.get_security_group_from_name(group_name_or_id, vpc_id)
return group
def get_default_security_group(self, vpc_id=None):
for group in self.groups[vpc_id or self.default_vpc.id].values():
def get_default_security_group(
self, vpc_id: Optional[str] = None
) -> Optional[SecurityGroup]:
for group in self.groups[vpc_id or self.default_vpc.id].values(): # type: ignore[attr-defined]
if group.is_default:
return group
return None
def authorize_security_group_ingress(
self,
group_name_or_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups=None,
prefix_list_ids=None,
vpc_id=None,
):
group_name_or_id: str,
ip_protocol: str,
from_port: str,
to_port: str,
ip_ranges: List[Any],
source_groups: Optional[List[Dict[str, str]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
vpc_id: Optional[str] = None,
) -> Tuple[SecurityRule, SecurityGroup]:
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if group is None:
raise InvalidSecurityGroupNotFoundError(group_name_or_id)
@ -629,7 +662,7 @@ class SecurityGroupBackend:
_source_groups = self._add_source_group(source_groups, vpc_id)
security_rule = SecurityRule(
self.account_id,
self.account_id, # type: ignore[attr-defined]
ip_protocol,
from_port,
to_port,
@ -677,22 +710,22 @@ class SecurityGroupBackend:
def revoke_security_group_ingress(
self,
group_name_or_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups=None,
prefix_list_ids=None,
vpc_id=None,
):
group_name_or_id: str,
ip_protocol: str,
from_port: str,
to_port: str,
ip_ranges: List[Any],
source_groups: Optional[List[Dict[str, Any]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
vpc_id: Optional[str] = None,
) -> SecurityRule:
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
group: SecurityGroup = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) # type: ignore[assignment]
_source_groups = self._add_source_group(source_groups, vpc_id)
security_rule = SecurityRule(
self.account_id,
self.account_id, # type: ignore[attr-defined]
ip_protocol,
from_port,
to_port,
@ -718,7 +751,7 @@ class SecurityGroupBackend:
security_rule.prefix_list_ids.extend(
[
item
for item in prefix_list_ids
for item in prefix_list_ids # type: ignore[union-attr]
if item not in rule.prefix_list_ids
]
)
@ -742,15 +775,15 @@ class SecurityGroupBackend:
def authorize_security_group_egress(
self,
group_name_or_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups=None,
prefix_list_ids=None,
vpc_id=None,
):
group_name_or_id: str,
ip_protocol: str,
from_port: str,
to_port: str,
ip_ranges: List[Any],
source_groups: Optional[List[Dict[str, Any]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
vpc_id: Optional[str] = None,
) -> Tuple[SecurityRule, SecurityGroup]:
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if group is None:
raise InvalidSecurityGroupNotFoundError(group_name_or_id)
@ -786,7 +819,7 @@ class SecurityGroupBackend:
_source_groups = self._add_source_group(source_groups, vpc_id)
security_rule = SecurityRule(
self.account_id,
self.account_id, # type: ignore[attr-defined]
ip_protocol,
from_port,
to_port,
@ -835,17 +868,17 @@ class SecurityGroupBackend:
def revoke_security_group_egress(
self,
group_name_or_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups=None,
prefix_list_ids=None,
vpc_id=None,
):
group_name_or_id: str,
ip_protocol: str,
from_port: str,
to_port: str,
ip_ranges: List[Any],
source_groups: Optional[List[Dict[str, Any]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
vpc_id: Optional[str] = None,
) -> SecurityRule:
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
group: SecurityGroup = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) # type: ignore[assignment]
_source_groups = self._add_source_group(source_groups, vpc_id)
@ -857,14 +890,14 @@ class SecurityGroupBackend:
# ip_ranges = [ip.get("CidrIp") if ip.get("CidrIp") == "0.0.0.0/0" else ip]
if group.vpc_id:
vpc = self.vpcs.get(group.vpc_id)
vpc = self.vpcs.get(group.vpc_id) # type: ignore[attr-defined]
if vpc and not len(vpc.get_cidr_block_association_set(ipv6=True)) > 0:
for item in ip_ranges.copy():
if "CidrIpv6" in item:
ip_ranges.remove(item)
security_rule = SecurityRule(
self.account_id,
self.account_id, # type: ignore[attr-defined]
ip_protocol,
from_port,
to_port,
@ -891,7 +924,7 @@ class SecurityGroupBackend:
security_rule.prefix_list_ids.extend(
[
item
for item in prefix_list_ids
for item in prefix_list_ids # type: ignore[union-attr]
if item not in rule.prefix_list_ids
]
)
@ -914,15 +947,15 @@ class SecurityGroupBackend:
def update_security_group_rule_descriptions_ingress(
self,
group_name_or_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups=None,
prefix_list_ids=None,
vpc_id=None,
):
group_name_or_id: str,
ip_protocol: str,
from_port: str,
to_port: str,
ip_ranges: List[str],
source_groups: Optional[List[Dict[str, Any]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
vpc_id: Optional[str] = None,
) -> SecurityGroup:
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if group is None:
@ -936,7 +969,7 @@ class SecurityGroupBackend:
if ip_ranges:
for cidr in ip_ranges:
if (
type(cidr) is dict
type(cidr) is dict # type: ignore
and not any(
[
is_valid_cidr(cidr.get("CidrIp", "")),
@ -951,7 +984,7 @@ class SecurityGroupBackend:
_source_groups = self._add_source_group(source_groups, vpc_id)
security_rule = SecurityRule(
self.account_id,
self.account_id, # type: ignore[attr-defined]
ip_protocol,
from_port,
to_port,
@ -970,15 +1003,15 @@ class SecurityGroupBackend:
def update_security_group_rule_descriptions_egress(
self,
group_name_or_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups=None,
prefix_list_ids=None,
vpc_id=None,
):
group_name_or_id: str,
ip_protocol: str,
from_port: str,
to_port: str,
ip_ranges: List[str],
source_groups: Optional[List[Dict[str, Any]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
vpc_id: Optional[str] = None,
) -> SecurityGroup:
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if group is None:
@ -992,7 +1025,7 @@ class SecurityGroupBackend:
if ip_ranges:
for cidr in ip_ranges:
if (
type(cidr) is dict
type(cidr) is dict # type: ignore
and not any(
[
is_valid_cidr(cidr.get("CidrIp", "")),
@ -1007,11 +1040,11 @@ class SecurityGroupBackend:
_source_groups = self._add_source_group(source_groups, vpc_id)
security_rule = SecurityRule(
self.account_id,
self.account_id, # type: ignore[attr-defined]
ip_protocol,
from_port,
to_port,
ip_ranges,
ip_ranges, # type: ignore[arg-type]
_source_groups,
prefix_list_ids,
)
@ -1024,7 +1057,9 @@ class SecurityGroupBackend:
self._sg_update_description(security_rule, rule)
return group
def _sg_update_description(self, security_rule, rule):
def _sg_update_description(
self, security_rule: SecurityRule, rule: SecurityRule
) -> None:
for item in security_rule.ip_ranges:
for cidr_item in rule.ip_ranges:
if cidr_item.get("CidrIp") == item.get("CidrIp"):
@ -1039,7 +1074,13 @@ class SecurityGroupBackend:
) or source_group.get("GroupName") == group.get("GroupName"):
source_group["Description"] = group.get("Description")
def _remove_items_from_rule(self, ip_ranges, _source_groups, prefix_list_ids, rule):
def _remove_items_from_rule(
self,
ip_ranges: List[Any],
_source_groups: List[Any],
prefix_list_ids: Optional[List[Any]],
rule: SecurityRule,
) -> None:
for item in ip_ranges:
if item not in rule.ip_ranges:
raise InvalidPermissionNotFoundError()
@ -1052,30 +1093,29 @@ class SecurityGroupBackend:
else:
rule.source_groups.remove(item)
for item in prefix_list_ids:
for item in prefix_list_ids: # type: ignore[union-attr]
if item not in rule.prefix_list_ids:
raise InvalidPermissionNotFoundError()
else:
rule.prefix_list_ids.remove(item)
pass
def _add_source_group(self, source_groups, vpc_id):
def _add_source_group(
self, source_groups: Optional[List[Dict[str, Any]]], vpc_id: Optional[str]
) -> List[Dict[str, Any]]:
_source_groups = []
for item in source_groups or []:
if "OwnerId" not in item:
item["OwnerId"] = self.account_id
item["OwnerId"] = self.account_id # type: ignore[attr-defined]
# for VPCs
if "GroupId" in item:
if not self.get_security_group_by_name_or_id(
item.get("GroupId"), vpc_id
):
raise InvalidSecurityGroupNotFoundError(item.get("GroupId"))
if not self.get_security_group_by_name_or_id(item["GroupId"], vpc_id):
raise InvalidSecurityGroupNotFoundError(item["GroupId"])
if "GroupName" in item:
source_group = self.get_security_group_by_name_or_id(
item.get("GroupName"), vpc_id
item["GroupName"], vpc_id
)
if not source_group:
raise InvalidSecurityGroupNotFoundError(item.get("GroupName"))
raise InvalidSecurityGroupNotFoundError(item["GroupName"])
else:
item["GroupId"] = source_group.id
item.pop("GroupName")
@ -1084,8 +1124,13 @@ class SecurityGroupBackend:
return _source_groups
def _verify_group_will_respect_rule_count_limit(
self, group, current_rule_nb, ip_ranges, source_groups=None, egress=False
):
self,
group: SecurityGroup,
current_rule_nb: int,
ip_ranges: List[str],
source_groups: Optional[List[Dict[str, str]]] = None,
egress: bool = False,
) -> None:
max_nb_rules = 60 if group.vpc_id else 100
future_group_nb_rules = current_rule_nb
if ip_ranges:
@ -1101,23 +1146,28 @@ class SecurityGroupBackend:
class SecurityGroupIngress(CloudFormationModel):
def __init__(self, security_group, properties):
def __init__(self, security_group: SecurityGroup, properties: Any):
self.security_group = security_group
self.properties = properties
@staticmethod
def cloudformation_name_type():
return None
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-securitygroupingress.html
return "AWS::EC2::SecurityGroupIngress"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any
) -> "SecurityGroupIngress":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]

View File

@ -1,11 +1,11 @@
from collections import defaultdict
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
from moto.core.common_models import CloudFormationModel
from moto.packages.boto.ec2.launchspecification import LaunchSpecification
from moto.packages.boto.ec2.spotinstancerequest import (
SpotInstanceRequest as BotoSpotRequest,
)
from moto.core.common_models import BaseModel, CloudFormationModel
if TYPE_CHECKING:
from moto.ec2.models.instances import Instance
from moto.ec2.models.security_groups import SecurityGroup
from .core import TaggedEC2Resource
from .instance_types import INSTANCE_TYPE_OFFERINGS
from ..utils import (
@ -16,43 +16,73 @@ from ..utils import (
)
class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
class LaunchSpecification(BaseModel):
def __init__(
self,
ec2_backend,
spot_request_id,
price,
image_id,
spot_instance_type,
valid_from,
valid_until,
launch_group,
availability_zone_group,
key_name,
security_groups,
user_data,
instance_type,
placement,
kernel_id,
ramdisk_id,
monitoring_enabled,
subnet_id,
tags,
spot_fleet_id,
instance_interruption_behaviour,
**kwargs,
kernel_id: Optional[str],
ramdisk_id: Optional[str],
image_id: Optional[str],
key_name: Optional[str],
instance_type: str,
placement: Optional[str],
monitored: bool,
subnet_id: str,
):
super().__init__(**kwargs)
ls = LaunchSpecification()
self.key_name = key_name
self.instance_type = instance_type
self.image_id = image_id
self.groups: List[SecurityGroup] = []
self.placement = placement
self.kernel = kernel_id
self.ramdisk = ramdisk_id
self.monitored = monitored
self.subnet_id = subnet_id
self.ebs_optimized = False
class SpotInstanceRequest(TaggedEC2Resource):
def __init__(
self,
ec2_backend: Any,
spot_request_id: str,
price: str,
image_id: str,
spot_instance_type: str,
valid_from: Optional[str],
valid_until: Optional[str],
launch_group: Optional[str],
availability_zone_group: Optional[str],
key_name: str,
security_groups: List[str],
user_data: Dict[str, Any],
instance_type: str,
placement: Optional[str],
kernel_id: Optional[str],
ramdisk_id: Optional[str],
monitoring_enabled: bool,
subnet_id: str,
tags: Dict[str, Dict[str, str]],
spot_fleet_id: Optional[str],
instance_interruption_behaviour: Optional[str],
):
super().__init__()
self.ec2_backend = ec2_backend
self.launch_specification = ls
self.launch_specification = LaunchSpecification(
kernel_id=kernel_id,
ramdisk_id=ramdisk_id,
image_id=image_id,
key_name=key_name,
instance_type=instance_type,
placement=placement,
monitored=monitoring_enabled,
subnet_id=subnet_id,
)
self.id = spot_request_id
self.state = "open"
self.status = "pending-evaluation"
self.status_message = "Your Spot request has been submitted for review, and is pending evaluation."
if price:
price = float(price)
price = f"{price:.6f}" # round up/down to 6 decimals
price = f"{float(price):.6f}" # round up/down to 6 decimals
self.price = price
self.type = spot_instance_type
self.valid_from = valid_from
@ -63,14 +93,6 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
instance_interruption_behaviour or "terminate"
)
self.user_data = user_data # NOT
ls.kernel = kernel_id
ls.ramdisk = ramdisk_id
ls.image_id = image_id
ls.key_name = key_name
ls.instance_type = instance_type
ls.placement = placement
ls.monitored = monitoring_enabled
ls.subnet_id = subnet_id
self.spot_fleet_id = spot_fleet_id
tag_map = tags.get("spot-instances-request", {})
self.add_tags(tag_map)
@ -80,18 +102,20 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
for group_name in security_groups:
group = self.ec2_backend.get_security_group_by_name_or_id(group_name)
if group:
ls.groups.append(group)
self.launch_specification.groups.append(group)
else:
# If not security groups, add the default
default_group = self.ec2_backend.get_security_group_by_name_or_id("default")
ls.groups.append(default_group)
self.launch_specification.groups.append(default_group)
self.instance = self.launch_instance()
self.state = "active"
self.status = "fulfilled"
self.status_message = ""
def get_filter_value(self, filter_name):
def get_filter_value(
self, filter_name: str, method_name: Optional[str] = None
) -> Any:
if filter_name == "state":
return self.state
elif filter_name == "spot-instance-request-id":
@ -99,7 +123,7 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
else:
return super().get_filter_value(filter_name, "DescribeSpotInstanceRequests")
def launch_instance(self):
def launch_instance(self) -> "Instance":
reservation = self.ec2_backend.add_instances(
image_id=self.launch_specification.image_id,
count=1,
@ -118,80 +142,7 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
return instance
class SpotRequestBackend:
def __init__(self):
self.spot_instance_requests = {}
def request_spot_instances(
self,
price,
image_id,
count,
spot_instance_type,
valid_from,
valid_until,
launch_group,
availability_zone_group,
key_name,
security_groups,
user_data,
instance_type,
placement,
kernel_id,
ramdisk_id,
monitoring_enabled,
subnet_id,
tags=None,
spot_fleet_id=None,
instance_interruption_behaviour=None,
):
requests = []
tags = tags or {}
for _ in range(count):
spot_request_id = random_spot_request_id()
request = SpotInstanceRequest(
self,
spot_request_id,
price,
image_id,
spot_instance_type,
valid_from,
valid_until,
launch_group,
availability_zone_group,
key_name,
security_groups,
user_data,
instance_type,
placement,
kernel_id,
ramdisk_id,
monitoring_enabled,
subnet_id,
tags,
spot_fleet_id,
instance_interruption_behaviour,
)
self.spot_instance_requests[spot_request_id] = request
requests.append(request)
return requests
def describe_spot_instance_requests(self, filters=None, spot_instance_ids=None):
requests = self.spot_instance_requests.copy().values()
if spot_instance_ids:
requests = [i for i in requests if i.id in spot_instance_ids]
return generic_filter(filters, requests)
def cancel_spot_instance_requests(self, request_ids):
requests = []
for request_id in request_ids:
requests.append(self.spot_instance_requests.pop(request_id))
return requests
class SpotFleetLaunchSpec(object):
class SpotFleetLaunchSpec:
def __init__(
self,
ebs_optimized: Any,
@ -224,18 +175,20 @@ class SpotFleetLaunchSpec(object):
class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel):
def __init__(
self,
ec2_backend,
spot_fleet_request_id,
spot_price,
target_capacity,
iam_fleet_role,
allocation_strategy,
launch_specs,
launch_template_config,
instance_interruption_behaviour,
ec2_backend: Any,
spot_backend: "SpotRequestBackend",
spot_fleet_request_id: str,
spot_price: str,
target_capacity: str,
iam_fleet_role: str,
allocation_strategy: str,
launch_specs: List[Dict[str, Any]],
launch_template_config: Optional[List[Dict[str, Any]]],
instance_interruption_behaviour: Optional[str],
):
self.ec2_backend = ec2_backend
self.spot_backend = spot_backend
self.id = spot_fleet_request_id
self.spot_price = spot_price
self.target_capacity = int(target_capacity)
@ -289,26 +242,31 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel):
)
)
self.spot_requests = []
self.spot_requests: List[SpotInstanceRequest] = []
self.create_spot_requests(self.target_capacity)
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.id
@staticmethod
def cloudformation_name_type():
return None
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-spotfleet.html
return "AWS::EC2::SpotFleet"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "SpotFleetRequest":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]["SpotFleetRequestConfigData"]
@ -330,10 +288,12 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel):
return spot_fleet_request
def get_launch_spec_counts(self, weight_to_add):
weight_map = defaultdict(int)
def get_launch_spec_counts(
self, weight_to_add: float
) -> Tuple[Dict[Any, int], float]:
weight_map: Dict[Any, int] = defaultdict(int)
weight_so_far = 0
weight_so_far = 0.0
if self.allocation_strategy == "diversified":
launch_spec_index = 0
while True:
@ -360,10 +320,10 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel):
return weight_map, weight_so_far
def create_spot_requests(self, weight_to_add):
def create_spot_requests(self, weight_to_add: float) -> None:
weight_map, added_weight = self.get_launch_spec_counts(weight_to_add)
for launch_spec, count in weight_map.items():
requests = self.ec2_backend.request_spot_instances(
requests = self.spot_backend.request_spot_instances(
price=launch_spec.spot_price,
image_id=launch_spec.image_id,
count=count,
@ -386,9 +346,8 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel):
)
self.spot_requests.extend(requests)
self.fulfilled_capacity += added_weight
return self.spot_requests
def terminate_instances(self):
def terminate_instances(self) -> None:
instance_ids = []
new_fulfilled_capacity = self.fulfilled_capacity
for req in self.spot_requests:
@ -408,50 +367,130 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel):
self.spot_requests = [
req for req in self.spot_requests if req.instance.id not in instance_ids
]
self.ec2_backend.terminate_instances(instance_ids)
self.ec2_backend.terminate_instances(instance_ids) # type: ignore[attr-defined]
class SpotFleetBackend:
def __init__(self):
self.spot_fleet_requests = {}
class SpotRequestBackend:
def __init__(self) -> None:
self.spot_instance_requests: Dict[str, SpotInstanceRequest] = {}
self.spot_fleet_requests: Dict[str, SpotFleetRequest] = {}
def request_spot_instances(
self,
price: str,
image_id: str,
count: int,
spot_instance_type: str,
valid_from: Optional[str],
valid_until: Optional[str],
launch_group: Optional[str],
availability_zone_group: Optional[str],
key_name: str,
security_groups: List[str],
user_data: Dict[str, Any],
instance_type: str,
placement: Optional[str],
kernel_id: Optional[str],
ramdisk_id: Optional[str],
monitoring_enabled: bool,
subnet_id: str,
tags: Optional[Dict[str, Dict[str, str]]] = None,
spot_fleet_id: Optional[str] = None,
instance_interruption_behaviour: Optional[str] = None,
) -> List[SpotInstanceRequest]:
requests = []
tags = tags or {}
for _ in range(count):
spot_request_id = random_spot_request_id()
request = SpotInstanceRequest(
self,
spot_request_id,
price,
image_id,
spot_instance_type,
valid_from,
valid_until,
launch_group,
availability_zone_group,
key_name,
security_groups,
user_data,
instance_type,
placement,
kernel_id,
ramdisk_id,
monitoring_enabled,
subnet_id,
tags,
spot_fleet_id,
instance_interruption_behaviour,
)
self.spot_instance_requests[spot_request_id] = request
requests.append(request)
return requests
def describe_spot_instance_requests(
self, filters: Any = None, spot_instance_ids: Optional[List[str]] = None
) -> List[SpotInstanceRequest]:
requests = list(self.spot_instance_requests.values())
if spot_instance_ids:
requests = [i for i in requests if i.id in spot_instance_ids]
return generic_filter(filters, requests)
def cancel_spot_instance_requests(
self, request_ids: List[str]
) -> List[SpotInstanceRequest]:
requests = []
for request_id in request_ids:
requests.append(self.spot_instance_requests.pop(request_id))
return requests
def request_spot_fleet(
self,
spot_price,
target_capacity,
iam_fleet_role,
allocation_strategy,
launch_specs,
launch_template_config=None,
instance_interruption_behaviour=None,
):
spot_price: str,
target_capacity: str,
iam_fleet_role: str,
allocation_strategy: str,
launch_specs: List[Dict[str, Any]],
launch_template_config: Optional[List[Dict[str, Any]]] = None,
instance_interruption_behaviour: Optional[str] = None,
) -> SpotFleetRequest:
spot_fleet_request_id = random_spot_fleet_request_id()
request = SpotFleetRequest(
self,
spot_fleet_request_id,
spot_price,
target_capacity,
iam_fleet_role,
allocation_strategy,
launch_specs,
launch_template_config,
instance_interruption_behaviour,
ec2_backend=self,
spot_backend=self,
spot_fleet_request_id=spot_fleet_request_id,
spot_price=spot_price,
target_capacity=target_capacity,
iam_fleet_role=iam_fleet_role,
allocation_strategy=allocation_strategy,
launch_specs=launch_specs,
launch_template_config=launch_template_config,
instance_interruption_behaviour=instance_interruption_behaviour,
)
self.spot_fleet_requests[spot_fleet_request_id] = request
return request
def get_spot_fleet_request(self, spot_fleet_request_id):
def get_spot_fleet_request(
self, spot_fleet_request_id: str
) -> Optional[SpotFleetRequest]:
return self.spot_fleet_requests.get(spot_fleet_request_id)
def describe_spot_fleet_instances(self, spot_fleet_request_id):
def describe_spot_fleet_instances(
self, spot_fleet_request_id: str
) -> List[SpotInstanceRequest]:
spot_fleet = self.get_spot_fleet_request(spot_fleet_request_id)
if not spot_fleet:
return []
return spot_fleet.spot_requests
def describe_spot_fleet_requests(self, spot_fleet_request_ids):
requests = self.spot_fleet_requests.values()
def describe_spot_fleet_requests(
self, spot_fleet_request_ids: List[str]
) -> List[SpotFleetRequest]:
requests = list(self.spot_fleet_requests.values())
if spot_fleet_request_ids:
requests = [
@ -460,7 +499,9 @@ class SpotFleetBackend:
return requests
def cancel_spot_fleet_requests(self, spot_fleet_request_ids, terminate_instances):
def cancel_spot_fleet_requests(
self, spot_fleet_request_ids: List[str], terminate_instances: bool
) -> List[SpotFleetRequest]:
spot_requests = []
for spot_fleet_request_id in spot_fleet_request_ids:
spot_fleet = self.spot_fleet_requests[spot_fleet_request_id]
@ -474,8 +515,8 @@ class SpotFleetBackend:
return spot_requests
def modify_spot_fleet_request(
self, spot_fleet_request_id, target_capacity, terminate_instances
):
self, spot_fleet_request_id: str, target_capacity: int, terminate_instances: str
) -> None:
if target_capacity < 0:
raise ValueError("Cannot reduce spot fleet capacity below 0")
spot_fleet_request = self.spot_fleet_requests[spot_fleet_request_id]
@ -485,16 +526,15 @@ class SpotFleetBackend:
spot_fleet_request.create_spot_requests(delta)
elif delta < 0 and terminate_instances == "Default":
spot_fleet_request.terminate_instances()
return True
class SpotPriceBackend:
def describe_spot_price_history(self, instance_types=None, filters=None):
def describe_spot_price_history(
self, instance_types: Optional[List[str]] = None, filters: Any = None
) -> List[Dict[str, str]]:
matches = INSTANCE_TYPE_OFFERINGS["availability-zone"]
matches = matches.get(self.region_name, [])
matches = matches.get(self.region_name, []) # type: ignore[attr-defined]
def matches_filters(offering, filters):
def matches_filter(key, values):
def matches_filters(offering: Dict[str, Any], filters: Any) -> bool:
def matches_filter(key: str, values: List[str]) -> bool:
if key == "availability-zone":
return offering.get("Location") in values
elif key == "instance-type":

View File

@ -1,9 +1,13 @@
import ipaddress
import itertools
from collections import defaultdict
from typing import Any, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Tuple, TYPE_CHECKING, Set
from moto.core import CloudFormationModel
if TYPE_CHECKING:
from moto.ec2.models.instances import Instance
from moto.ec2.models.availability_zones_and_regions import Zone
from ..exceptions import (
GenericInvalidParameterValueError,
InvalidAvailabilityZoneError,
@ -26,15 +30,15 @@ from ..utils import (
class Subnet(TaggedEC2Resource, CloudFormationModel):
def __init__(
self,
ec2_backend,
subnet_id,
vpc_id,
cidr_block,
ipv6_cidr_block,
availability_zone,
default_for_az,
map_public_ip_on_launch,
assign_ipv6_address_on_creation=False,
ec2_backend: Any,
subnet_id: str,
vpc_id: str,
cidr_block: str,
ipv6_cidr_block: Optional[str],
availability_zone: Zone,
default_for_az: str,
map_public_ip_on_launch: str,
assign_ipv6_address_on_creation: bool = False,
):
self.ec2_backend = ec2_backend
self.id = subnet_id
@ -46,7 +50,7 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
self.default_for_az = default_for_az
self.map_public_ip_on_launch = map_public_ip_on_launch
self.assign_ipv6_address_on_creation = assign_ipv6_address_on_creation
self.ipv6_cidr_block_associations = {}
self.ipv6_cidr_block_associations: Dict[str, Dict[str, Any]] = {}
if ipv6_cidr_block:
self.attach_ipv6_cidr_block_associations(ipv6_cidr_block)
@ -55,30 +59,37 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
self.reserved_ips = [
next(self._subnet_ip_generator) for _ in range(0, 3)
] # Reserved by AWS
self._unused_ips = set() # if instance is destroyed hold IP here for reuse
self._subnet_ips = {} # has IP: instance
self._unused_ips: Set[
str
] = set() # if instance is destroyed hold IP here for reuse
self._subnet_ips: Dict[str, "Instance"] = {}
self.state = "available"
# Placeholder for response templates until Ipv6 support implemented.
self.ipv6_native = False
@property
def owner_id(self):
def owner_id(self) -> str:
return self.ec2_backend.account_id
@staticmethod
def cloudformation_name_type():
return None
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnet.html
return "AWS::EC2::Subnet"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "Subnet":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
@ -98,7 +109,7 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
return subnet
@property
def available_ip_addresses(self):
def available_ip_addresses(self) -> str:
enis = [
eni
for eni in self.ec2_backend.get_all_network_interfaces()
@ -111,18 +122,20 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
return str(self._available_ip_addresses - len(addresses_taken))
@property
def availability_zone(self):
def availability_zone(self) -> str:
return self._availability_zone.name
@property
def availability_zone_id(self):
def availability_zone_id(self) -> str:
return self._availability_zone.zone_id
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.id
def get_filter_value(self, filter_name):
def get_filter_value(
self, filter_name: str, method_name: Optional[str] = None
) -> Any:
"""
API Version 2014-10-01 defines the following filters for DescribeSubnets:
@ -155,36 +168,36 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
return super().get_filter_value(filter_name, "DescribeSubnets")
@classmethod
def has_cfn_attr(cls, attr):
def has_cfn_attr(cls, attr: str) -> bool:
return attr in ["AvailabilityZone"]
def get_cfn_attribute(self, attribute_name):
def get_cfn_attribute(self, attribute_name: str) -> None:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "AvailabilityZone":
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "AvailabilityZone" ]"')
raise UnformattedGetAttTemplateException()
def get_available_subnet_ip(self, instance):
def get_available_subnet_ip(self, instance: "Instance") -> str:
try:
new_ip = self._unused_ips.pop()
new_ip = str(self._unused_ips.pop())
except KeyError:
new_ip = next(self._subnet_ip_generator)
new_ip_v4 = next(self._subnet_ip_generator)
# Skips any IP's if they've been manually specified
while str(new_ip) in self._subnet_ips:
new_ip = next(self._subnet_ip_generator)
while str(new_ip_v4) in self._subnet_ips:
new_ip_v4 = next(self._subnet_ip_generator)
if new_ip == self.cidr.broadcast_address:
if new_ip_v4 == self.cidr.broadcast_address:
raise StopIteration() # Broadcast address cant be used obviously
new_ip = str(new_ip_v4)
# TODO StopIteration will be raised if no ip's available, not sure how aws handles this.
new_ip = str(new_ip)
self._subnet_ips[new_ip] = instance
return new_ip
def request_ip(self, ip, instance):
def request_ip(self, ip: str, instance: "Instance") -> None:
if ipaddress.ip_address(ip) not in self.cidr:
raise Exception(f"IP does not fall in the subnet CIDR of {self.cidr}")
@ -196,7 +209,6 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
pass
self._subnet_ips[ip] = instance
return ip
def del_subnet_ip(self, ip: str) -> None:
try:
@ -205,35 +217,77 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
except KeyError:
pass # Unknown IP
def attach_ipv6_cidr_block_associations(self, ipv6_cidr_block):
def attach_ipv6_cidr_block_associations(
self, ipv6_cidr_block: str
) -> Dict[str, str]:
association = {
"associationId": random_subnet_ipv6_cidr_block_association_id(),
"ipv6CidrBlock": ipv6_cidr_block,
"ipv6CidrBlockState": "associated",
}
self.ipv6_cidr_block_associations[
association.get("associationId")
] = association
self.ipv6_cidr_block_associations[association["associationId"]] = association
return association
def detach_subnet_cidr_block(self, association_id):
def detach_subnet_cidr_block(self, association_id: str) -> Dict[str, Any]:
association = self.ipv6_cidr_block_associations.get(association_id)
association["ipv6CidrBlockState"] = "disassociated"
return association
association["ipv6CidrBlockState"] = "disassociated" # type: ignore[index]
return association # type: ignore[return-value]
class SubnetRouteTableAssociation(CloudFormationModel):
def __init__(self, route_table_id: str, subnet_id: str):
self.route_table_id = route_table_id
self.subnet_id = subnet_id
@property
def physical_resource_id(self) -> str:
return self.route_table_id
@staticmethod
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnetroutetableassociation.html
return "AWS::EC2::SubnetRouteTableAssociation"
@classmethod
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "SubnetRouteTableAssociation":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
route_table_id = properties["RouteTableId"]
subnet_id = properties["SubnetId"]
ec2_backend = ec2_backends[account_id][region_name]
subnet_association = ec2_backend.create_subnet_association(
route_table_id=route_table_id, subnet_id=subnet_id
)
return subnet_association
class SubnetBackend:
def __init__(self):
def __init__(self) -> None:
# maps availability zone to dict of (subnet_id, subnet)
self.subnets = defaultdict(dict)
self.subnets: Dict[str, Dict[str, Subnet]] = defaultdict(dict)
self.subnet_associations: Dict[str, SubnetRouteTableAssociation] = {}
def get_subnet(self, subnet_id: str) -> Subnet:
for subnets in self.subnets.values():
if subnet_id in subnets:
return subnets[subnet_id]
for subnets_per_zone in self.subnets.values():
if subnet_id in subnets_per_zone:
return subnets_per_zone[subnet_id]
raise InvalidSubnetIdError(subnet_id)
def get_default_subnet(self, availability_zone):
def get_default_subnet(self, availability_zone: str) -> Subnet:
return [
subnet
for subnet in self.get_all_subnets(
@ -244,17 +298,16 @@ class SubnetBackend:
def create_subnet(
self,
vpc_id,
cidr_block,
ipv6_cidr_block=None,
availability_zone=None,
availability_zone_id=None,
tags=None,
):
vpc_id: str,
cidr_block: str,
ipv6_cidr_block: Optional[str] = None,
availability_zone: Optional[str] = None,
availability_zone_id: Optional[str] = None,
tags: Optional[List[Dict[str, str]]] = None,
) -> Subnet:
subnet_id = random_subnet_id()
vpc = self.get_vpc(
vpc_id
) # Validate VPC exists and the supplied CIDR block is a subnet of the VPC's
# Validate VPC exists and the supplied CIDR block is a subnet of the VPC's
vpc = self.get_vpc(vpc_id) # type: ignore[attr-defined]
vpc_cidr_blocks = [
ipaddress.IPv4Network(
str(cidr_block_association["cidr_block"]), strict=False
@ -334,108 +387,73 @@ class SubnetBackend:
)
for tag in tags or []:
tag_key = tag.get("Key")
tag_value = tag.get("Value")
subnet.add_tag(tag_key, tag_value)
subnet.add_tag(tag["Key"], tag["Value"])
# AWS associates a new subnet with the default Network ACL
self.associate_default_network_acl_with_subnet(subnet_id, vpc_id)
self.subnets[availability_zone][subnet_id] = subnet
self.associate_default_network_acl_with_subnet(subnet_id, vpc_id) # type: ignore[attr-defined]
self.subnets[availability_zone][subnet_id] = subnet # type: ignore[index]
return subnet
def get_all_subnets(
self, subnet_ids: Optional[List[str]] = None, filters: Optional[Any] = None
) -> Iterable[Subnet]:
# Extract a list of all subnets
matches = itertools.chain(
*[x.copy().values() for x in self.subnets.copy().values()]
matches = list(
itertools.chain(*[x.copy().values() for x in self.subnets.copy().values()])
)
if subnet_ids:
matches = [sn for sn in matches if sn.id in subnet_ids]
if len(subnet_ids) > len(matches):
unknown_ids = set(subnet_ids) - set(matches)
unknown_ids = set(subnet_ids) - set(matches) # type: ignore[arg-type]
raise InvalidSubnetIdError(list(unknown_ids)[0])
if filters:
matches = generic_filter(filters, matches)
return matches
def delete_subnet(self, subnet_id):
def delete_subnet(self, subnet_id: str) -> Subnet:
for subnets in self.subnets.values():
if subnet_id in subnets:
return subnets.pop(subnet_id, None)
return subnets.pop(subnet_id)
raise InvalidSubnetIdError(subnet_id)
def modify_subnet_attribute(self, subnet_id, attr_name, attr_value):
def modify_subnet_attribute(
self, subnet_id: str, attr_name: str, attr_value: str
) -> None:
subnet = self.get_subnet(subnet_id)
if attr_name in ("map_public_ip_on_launch", "assign_ipv6_address_on_creation"):
setattr(subnet, attr_name, attr_value)
else:
raise InvalidParameterValueError(attr_name)
def get_subnet_from_ipv6_association(self, association_id):
def get_subnet_from_ipv6_association(self, association_id: str) -> Optional[Subnet]:
subnet = None
for s in self.get_all_subnets():
if association_id in s.ipv6_cidr_block_associations:
subnet = s
return subnet
def associate_subnet_cidr_block(self, subnet_id, ipv6_cidr_block):
def associate_subnet_cidr_block(
self, subnet_id: str, ipv6_cidr_block: str
) -> Dict[str, str]:
subnet = self.get_subnet(subnet_id)
if not subnet:
raise InvalidSubnetIdError(subnet_id)
association = subnet.attach_ipv6_cidr_block_associations(ipv6_cidr_block)
return association
def disassociate_subnet_cidr_block(self, association_id):
def disassociate_subnet_cidr_block(
self, association_id: str
) -> Tuple[str, Dict[str, str]]:
subnet = self.get_subnet_from_ipv6_association(association_id)
if not subnet:
raise InvalidSubnetCidrBlockAssociationID(association_id)
association = subnet.detach_subnet_cidr_block(association_id)
return subnet.id, association
class SubnetRouteTableAssociation(CloudFormationModel):
def __init__(self, route_table_id, subnet_id):
self.route_table_id = route_table_id
self.subnet_id = subnet_id
@property
def physical_resource_id(self):
return self.route_table_id
@staticmethod
def cloudformation_name_type():
return None
@staticmethod
def cloudformation_type():
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnetroutetableassociation.html
return "AWS::EC2::SubnetRouteTableAssociation"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
route_table_id = properties["RouteTableId"]
subnet_id = properties["SubnetId"]
ec2_backend = ec2_backends[account_id][region_name]
subnet_association = ec2_backend.create_subnet_association(
route_table_id=route_table_id, subnet_id=subnet_id
)
return subnet_association
class SubnetRouteTableAssociationBackend:
def __init__(self):
self.subnet_associations = {}
def create_subnet_association(self, route_table_id, subnet_id):
def create_subnet_association(
self, route_table_id: str, subnet_id: str
) -> SubnetRouteTableAssociation:
subnet_association = SubnetRouteTableAssociation(route_table_id, subnet_id)
self.subnet_associations[f"{route_table_id}:{subnet_id}"] = subnet_association
return subnet_association

View File

@ -205,9 +205,7 @@ class SecurityGroups(EC2BaseResponse):
def revoke_security_group_egress(self):
if self.is_not_dryrun("RevokeSecurityGroupEgress"):
for args in self._process_rules_from_querystring():
success = self.ec2_backend.revoke_security_group_egress(*args)
if not success:
return "Could not find a matching egress rule", dict(status=404)
self.ec2_backend.revoke_security_group_egress(*args)
return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE
def revoke_security_group_ingress(self):

View File

@ -35,11 +35,10 @@ class SpotFleets(BaseResponse):
terminate_instances = self._get_param(
"ExcessCapacityTerminationPolicy", if_none="Default"
)
successful = self.ec2_backend.modify_spot_fleet_request(
self.ec2_backend.modify_spot_fleet_request(
spot_fleet_request_id, target_capacity, terminate_instances
)
template = self.response_template(MODIFY_SPOT_FLEET_REQUEST_TEMPLATE)
return template.render(successful=successful)
return self.response_template(MODIFY_SPOT_FLEET_REQUEST_TEMPLATE).render()
def request_spot_fleet(self):
spot_config = self._get_multi_param_dict("SpotFleetRequestConfig")
@ -80,7 +79,7 @@ REQUEST_SPOT_FLEET_TEMPLATE = """<RequestSpotFleetResponse xmlns="http://ec2.ama
MODIFY_SPOT_FLEET_REQUEST_TEMPLATE = """<ModifySpotFleetRequestResponse xmlns="http://ec2.amazonaws.com/doc/2016-09-15/">
<requestId>21681fea-9987-aef3-2121-example</requestId>
<return>{{ 'true' if successful else 'false' }}</return>
<return>true</return>
</ModifySpotFleetRequestResponse>"""
DESCRIBE_SPOT_FLEET_TEMPLATE = """<DescribeSpotFleetRequestsResponse xmlns="http://ec2.amazonaws.com/doc/2016-09-15/">

View File

@ -84,11 +84,11 @@ def random_reservation_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["reservation"])
def random_security_group_id():
def random_security_group_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group"], size=17)
def random_security_group_rule_id():
def random_security_group_rule_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group-rule"], size=17)
@ -104,19 +104,19 @@ def random_snapshot_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["snapshot"])
def random_spot_request_id():
def random_spot_request_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["spot-instance-request"])
def random_spot_fleet_request_id():
def random_spot_fleet_request_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["spot-fleet-request"])
def random_subnet_id():
def random_subnet_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["subnet"])
def random_subnet_ipv6_cidr_block_association_id():
def random_subnet_ipv6_cidr_block_association_id() -> str:
return random_id(
prefix=EC2_RESOURCE_TO_PREFIX["subnet-ipv6-cidr-block-association"]
)
@ -347,7 +347,7 @@ def get_object_value(obj, attr):
return val
def is_tag_filter(filter_name):
def is_tag_filter(filter_name: str) -> bool:
return (
filter_name.startswith("tag:")
or filter_name.startswith("tag-value")
@ -380,7 +380,7 @@ def add_tag_specification(tags):
return tags
def tag_filter_matches(obj, filter_name, filter_values):
def tag_filter_matches(obj: Any, filter_name: str, filter_values: List[str]) -> bool:
regex_filters = [re.compile(simple_aws_filter_to_re(f)) for f in filter_values]
if filter_name == "tag-key":
tag_values = get_obj_tag_names(obj)
@ -598,13 +598,13 @@ def is_valid_resource_id(resource_id):
return resource_pattern_re.match(resource_id) is not None
def is_valid_cidr(cird):
def is_valid_cidr(cird: str) -> bool:
cidr_pattern = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/(\d|[1-2]\d|3[0-2]))$"
cidr_pattern_re = re.compile(cidr_pattern)
return cidr_pattern_re.match(cird) is not None
def is_valid_ipv6_cidr(cird):
def is_valid_ipv6_cidr(cird: str) -> bool:
cidr_pattern = r"^s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:)))(%.+)?s*(\/([0-9]|[1-9][0-9]|1[0-1][0-9]|12[0-8]))?$"
cidr_pattern_re = re.compile(cidr_pattern)
return cidr_pattern_re.match(cird) is not None

View File

@ -1,49 +0,0 @@
# Copyright (c) 2006-2012 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""
Represents a launch specification for Spot instances.
"""
from moto.packages.boto.ec2.ec2object import EC2Object
class LaunchSpecification(EC2Object):
def __init__(self, connection=None):
super(LaunchSpecification, self).__init__(connection)
self.key_name = None
self.instance_type = None
self.image_id = None
self.groups = []
self.placement = None
self.kernel = None
self.ramdisk = None
self.monitored = False
self.subnet_id = None
self.lifecycle = None
self._in_monitoring_element = False
self.block_device_mapping = None
self.instance_profile = None
self.ebs_optimized = False
def __repr__(self):
return "LaunchSpecification(%s)" % self.image_id

View File

@ -1,85 +0,0 @@
# Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2010, Eucalyptus Systems, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""
Represents an EC2 Spot Instance Request
"""
from moto.packages.boto.ec2.ec2object import TaggedEC2Object
class SpotInstanceRequest(TaggedEC2Object):
"""
:ivar id: The ID of the Spot Instance Request.
:ivar price: The maximum hourly price for any Spot Instance launched to
fulfill the request.
:ivar type: The Spot Instance request type.
:ivar state: The state of the Spot Instance request.
:ivar fault: The fault codes for the Spot Instance request, if any.
:ivar valid_from: The start date of the request. If this is a one-time
request, the request becomes active at this date and time and remains
active until all instances launch, the request expires, or the request is
canceled. If the request is persistent, the request becomes active at this
date and time and remains active until it expires or is canceled.
:ivar valid_until: The end date of the request. If this is a one-time
request, the request remains active until all instances launch, the request
is canceled, or this date is reached. If the request is persistent, it
remains active until it is canceled or this date is reached.
:ivar launch_group: The instance launch group. Launch groups are Spot
Instances that launch together and terminate together.
:ivar launched_availability_zone: foo
:ivar product_description: The Availability Zone in which the bid is
launched.
:ivar availability_zone_group: The Availability Zone group. If you specify
the same Availability Zone group for all Spot Instance requests, all Spot
Instances are launched in the same Availability Zone.
:ivar create_time: The time stamp when the Spot Instance request was
created.
:ivar launch_specification: Additional information for launching instances.
:ivar instance_id: The instance ID, if an instance has been launched to
fulfill the Spot Instance request.
:ivar status: The status code and status message describing the Spot
Instance request.
"""
def __init__(self, connection=None):
super(SpotInstanceRequest, self).__init__(connection)
self.id = None
self.price = None
self.type = None
self.state = None
self.fault = None
self.valid_from = None
self.valid_until = None
self.launch_group = None
self.launched_availability_zone = None
self.product_description = None
self.availability_zone_group = None
self.create_time = None
self.launch_specification = None
self.instance_id = None
self.status = None
def __repr__(self):
return "SpotInstanceRequest:%s" % self.id

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/ec2/models/i*,moto/ec2/models/k*,moto/ec2/models/l*,moto/ec2/models/m*,moto/ec2/models/n*,moto/ec2/models/r*,moto/moto_api
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/ec2/models/i*,moto/ec2/models/k*,moto/ec2/models/l*,moto/ec2/models/m*,moto/ec2/models/n*,moto/ec2/models/r*,moto/ec2/models/s*,moto/moto_api
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract