Techdebt: MyPy EC2 (i-models) (#5903)

This commit is contained in:
Bert Blommers 2023-02-05 19:26:25 -01:00 committed by GitHub
parent 5c93bd6443
commit 9870a7af6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 378 additions and 276 deletions

View File

@ -174,13 +174,14 @@ class FakeLaunchConfiguration(CloudFormationModel):
def create_from_instance(
cls, name: str, instance: Instance, backend: "AutoScalingBackend"
) -> "FakeLaunchConfiguration":
security_group_names = [sg.name for sg in instance.security_groups]
config = backend.create_launch_configuration(
name=name,
image_id=instance.image_id,
kernel_id="",
ramdisk_id="",
key_name=instance.key_name,
security_groups=instance.security_groups,
security_groups=security_group_names,
user_data=instance.user_data,
instance_type=instance.instance_type,
instance_monitoring=False,

View File

@ -23,7 +23,7 @@ class EC2ClientError(RESTError):
# EC2 uses <RequestID> as tag name in the XML response
request_id_tag_name = "RequestID"
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any):
kwargs.setdefault("template", "custom_response")
self.templates["custom_response"] = EC2_ERROR_RESPONSE
super().__init__(*args, **kwargs)
@ -38,7 +38,7 @@ class DefaultVpcAlreadyExists(EC2ClientError):
class DependencyViolationError(EC2ClientError):
def __init__(self, message):
def __init__(self, message: str):
super().__init__("DependencyViolation", message)
@ -189,7 +189,7 @@ class InvalidSecurityGroupDuplicateError(EC2ClientError):
class InvalidSecurityGroupNotFoundError(EC2ClientError):
def __init__(self, name):
def __init__(self, name: str):
super().__init__(
"InvalidGroup.NotFound",
f"The security group '{name}' does not exist",
@ -235,7 +235,7 @@ class RouteAlreadyExistsError(EC2ClientError):
class InvalidInstanceIdError(EC2ClientError):
def __init__(self, instance_id):
def __init__(self, instance_id: Any):
if isinstance(instance_id, str):
instance_id = [instance_id]
if len(instance_id) > 1:
@ -246,7 +246,9 @@ class InvalidInstanceIdError(EC2ClientError):
class InvalidInstanceTypeError(EC2ClientError):
def __init__(self, instance_type, error_type="InvalidInstanceType.NotFound"):
def __init__(
self, instance_type: Any, error_type: str = "InvalidInstanceType.NotFound"
):
if isinstance(instance_type, str):
msg = f"The instance type '{instance_type}' does not exist"
else:
@ -395,7 +397,7 @@ class InvalidServiceName(EC2ClientError):
class InvalidFilter(EC2ClientError):
def __init__(self, filter_name, error_type="InvalidFilter"):
def __init__(self, filter_name: str, error_type: str = "InvalidFilter"):
super().__init__(error_type, f"The filter '{filter_name}' is invalid")
@ -449,7 +451,7 @@ class InvalidParameterValueErrorTagNull(EC2ClientError):
class InvalidParameterValueErrorUnknownAttribute(EC2ClientError):
def __init__(self, parameter_value):
def __init__(self, parameter_value: str):
super().__init__(
"InvalidParameterValue",
f"Value ({parameter_value}) for parameter attribute is invalid. Unknown attribute.",
@ -457,14 +459,14 @@ class InvalidParameterValueErrorUnknownAttribute(EC2ClientError):
class InvalidGatewayIDError(EC2ClientError):
def __init__(self, gateway_id):
def __init__(self, gateway_id: str):
super().__init__(
"InvalidGatewayID.NotFound", f"The eigw ID '{gateway_id}' does not exist"
)
class InvalidInternetGatewayIdError(EC2ClientError):
def __init__(self, internet_gateway_id):
def __init__(self, internet_gateway_id: str):
super().__init__(
"InvalidInternetGatewayID.NotFound",
f"InternetGatewayID {internet_gateway_id} does not exist.",
@ -472,7 +474,7 @@ class InvalidInternetGatewayIdError(EC2ClientError):
class GatewayNotAttachedError(EC2ClientError):
def __init__(self, internet_gateway_id, vpc_id):
def __init__(self, internet_gateway_id: str, vpc_id: str):
super().__init__(
"Gateway.NotAttached",
f"InternetGatewayID {internet_gateway_id} is not attached to a VPC {vpc_id}.",
@ -562,7 +564,7 @@ class InvalidAvailabilityZoneError(EC2ClientError):
class AvailabilityZoneNotFromRegionError(EC2ClientError):
def __init__(self, availability_zone_value):
def __init__(self, availability_zone_value: str):
super().__init__(
"InvalidParameterValue",
f"Invalid Availability Zone ({availability_zone_value})",
@ -630,7 +632,7 @@ class OperationNotPermitted3(EC2ClientError):
class OperationNotPermitted4(EC2ClientError):
def __init__(self, instance_id):
def __init__(self, instance_id: str):
super().__init__(
"OperationNotPermitted",
f"The instance '{instance_id}' may not be terminated. Modify its 'disableApiTermination' instance attribute and try again.",
@ -670,7 +672,7 @@ class InvalidParameterDependency(EC2ClientError):
class IncorrectStateIamProfileAssociationError(EC2ClientError):
def __init__(self, instance_id):
def __init__(self, instance_id: str):
super().__init__(
"IncorrectState",
f"There is an existing association for instance {instance_id}",
@ -678,7 +680,7 @@ class IncorrectStateIamProfileAssociationError(EC2ClientError):
class InvalidAssociationIDIamProfileAssociationError(EC2ClientError):
def __init__(self, association_id):
def __init__(self, association_id: str):
super().__init__(
"InvalidAssociationID.NotFound",
f"An invalid association-id of '{association_id}' was given",

View File

@ -1,8 +1,11 @@
from typing import Any, Dict, Optional, List, Union
from typing import Any, Dict, Optional, List, Union, TYPE_CHECKING
from moto.core import CloudFormationModel
from ..exceptions import InvalidNetworkAttachmentIdError, InvalidNetworkInterfaceIdError
from .core import TaggedEC2Resource
from .security_groups import SecurityGroup
if TYPE_CHECKING:
from .instances import Instance
from ..utils import (
random_eni_id,
generate_dns_from_ip,
@ -29,19 +32,19 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
):
self.ec2_backend = ec2_backend
self.id = random_eni_id()
self.device_index = device_index
self.device_index: Optional[int] = device_index
if isinstance(private_ip_address, list) and private_ip_address:
private_ip_address = private_ip_address[0]
self.private_ip_address = private_ip_address or None
self.private_ip_address: Optional[str] = private_ip_address or None # type: ignore
self.private_ip_addresses: List[Dict[str, Any]] = private_ip_addresses or []
self.ipv6_addresses = kwargs.get("ipv6_addresses") or []
self.subnet = subnet
if isinstance(subnet, str):
self.subnet = self.ec2_backend.get_subnet(subnet)
self.instance = None
self.attachment_id = None
self.attach_time = None
self.instance: Optional[Instance] = None
self.attachment_id: Optional[str] = None
self.attach_time: Optional[str] = None
self.delete_on_termination = False
self.description = description
self.source_dest_check = True
@ -326,7 +329,7 @@ class NetworkInterfaceBackend:
def detach_network_interface(self, attachment_id: str) -> None:
for eni in self.enis.values():
if eni.attachment_id == attachment_id:
eni.instance.detach_eni(eni) # type: ignore[attr-defined]
eni.instance.detach_eni(eni) # type: ignore
return
raise InvalidNetworkAttachmentIdError(attachment_id)

View File

@ -1,4 +1,8 @@
from typing import Any, Dict, List, Optional, Tuple
from moto.core import CloudFormationModel
from moto.ec2.models.instances import Instance
from moto.iam.models import InstanceProfile
from ..exceptions import (
IncorrectStateIamProfileAssociationError,
InvalidAssociationIDIamProfileAssociationError,
@ -11,7 +15,13 @@ from ..utils import (
class IamInstanceProfileAssociation(CloudFormationModel):
def __init__(self, ec2_backend, association_id, instance, iam_instance_profile):
def __init__(
self,
ec2_backend: Any,
association_id: str,
instance: Instance,
iam_instance_profile: InstanceProfile,
):
self.ec2_backend = ec2_backend
self.id = association_id
self.instance = instance
@ -25,37 +35,46 @@ class IamInstanceProfileAssociation(CloudFormationModel):
class IamInstanceProfileAssociationBackend:
def __init__(self):
self.iam_instance_profile_associations = {}
def __init__(self) -> None:
self.iam_instance_profile_associations: Dict[
str, IamInstanceProfileAssociation
] = {}
def associate_iam_instance_profile(
self, instance_id, iam_instance_profile_name=None, iam_instance_profile_arn=None
):
self,
instance_id: str,
iam_instance_profile_name: Optional[str] = None,
iam_instance_profile_arn: Optional[str] = None,
) -> IamInstanceProfileAssociation:
iam_association_id = random_iam_instance_profile_association_id()
instance_profile = filter_iam_instance_profiles(
self.account_id, iam_instance_profile_arn, iam_instance_profile_name
self.account_id, iam_instance_profile_arn, iam_instance_profile_name # type: ignore[attr-defined]
)
if instance_id in self.iam_instance_profile_associations.keys():
raise IncorrectStateIamProfileAssociationError(instance_id)
iam_instance_profile_associations = IamInstanceProfileAssociation(
iam_instance_profile_association = IamInstanceProfileAssociation(
self,
iam_association_id,
self.get_instance(instance_id) if instance_id else None,
self.get_instance(instance_id) if instance_id else None, # type: ignore[attr-defined]
instance_profile,
)
# Regarding to AWS there can be only one association with ec2.
self.iam_instance_profile_associations[
instance_id
] = iam_instance_profile_associations
return iam_instance_profile_associations
] = iam_instance_profile_association
return iam_instance_profile_association
def describe_iam_instance_profile_associations(
self, association_ids, filters=None, max_results=100, next_token=None
):
associations_list = []
self,
association_ids: List[str],
filters: Any = None,
max_results: int = 100,
next_token: Optional[str] = None,
) -> Tuple[List[IamInstanceProfileAssociation], Optional[str]]:
associations_list: List[IamInstanceProfileAssociation] = []
if association_ids:
for association in self.iam_instance_profile_associations.values():
if association.id in association_ids:
@ -77,33 +96,35 @@ class IamInstanceProfileAssociationBackend:
return associations_page, new_next_token
def disassociate_iam_instance_profile(self, association_id):
iam_instance_profile_associations = None
def disassociate_iam_instance_profile(
self, association_id: str
) -> IamInstanceProfileAssociation:
iam_instance_profile_association = None
for association_key in self.iam_instance_profile_associations.keys():
if (
self.iam_instance_profile_associations[association_key].id
== association_id
):
iam_instance_profile_associations = (
iam_instance_profile_association = (
self.iam_instance_profile_associations[association_key]
)
del self.iam_instance_profile_associations[association_key]
# Deleting once and avoiding `RuntimeError: dictionary changed size during iteration`
break
if not iam_instance_profile_associations:
if not iam_instance_profile_association:
raise InvalidAssociationIDIamProfileAssociationError(association_id)
return iam_instance_profile_associations
return iam_instance_profile_association
def replace_iam_instance_profile_association(
self,
association_id,
iam_instance_profile_name=None,
iam_instance_profile_arn=None,
):
association_id: str,
iam_instance_profile_name: Optional[str] = None,
iam_instance_profile_arn: Optional[str] = None,
) -> IamInstanceProfileAssociation:
instance_profile = filter_iam_instance_profiles(
self.account_id, iam_instance_profile_arn, iam_instance_profile_name
self.account_id, iam_instance_profile_arn, iam_instance_profile_name # type: ignore[attr-defined]
)
iam_instance_profile_association = None

View File

@ -1,5 +1,5 @@
import pathlib
from typing import Any, Dict
from typing import Any, Dict, List, Optional
from os import listdir
from ..utils import generic_filter
@ -13,18 +13,18 @@ INSTANCE_FAMILIES = list(set([i.split(".")[0] for i in INSTANCE_TYPES.keys()]))
root = pathlib.Path(__file__).parent
offerings_path = "../resources/instance_type_offerings"
INSTANCE_TYPE_OFFERINGS = {}
INSTANCE_TYPE_OFFERINGS: Dict[str, Any] = {}
for _location_type in listdir(root / offerings_path):
INSTANCE_TYPE_OFFERINGS[_location_type] = {}
for _region in listdir(root / offerings_path / _location_type):
full_path = offerings_path + "/" + _location_type + "/" + _region
res = load_resource(__name__, full_path)
for instance in res:
instance["LocationType"] = _location_type
instance["LocationType"] = _location_type # type: ignore
INSTANCE_TYPE_OFFERINGS[_location_type][_region.replace(".json", "")] = res
class InstanceType(dict):
class InstanceType(Dict[str, Any]):
_filter_attributes = {
"auto-recovery-supported": ["AutoRecoverySupported"],
"bare-metal": ["BareMetal"],
@ -106,21 +106,21 @@ class InstanceType(dict):
"vcpu-info.valid-threads-per-core": ["VCpuInfo", "ValidThreadsPerCore"],
} # fmt: skip
def __init__(self, name):
def __init__(self, name: str):
self.name = name
self.update(INSTANCE_TYPES[name])
def __getattr__(self, name):
def __getattr__(self, name: str) -> str:
return self[name]
def __setattr__(self, name, value):
def __setattr__(self, name: str, value: str) -> None:
self[name] = value
def __repr__(self):
def __repr__(self) -> str:
return f"<InstanceType: {self.name}>"
def get_filter_value(self, filter_name):
def stringify(v):
def get_filter_value(self, filter_name: str) -> Any:
def stringify(v: Any) -> Any:
if isinstance(v, (bool, int)):
return str(v).lower()
elif isinstance(v, list):
@ -130,7 +130,7 @@ class InstanceType(dict):
path = self._filter_attributes.get(filter_name)
if not path:
raise InvalidFilter(filter_name, error_type="InvalidParameterValue")
value = self
value: Any = self
for key in path:
value = (value or {}).get(key)
return stringify(value)
@ -139,7 +139,9 @@ class InstanceType(dict):
class InstanceTypeBackend:
instance_types = list(map(InstanceType, INSTANCE_TYPES.keys()))
def describe_instance_types(self, instance_types=None, filters=None):
def describe_instance_types(
self, instance_types: Optional[List[str]] = None, filters: Any = None
) -> List[InstanceType]:
matches = self.instance_types
if instance_types:
matches = [t for t in matches if t.get("InstanceType") in instance_types]
@ -156,24 +158,28 @@ class InstanceTypeBackend:
class InstanceTypeOfferingBackend:
def describe_instance_type_offerings(self, location_type=None, filters=None):
def describe_instance_type_offerings(
self,
location_type: Optional[str] = None,
filters: Optional[Dict[str, Any]] = None,
) -> List[Any]:
location_type = location_type or "region"
matches = INSTANCE_TYPE_OFFERINGS[location_type]
matches = matches.get(self.region_name, [])
matches = matches.get(self.region_name, []) # type: ignore[attr-defined]
matches = [
o for o in matches if self.matches_filters(o, filters or {}, location_type)
]
return matches
def matches_filters(self, offering, filters, location_type):
def matches_filter(key, values):
def matches_filters(
self, offering: Dict[str, Any], filters: Any, location_type: str
) -> bool:
def matches_filter(key: str, values: List[str]) -> bool:
if key == "location":
if location_type in ("availability-zone", "availability-zone-id"):
return offering.get("Location") in values
elif location_type == "region":
return any(
v for v in values if offering.get("Location").startswith(v)
)
return any(v for v in values if offering["Location"].startswith(v))
else:
return False
elif key == "instance-type":

View File

@ -2,16 +2,20 @@ import copy
import warnings
from collections import OrderedDict
from datetime import datetime
from typing import Any, List, Tuple, Optional
from typing import Any, Dict, ItemsView, List, Tuple, Optional, Set
from moto import settings
from moto.core import CloudFormationModel
from moto.core.utils import camelcase_to_underscores
from moto.ec2.models.fleets import Fleet
from moto.ec2.models.elastic_network_interfaces import NetworkInterface
from moto.ec2.models.launch_templates import LaunchTemplateVersion
from moto.ec2.models.instance_types import (
INSTANCE_TYPE_OFFERINGS,
InstanceTypeOfferingBackend,
)
from moto.ec2.models.security_groups import SecurityGroup
from moto.ec2.models.subnets import Subnet
from moto.packages.boto.ec2.blockdevicemapping import BlockDeviceMapping
from moto.packages.boto.ec2.instance import Instance as BotoInstance
from moto.packages.boto.ec2.instance import Reservation
@ -37,14 +41,14 @@ from ..utils import (
from .core import TaggedEC2Resource
class InstanceState(object):
def __init__(self, name="pending", code=0):
class InstanceState:
def __init__(self, name: str = "pending", code: int = 0):
self.name = name
self.code = code
class StateReason(object):
def __init__(self, message="", code=""):
class StateReason:
def __init__(self, message: str = "", code: str = ""):
self.message = message
self.code = code
@ -67,12 +71,19 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
"disableApiStop",
}
def __init__(self, ec2_backend, image_id, user_data, security_groups, **kwargs):
def __init__(
self,
ec2_backend: Any,
image_id: str,
user_data: Any,
security_groups: List[SecurityGroup],
**kwargs: Any,
):
super().__init__()
self.ec2_backend = ec2_backend
self.id: str = random_instance_id()
self.id = random_instance_id()
self.owner_id = ec2_backend.account_id
self.lifecycle = kwargs.get("lifecycle")
self.lifecycle: Optional[str] = kwargs.get("lifecycle")
nics = copy.deepcopy(kwargs.get("nics", []))
@ -80,9 +91,9 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
if launch_template_arg and not image_id:
# the image id from the template should be used
template_version = ec2_backend._get_template_from_args(launch_template_arg)
self.image_id: str = template_version.image_id
self.image_id = template_version.image_id
else:
self.image_id: str = image_id
self.image_id = image_id # type: ignore
# Check if we have tags to process
if launch_template_arg:
template_version = ec2_backend._get_template_from_args(launch_template_arg)
@ -150,7 +161,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
self.user_data[0] = self.user_data[0].decode("utf-8")
if self.subnet_id:
subnet = ec2_backend.get_subnet(self.subnet_id)
subnet: Subnet = ec2_backend.get_subnet(self.subnet_id)
self._placement.zone = subnet.availability_zone
if self.associate_public_ip is None:
@ -163,7 +174,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
self.block_device_mapping: BlockDeviceMapping = BlockDeviceMapping()
self._private_ips = set()
self._private_ips: Set[str] = set()
self.prep_nics(
nics,
private_ip=kwargs.get("private_ip"),
@ -172,17 +183,17 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
)
@property
def vpc_id(self):
def vpc_id(self) -> Optional[str]:
if self.subnet_id:
subnet = self.ec2_backend.get_subnet(self.subnet_id)
subnet: Subnet = self.ec2_backend.get_subnet(self.subnet_id)
return subnet.vpc_id
if self.nics and 0 in self.nics:
return self.nics[0].subnet.vpc_id
return None
def __del__(self):
def __del__(self) -> None:
try:
subnet = self.ec2_backend.get_subnet(self.subnet_id)
subnet: Subnet = self.ec2_backend.get_subnet(self.subnet_id)
for ip in self._private_ips:
subnet.del_subnet_ip(ip)
except Exception:
@ -192,35 +203,33 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
def add_block_device(
self,
size,
device_path,
snapshot_id=None,
encrypted=False,
delete_on_termination=False,
kms_key_id=None,
volume_type=None,
iops=None,
):
volume = self.ec2_backend.create_volume(
size: int,
device_path: str,
snapshot_id: Optional[str],
encrypted: bool,
delete_on_termination: bool,
kms_key_id: Optional[str],
volume_type: Optional[str],
) -> None:
volume = self.ec2_backend.create_volume( # type: ignore[attr-defined]
size=size,
zone_name=self._placement.zone,
snapshot_id=snapshot_id,
encrypted=encrypted,
kms_key_id=kms_key_id,
volume_type=volume_type,
iops=iops,
)
self.ec2_backend.attach_volume(
volume.id, self.id, device_path, delete_on_termination
)
def setup_defaults(self):
def setup_defaults(self) -> None:
# Default have an instance with root volume should you not wish to
# override with attach volume cmd.
volume = self.ec2_backend.create_volume(size=8, zone_name=self._placement.zone)
self.ec2_backend.attach_volume(volume.id, self.id, "/dev/sda1", True)
def teardown_defaults(self):
def teardown_defaults(self) -> None:
for device_path in list(self.block_device_mapping.keys()):
volume = self.block_device_mapping[device_path]
volume_id = volume.volume_id
@ -229,47 +238,53 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
self.ec2_backend.delete_volume(volume_id)
@property
def get_block_device_mapping(self):
def get_block_device_mapping(self) -> ItemsView[str, Any]: # type: ignore[misc]
return self.block_device_mapping.items()
@property
def private_ip(self):
def private_ip(self) -> Optional[str]:
return self.nics[0].private_ip_address
@property
def private_dns(self):
formatted_ip = self.private_ip.replace(".", "-")
def private_dns(self) -> str:
formatted_ip = self.private_ip.replace(".", "-") # type: ignore[union-attr]
if self.region_name == "us-east-1":
return f"ip-{formatted_ip}.ec2.internal"
else:
return f"ip-{formatted_ip}.{self.region_name}.compute.internal"
@property
def public_ip(self):
def public_ip(self) -> Optional[str]:
return self.nics[0].public_ip
@property
def public_dns(self):
def public_dns(self) -> Optional[str]:
if self.public_ip:
formatted_ip = self.public_ip.replace(".", "-")
if self.region_name == "us-east-1":
return f"ec2-{formatted_ip}.compute-1.amazonaws.com"
else:
return f"ec2-{formatted_ip}.{self.region_name}.compute.amazonaws.com"
@staticmethod
def cloudformation_name_type():
return None
@staticmethod
def cloudformation_type():
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-instance.html
return "AWS::EC2::Instance"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "Instance":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
@ -309,9 +324,13 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
return instance
@classmethod
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name
):
def delete_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
from ..models import ec2_backends
ec2_backend = ec2_backends[account_id][region_name]
@ -332,10 +351,10 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
instance.delete(account_id, region_name)
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.id
def start(self):
def start(self) -> InstanceState:
previous_state = copy.copy(self._state)
for nic in self.nics.values():
@ -349,7 +368,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
return previous_state
def stop(self):
def stop(self) -> InstanceState:
previous_state = copy.copy(self._state)
for nic in self.nics.values():
@ -368,13 +387,15 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
return previous_state
def is_running(self):
def is_running(self) -> bool:
return self._state.name == "running"
def delete(self, account_id, region): # pylint: disable=unused-argument
def delete(
self, account_id: str, region: str # pylint: disable=unused-argument
) -> None:
self.terminate()
def terminate(self):
def terminate(self) -> InstanceState:
previous_state = copy.copy(self._state)
for nic in self.nics.values():
@ -427,7 +448,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
return previous_state
def reboot(self):
def reboot(self) -> None:
self._state.name = "running"
self._state.code = 16
@ -435,23 +456,28 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
self._state_reason = StateReason()
@property
def dynamic_group_list(self):
def dynamic_group_list(self) -> List[SecurityGroup]:
return self.security_groups
def _get_private_ip_from_nic(self, nic):
def _get_private_ip_from_nic(self, nic: Dict[str, Any]) -> Optional[str]:
private_ip = nic.get("PrivateIpAddress")
if private_ip:
return private_ip
for address in nic.get("PrivateIpAddresses", []):
if address.get("Primary") == "true":
return address.get("PrivateIpAddress")
return None
def prep_nics(
self, nic_spec, private_ip=None, associate_public_ip=None, security_groups=None
):
self.nics = {}
self,
nic_spec: List[Dict[str, Any]],
private_ip: Optional[str] = None,
associate_public_ip: Optional[bool] = None,
security_groups: Optional[List[SecurityGroup]] = None,
) -> None:
self.nics: Dict[int, NetworkInterface] = {}
for nic in nic_spec:
if int(nic.get("DeviceIndex")) == 0:
if int(nic.get("DeviceIndex")) == 0: # type: ignore[arg-type]
nic_associate_public_ip = nic.get("AssociatePublicIpAddress")
if nic_associate_public_ip is not None:
associate_public_ip = nic_associate_public_ip == "true"
@ -460,7 +486,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
break
if self.subnet_id:
subnet = self.ec2_backend.get_subnet(self.subnet_id)
subnet: Subnet = self.ec2_backend.get_subnet(self.subnet_id)
if not private_ip:
private_ip = subnet.get_available_subnet_ip(instance=self)
else:
@ -487,7 +513,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
# Flesh out data structures and associations
for nic in nic_spec:
device_index = int(nic.get("DeviceIndex"))
device_index = int(nic.get("DeviceIndex")) # type: ignore[arg-type]
nic_id = nic.get("NetworkInterfaceId")
if nic_id:
@ -502,18 +528,20 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
nic.update(primary_nic)
if "SubnetId" in nic:
subnet = self.ec2_backend.get_subnet(nic["SubnetId"])
nic_subnet: Subnet = self.ec2_backend.get_subnet(nic["SubnetId"])
else:
# Get default Subnet
zone = self._placement.zone
subnet = self.ec2_backend.get_default_subnet(availability_zone=zone)
nic_subnet = self.ec2_backend.get_default_subnet(
availability_zone=zone
)
group_ids = nic.get("SecurityGroupId") or []
if security_groups:
group_ids.extend([group.id for group in security_groups])
use_nic = self.ec2_backend.create_network_interface(
subnet,
nic_subnet,
nic.get("PrivateIpAddress"),
device_index=device_index,
public_ip_auto_assign=nic.get("AssociatePublicIpAddress", False),
@ -522,7 +550,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
self.attach_eni(use_nic, device_index)
def attach_eni(self, eni, device_index):
def attach_eni(self, eni: NetworkInterface, device_index: int) -> str:
device_index = int(device_index)
self.nics[device_index] = eni
@ -535,8 +563,8 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
return eni.attachment_id
def detach_eni(self, eni):
self.nics.pop(eni.device_index, None)
def detach_eni(self, eni: NetworkInterface) -> None:
self.nics.pop(eni.device_index, None) # type: ignore[arg-type]
eni.instance = None
eni.attachment_id = None
eni.attach_time = None
@ -544,7 +572,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
eni.device_index = None
@classmethod
def has_cfn_attr(cls, attr):
def has_cfn_attr(cls, attr: str) -> bool:
return attr in [
"AvailabilityZone",
"PrivateDnsName",
@ -553,7 +581,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
"PublicIp",
]
def get_cfn_attribute(self, attribute_name):
def get_cfn_attribute(self, attribute_name: str) -> Any:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "AvailabilityZone":
@ -568,7 +596,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
return self.public_ip
raise UnformattedGetAttTemplateException()
def applies(self, filters):
def applies(self, filters: List[Dict[str, Any]]) -> bool:
if filters:
applicable = False
for f in filters:
@ -585,10 +613,10 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
class InstanceBackend:
def __init__(self):
self.reservations = OrderedDict()
def __init__(self) -> None:
self.reservations: Dict[str, Reservation] = OrderedDict()
def get_instance(self, instance_id) -> Instance:
def get_instance(self, instance_id: str) -> Instance:
for instance in self.all_instances():
if instance.id == instance_id:
return instance
@ -605,9 +633,9 @@ class InstanceBackend:
location_type = "availability-zone" if kwargs.get("placement") else "region"
default_region = "us-east-1"
if settings.ENABLE_KEYPAIR_VALIDATION:
self.describe_key_pairs(key_names=[kwargs.get("key_name")])
self.describe_key_pairs(key_names=[kwargs.get("key_name")]) # type: ignore[attr-defined]
if settings.ENABLE_AMI_VALIDATION:
self.describe_images(ami_ids=[image_id] if image_id else [])
self.describe_images(ami_ids=[image_id] if image_id else []) # type: ignore[attr-defined]
valid_instance_types = INSTANCE_TYPE_OFFERINGS[location_type]
if "region_name" in kwargs and kwargs.get("placement"):
valid_availability_zones = {
@ -636,20 +664,19 @@ class InstanceBackend:
raise InvalidInstanceTypeError(kwargs["instance_type"])
security_groups = [
self.get_security_group_by_name_or_id(name) for name in security_group_names
self.get_security_group_by_name_or_id(name) for name in security_group_names # type: ignore[attr-defined]
]
for sg_id in kwargs.pop("security_group_ids", []):
if isinstance(sg_id, str):
sg = self.get_security_group_from_id(sg_id)
sg = self.get_security_group_from_id(sg_id) # type: ignore[attr-defined]
if sg is None:
raise InvalidSecurityGroupNotFoundError(sg_id)
security_groups.append(sg)
else:
security_groups.append(sg_id)
new_reservation = Reservation()
new_reservation.id = random_reservation_id()
new_reservation = Reservation(reservation_id=random_reservation_id())
self.reservations[new_reservation.id] = new_reservation
@ -691,7 +718,7 @@ class InstanceBackend:
kms_key_id = block_device["Ebs"].get("KmsKeyId")
if block_device.get("NoDevice") != "":
new_instance.add_block_device(
new_instance.add_block_device( # type: ignore[attr-defined]
volume_size,
device_name,
snapshot_id,
@ -704,13 +731,13 @@ class InstanceBackend:
new_instance.lifecycle = "spot"
# Tag all created volumes.
for _, device in new_instance.get_block_device_mapping:
volumes = self.describe_volumes(volume_ids=[device.volume_id])
volumes = self.describe_volumes(volume_ids=[device.volume_id]) # type: ignore
for volume in volumes:
volume.add_tags(volume_tags)
return new_reservation
def run_instances(self):
def run_instances(self) -> None:
"""
The Placement-parameter is validated to verify the availability-zone exists for the current region.
@ -727,7 +754,9 @@ class InstanceBackend:
# Fake method here to make implementation coverage script aware that this method is implemented
pass
def start_instances(self, instance_ids):
def start_instances(
self, instance_ids: List[str]
) -> List[Tuple[Instance, InstanceState]]:
started_instances = []
for instance in self.get_multi_instances_by_id(instance_ids):
previous_state = instance.start()
@ -735,7 +764,9 @@ class InstanceBackend:
return started_instances
def stop_instances(self, instance_ids):
def stop_instances(
self, instance_ids: List[str]
) -> List[Tuple[Instance, InstanceState]]:
stopped_instances = []
for instance in self.get_multi_instances_by_id(instance_ids):
previous_state = instance.stop()
@ -743,7 +774,9 @@ class InstanceBackend:
return stopped_instances
def terminate_instances(self, instance_ids: List[str]) -> List[Tuple[str, str]]:
def terminate_instances(
self, instance_ids: List[str]
) -> List[Tuple[Instance, InstanceState]]:
terminated_instances = []
if not instance_ids:
raise EC2ClientError(
@ -757,7 +790,7 @@ class InstanceBackend:
return terminated_instances
def reboot_instances(self, instance_ids):
def reboot_instances(self, instance_ids: List[str]) -> List[Instance]:
rebooted_instances = []
for instance in self.get_multi_instances_by_id(instance_ids):
instance.reboot()
@ -765,20 +798,26 @@ class InstanceBackend:
return rebooted_instances
def modify_instance_attribute(self, instance_id, key, value):
def modify_instance_attribute(
self, instance_id: str, key: str, value: Any
) -> Instance:
instance = self.get_instance(instance_id)
setattr(instance, key, value)
return instance
def modify_instance_security_groups(self, instance_id, new_group_id_list):
def modify_instance_security_groups(
self, instance_id: str, new_group_id_list: List[str]
) -> Instance:
instance = self.get_instance(instance_id)
new_group_list = []
for new_group_id in new_group_id_list:
new_group_list.append(self.get_security_group_from_id(new_group_id))
new_group_list.append(self.get_security_group_from_id(new_group_id)) # type: ignore[attr-defined]
setattr(instance, "security_groups", new_group_list)
return instance
def describe_instance_attribute(self, instance_id, attribute):
def describe_instance_attribute(
self, instance_id: str, attribute: str
) -> Tuple[Instance, Any]:
if attribute not in Instance.VALID_ATTRIBUTES:
raise InvalidParameterValueErrorUnknownAttribute(attribute)
@ -790,13 +829,15 @@ class InstanceBackend:
value = getattr(instance, key)
return instance, value
def describe_instance_credit_specifications(self, instance_ids):
def describe_instance_credit_specifications(
self, instance_ids: List[str]
) -> List[Instance]:
queried_instances = []
for instance in self.get_multi_instances_by_id(instance_ids):
queried_instances.append(instance)
return queried_instances
def all_instances(self, filters=None):
def all_instances(self, filters: Any = None) -> List[Instance]:
instances = []
for reservation in self.all_reservations():
for instance in reservation.instances:
@ -804,7 +845,7 @@ class InstanceBackend:
instances.append(instance)
return instances
def all_running_instances(self, filters=None):
def all_running_instances(self, filters: Any = None) -> List[Instance]:
instances = []
for reservation in self.all_reservations():
for instance in reservation.instances:
@ -812,7 +853,9 @@ class InstanceBackend:
instances.append(instance)
return instances
def get_multi_instances_by_id(self, instance_ids, filters=None):
def get_multi_instances_by_id(
self, instance_ids: List[str], filters: Any = None
) -> List[Instance]:
"""
:param instance_ids: A string list with instance ids
:return: A list with instance objects
@ -832,13 +875,16 @@ class InstanceBackend:
return result
def get_instance_by_id(self, instance_id):
def get_instance_by_id(self, instance_id: str) -> Optional[Instance]:
for reservation in self.all_reservations():
for instance in reservation.instances:
if instance.id == instance_id:
return instance
return None
def get_reservations_by_instance_ids(self, instance_ids, filters=None):
def get_reservations_by_instance_ids(
self, instance_ids: List[str], filters: Any = None
) -> List[Reservation]:
"""Go through all of the reservations and filter to only return those
associated with the given instance_ids.
"""
@ -869,10 +915,12 @@ class InstanceBackend:
reservations = filter_reservations(reservations, filters)
return reservations
def describe_instances(self, filters=None):
def describe_instances(self, filters: Any = None) -> List[Reservation]:
return self.all_reservations(filters)
def describe_instance_status(self, instance_ids, include_all_instances, filters):
def describe_instance_status(
self, instance_ids: List[str], include_all_instances: bool, filters: Any
) -> List[Instance]:
if instance_ids:
return self.get_multi_instances_by_id(instance_ids, filters)
elif include_all_instances:
@ -880,7 +928,7 @@ class InstanceBackend:
else:
return self.all_running_instances(filters)
def all_reservations(self, filters=None):
def all_reservations(self, filters: Any = None) -> List[Reservation]:
reservations = [
copy.copy(reservation) for reservation in self.reservations.copy().values()
]
@ -888,13 +936,15 @@ class InstanceBackend:
reservations = filter_reservations(reservations, filters)
return reservations
def _get_template_from_args(self, launch_template_arg):
def _get_template_from_args(
self, launch_template_arg: Dict[str, Any]
) -> LaunchTemplateVersion:
template = (
self.describe_launch_templates(
self.describe_launch_templates( # type: ignore[attr-defined]
template_ids=[launch_template_arg["LaunchTemplateId"]]
)[0]
if "LaunchTemplateId" in launch_template_arg
else self.describe_launch_templates(
else self.describe_launch_templates( # type: ignore[attr-defined]
template_names=[launch_template_arg["LaunchTemplateName"]]
)[0]
)

View File

@ -1,3 +1,4 @@
from typing import Any, Dict, List, Optional
from moto.core import CloudFormationModel
from .core import TaggedEC2Resource
@ -18,7 +19,9 @@ from ..utils import (
class EgressOnlyInternetGateway(TaggedEC2Resource):
def __init__(self, ec2_backend, vpc_id, tags=None):
def __init__(
self, ec2_backend: Any, vpc_id: str, tags: Optional[Dict[str, str]] = None
):
self.id = random_egress_only_internet_gateway_id()
self.ec2_backend = ec2_backend
self.vpc_id = vpc_id
@ -26,27 +29,31 @@ class EgressOnlyInternetGateway(TaggedEC2Resource):
self.add_tags(tags or {})
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.id
class EgressOnlyInternetGatewayBackend:
def __init__(self):
self.egress_only_internet_gateway_backend = {}
def __init__(self) -> None:
self.egress_only_internet_gateways: Dict[str, EgressOnlyInternetGateway] = {}
def create_egress_only_internet_gateway(self, vpc_id, tags=None):
vpc = self.get_vpc(vpc_id)
def create_egress_only_internet_gateway(
self, vpc_id: str, tags: Optional[Dict[str, str]] = None
) -> EgressOnlyInternetGateway:
vpc = self.get_vpc(vpc_id) # type: ignore[attr-defined]
if not vpc:
raise InvalidVPCIdError(vpc_id)
egress_only_igw = EgressOnlyInternetGateway(self, vpc_id, tags)
self.egress_only_internet_gateway_backend[egress_only_igw.id] = egress_only_igw
self.egress_only_internet_gateways[egress_only_igw.id] = egress_only_igw
return egress_only_igw
def describe_egress_only_internet_gateways(self, ids=None):
def describe_egress_only_internet_gateways(
self, ids: Optional[List[str]] = None
) -> List[EgressOnlyInternetGateway]:
"""
The Filters-argument is not yet supported
"""
egress_only_igws = list(self.egress_only_internet_gateway_backend.values())
egress_only_igws = list(self.egress_only_internet_gateways.values())
if ids:
egress_only_igws = [
@ -56,56 +63,59 @@ class EgressOnlyInternetGatewayBackend:
]
return egress_only_igws
def delete_egress_only_internet_gateway(self, gateway_id):
egress_only_igw = self.egress_only_internet_gateway_backend.get(gateway_id)
def delete_egress_only_internet_gateway(self, gateway_id: str) -> None:
egress_only_igw = self.egress_only_internet_gateways.get(gateway_id)
if not egress_only_igw:
raise InvalidGatewayIDError(gateway_id)
if egress_only_igw:
self.egress_only_internet_gateway_backend.pop(gateway_id)
self.egress_only_internet_gateways.pop(gateway_id)
def get_egress_only_igw(self, gateway_id):
egress_only_igw = self.egress_only_internet_gateway_backend.get(
gateway_id, None
)
if not egress_only_igw:
def get_egress_only_igw(self, gateway_id: str) -> EgressOnlyInternetGateway:
igw = self.egress_only_internet_gateways.get(gateway_id)
if not igw:
raise InvalidGatewayIDError(gateway_id)
return egress_only_igw
return igw
class InternetGateway(TaggedEC2Resource, CloudFormationModel):
def __init__(self, ec2_backend):
def __init__(self, ec2_backend: Any):
self.ec2_backend = ec2_backend
self.id = random_internet_gateway_id()
self.vpc = None
@property
def owner_id(self):
def owner_id(self) -> str:
return self.ec2_backend.account_id
@staticmethod
def cloudformation_name_type():
return None
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-internetgateway.html
return "AWS::EC2::InternetGateway"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "InternetGateway":
from ..models import ec2_backends
ec2_backend = ec2_backends[account_id][region_name]
return ec2_backend.create_internet_gateway()
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.id
@property
def attachment_state(self):
def attachment_state(self) -> str:
if self.vpc:
return "available"
else:
@ -113,20 +123,24 @@ class InternetGateway(TaggedEC2Resource, CloudFormationModel):
class InternetGatewayBackend:
def __init__(self):
self.internet_gateways = {}
def __init__(self) -> None:
self.internet_gateways: Dict[str, InternetGateway] = {}
def create_internet_gateway(self, tags=None):
def create_internet_gateway(
self, tags: Optional[List[Dict[str, str]]] = None
) -> InternetGateway:
igw = InternetGateway(self)
for tag in tags or []:
igw.add_tag(tag.get("Key"), tag.get("Value"))
igw.add_tag(tag["Key"], tag["Value"])
self.internet_gateways[igw.id] = igw
return igw
def describe_internet_gateways(self, internet_gateway_ids=None, filters=None):
def describe_internet_gateways(
self, internet_gateway_ids: Optional[List[str]] = None, filters: Any = None
) -> List[InternetGateway]:
igws = []
if internet_gateway_ids is None:
igws = self.internet_gateways.values()
igws = list(self.internet_gateways.values())
else:
for igw_id in internet_gateway_ids:
if igw_id in self.internet_gateways:
@ -137,30 +151,30 @@ class InternetGatewayBackend:
igws = filter_internet_gateways(igws, filters)
return igws
def delete_internet_gateway(self, internet_gateway_id):
def delete_internet_gateway(self, internet_gateway_id: str) -> None:
igw = self.get_internet_gateway(internet_gateway_id)
if igw.vpc:
raise DependencyViolationError(
f"{internet_gateway_id} is being utilized by {igw.vpc.id}"
)
self.internet_gateways.pop(internet_gateway_id)
return True
def detach_internet_gateway(self, internet_gateway_id, vpc_id):
def detach_internet_gateway(self, internet_gateway_id: str, vpc_id: str) -> None:
igw = self.get_internet_gateway(internet_gateway_id)
if not igw.vpc or igw.vpc.id != vpc_id:
raise GatewayNotAttachedError(internet_gateway_id, vpc_id)
igw.vpc = None
return True
def attach_internet_gateway(self, internet_gateway_id, vpc_id):
def attach_internet_gateway(
self, internet_gateway_id: str, vpc_id: str
) -> VPCGatewayAttachment:
igw = self.get_internet_gateway(internet_gateway_id)
if igw.vpc:
raise ResourceAlreadyAssociatedError(internet_gateway_id)
vpc = self.get_vpc(vpc_id)
vpc = self.get_vpc(vpc_id) # type: ignore[attr-defined]
igw.vpc = vpc
return VPCGatewayAttachment(gateway_id=internet_gateway_id, vpc_id=vpc_id)
def get_internet_gateway(self, internet_gateway_id):
def get_internet_gateway(self, internet_gateway_id: str) -> InternetGateway:
igw_ids = [internet_gateway_id]
return self.describe_internet_gateways(internet_gateway_ids=igw_ids)[0]

View File

@ -198,7 +198,7 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
self._subnet_ips[ip] = instance
return ip
def del_subnet_ip(self, ip):
def del_subnet_ip(self, ip: str) -> None:
try:
del self._subnet_ips[ip]
self._unused_ips.add(ip)

View File

@ -1,3 +1,4 @@
from typing import Optional
from moto.core import CloudFormationModel
from .core import TaggedEC2Resource
from ..exceptions import InvalidVpnGatewayIdError, InvalidVpnGatewayAttachmentError
@ -6,7 +7,9 @@ from ..utils import generic_filter, random_vpn_gateway_id
class VPCGatewayAttachment(CloudFormationModel):
# Represents both VPNGatewayAttachment and VPCGatewayAttachment
def __init__(self, vpc_id, gateway_id=None, state=None):
def __init__(
self, vpc_id: str, gateway_id: Optional[str] = None, state: Optional[str] = None
):
self.vpc_id = vpc_id
self.gateway_id = gateway_id
self.state = state

View File

@ -7,7 +7,7 @@ from datetime import datetime
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from typing import Any, Dict, List, TypeVar
from typing import Any, Dict, List, TypeVar, Optional
from moto.iam import iam_backends
from moto.moto_api._internal import mock_random as random
@ -80,7 +80,7 @@ def random_instance_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["instance"], size=17)
def random_reservation_id():
def random_reservation_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["reservation"])
@ -174,11 +174,11 @@ def random_eip_association_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["vpc-elastic-ip-association"])
def random_internet_gateway_id():
def random_internet_gateway_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["internet-gateway"])
def random_egress_only_internet_gateway_id():
def random_egress_only_internet_gateway_id() -> str:
return random_id(
prefix=EC2_RESOURCE_TO_PREFIX["egress-only-internet-gateway"], size=17
)
@ -228,7 +228,7 @@ def random_launch_template_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["launch-template"], size=17)
def random_iam_instance_profile_association_id():
def random_iam_instance_profile_association_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["iam-instance-profile-association"])
@ -448,7 +448,12 @@ def instance_value_in_filter_values(instance_value, filter_values):
return True
def filter_reservations(reservations, filter_dict):
FILTER_TYPE = TypeVar("FILTER_TYPE")
def filter_reservations(
reservations: List[FILTER_TYPE], filter_dict: Any
) -> List[FILTER_TYPE]:
result = []
for reservation in reservations:
new_instances = []
@ -485,7 +490,9 @@ def passes_igw_filter_dict(igw, filter_dict):
return True
def filter_internet_gateways(igws, filter_dict):
def filter_internet_gateways(
igws: List[FILTER_TYPE], filter_dict: Any
) -> List[FILTER_TYPE]:
result = []
for igw in igws:
if passes_igw_filter_dict(igw, filter_dict):
@ -519,12 +526,9 @@ def is_filter_matching(obj, _filter, filter_value):
return value in filter_value
GENERIC_FILTER_TYPE = TypeVar("GENERIC_FILTER_TYPE")
def generic_filter(
filters: Dict[str, Any], objects: List[GENERIC_FILTER_TYPE]
) -> List[GENERIC_FILTER_TYPE]:
filters: Dict[str, Any], objects: List[FILTER_TYPE]
) -> List[FILTER_TYPE]:
if filters:
for (_filter, _filter_value) in filters.items():
objects = [
@ -665,7 +669,9 @@ def rsa_public_key_fingerprint(rsa_public_key):
return fingerprint
def filter_iam_instance_profile_associations(iam_instance_associations, filter_dict):
def filter_iam_instance_profile_associations(
iam_instance_associations: List[FILTER_TYPE], filter_dict: Any
) -> List[FILTER_TYPE]:
if not filter_dict:
return iam_instance_associations
result = []
@ -686,8 +692,10 @@ def filter_iam_instance_profile_associations(iam_instance_associations, filter_d
def filter_iam_instance_profiles(
account_id, iam_instance_profile_arn, iam_instance_profile_name
):
account_id: str,
iam_instance_profile_arn: Optional[str],
iam_instance_profile_name: Optional[str],
) -> Any:
instance_profile = None
instance_profile_by_name = None
instance_profile_by_arn = None

View File

@ -24,43 +24,12 @@
"""
Represents an EC2 Instance
"""
from typing import Any
from moto.packages.boto.ec2.ec2object import EC2Object, TaggedEC2Object
from moto.packages.boto.ec2.image import ProductCodes
class InstanceState(object):
"""
The state of the instance.
:ivar code: The low byte represents the state. The high byte is an
opaque internal value and should be ignored. Valid values:
* 0 (pending)
* 16 (running)
* 32 (shutting-down)
* 48 (terminated)
* 64 (stopping)
* 80 (stopped)
:ivar name: The name of the state of the instance. Valid values:
* "pending"
* "running"
* "shutting-down"
* "terminated"
* "stopping"
* "stopped"
"""
def __init__(self, code=0, name=None):
self.code = code
self.name = name
def __repr__(self):
return "%s(%d)" % (self.name, self.code)
class InstancePlacement(object):
class InstancePlacement:
"""
The location where the instance launched.
@ -93,14 +62,14 @@ class Reservation(EC2Object):
Reservation.
"""
def __init__(self, connection=None):
super(Reservation, self).__init__(connection)
self.id = None
def __init__(self, reservation_id) -> None:
super().__init__(connection=None)
self.id = reservation_id
self.owner_id = None
self.groups = []
self.instances = []
def __repr__(self):
def __repr__(self) -> str:
return "Reservation:%s" % self.id
@ -159,16 +128,12 @@ class Instance(TaggedEC2Object):
profile id and arn associated with this instance.
"""
def __init__(self, connection=None):
super(Instance, self).__init__(connection)
self.id = None
def __init__(self, connection: Any = None):
super().__init__(connection)
self.dns_name = None
self.public_dns_name = None
self.private_dns_name = None
self.key_name = None
self.instance_type = None
self.launch_time = None
self.image_id = None
self.kernel = None
self.ramdisk = None
self.product_codes = ProductCodes()
@ -177,7 +142,6 @@ class Instance(TaggedEC2Object):
self.monitoring_state = None
self.spot_instance_request_id = None
self.subnet_id = None
self.lifecycle = None
self.private_ip_address = None
self.ip_address = None
self.requester_id = None
@ -185,7 +149,6 @@ class Instance(TaggedEC2Object):
self.persistent = False
self.root_device_name = None
self.root_device_type = None
self.block_device_mapping = None
self.state_reason = None
self.group_name = None
self.client_token = None
@ -198,20 +161,19 @@ class Instance(TaggedEC2Object):
self.architecture = None
self.instance_profile = None
self._previous_state = None
self._state = InstanceState()
self._placement = InstancePlacement()
def __repr__(self):
def __repr__(self) -> str:
return "Instance:%s" % self.id
@property
def state(self):
def state(self) -> str:
return self._state.name
@property
def state_code(self):
def state_code(self) -> str:
return self._state.code
@property
def placement(self):
def placement(self) -> str:
return self._placement.zone

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/moto_api
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/ec2/models/i*,moto/moto_api
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract

View File

@ -6,6 +6,7 @@ from botocore.exceptions import ClientError
from moto import mock_autoscaling, mock_ec2
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests import EXAMPLE_AMI_ID
from uuid import uuid4
from .utils import setup_networking, setup_instance_with_networking
@ -57,7 +58,7 @@ def test_create_autoscaling_group_from_instance():
client = boto3.client("autoscaling", region_name="us-east-1")
response = client.create_auto_scaling_group(
AutoScalingGroupName=autoscaling_group_name,
InstanceId=mocked_instance_with_networking["instance"],
InstanceId=mocked_instance_with_networking["instances"][0].id,
MinSize=1,
MaxSize=3,
DesiredCapacity=2,
@ -80,6 +81,37 @@ def test_create_autoscaling_group_from_instance():
launch_configuration_from_instance["InstanceType"].should.equal(instance_type)
@mock_autoscaling
@mock_ec2
def test_create_autoscaling_group_from_instance_with_security_groups():
autoscaling_group_name = "test_asg"
image_id = EXAMPLE_AMI_ID
instance_type = "t2.micro"
mocked_instance_with_networking = setup_instance_with_networking(
image_id, instance_type
)
instance = mocked_instance_with_networking["instances"][0]
# create sg
ec2 = boto3.resource("ec2", region_name="us-east-1")
sg_id = ec2.create_security_group(GroupName=str(uuid4()), Description="d").id
instance.modify_attribute(Groups=[sg_id])
client = boto3.client("autoscaling", region_name="us-east-1")
response = client.create_auto_scaling_group(
AutoScalingGroupName=autoscaling_group_name,
InstanceId=instance.id,
MinSize=1,
MaxSize=3,
DesiredCapacity=2,
VPCZoneIdentifier=mocked_instance_with_networking["subnet1"],
NewInstancesProtectedFromScaleIn=False,
)
# Just verifying this works - used to throw an error when supplying a instance that belonged to an SG
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
@mock_autoscaling
def test_create_autoscaling_group_from_invalid_instance_id():
invalid_instance_id = "invalid_instance"

View File

@ -26,5 +26,5 @@ def setup_instance_with_networking(image_id, instance_type):
MinCount=1,
SubnetId=mock_data["subnet1"],
)
mock_data["instance"] = instances[0].id
mock_data["instances"] = instances
return mock_data