diff --git a/moto/ebs/models.py b/moto/ebs/models.py index e8ac96437..a8f8f200b 100644 --- a/moto/ebs/models.py +++ b/moto/ebs/models.py @@ -71,7 +71,7 @@ class EBSBackend(BaseBackend): return ec2_backends[self.account_id][self.region_name] def start_snapshot( - self, volume_size: str, tags: Optional[List[Dict[str, str]]], description: str + self, volume_size: int, tags: Optional[List[Dict[str, str]]], description: str ) -> EBSSnapshot: zone_name = f"{self.region_name}a" vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name) diff --git a/moto/ec2/exceptions.py b/moto/ec2/exceptions.py index 1763b5a0e..2d55e9830 100644 --- a/moto/ec2/exceptions.py +++ b/moto/ec2/exceptions.py @@ -1,5 +1,5 @@ from moto.core.exceptions import RESTError -from typing import List, Optional, Union +from typing import Any, List, Optional, Union, Iterable # EC2 has a custom root-tag - vs @@ -51,7 +51,7 @@ class MissingParameterError(EC2ClientError): class InvalidDHCPOptionsIdError(EC2ClientError): - def __init__(self, dhcp_options_id): + def __init__(self, dhcp_options_id: str): super().__init__( "InvalidDhcpOptionID.NotFound", f"DhcpOptionID {dhcp_options_id} does not exist.", @@ -69,7 +69,7 @@ class InvalidParameterCombination(EC2ClientError): class MalformedDHCPOptionsIdError(EC2ClientError): - def __init__(self, dhcp_options_id): + def __init__(self, dhcp_options_id: Optional[str]): super().__init__( "InvalidDhcpOptionsId.Malformed", f'Invalid id: "{dhcp_options_id}" (expecting "dopt-...")', @@ -166,7 +166,7 @@ class InvalidCustomerGatewayIdError(EC2ClientError): class InvalidNetworkInterfaceIdError(EC2ClientError): - def __init__(self, eni_id): + def __init__(self, eni_id: str): super().__init__( "InvalidNetworkInterfaceID.NotFound", f"The network interface ID '{eni_id}' does not exist", @@ -174,7 +174,7 @@ class InvalidNetworkInterfaceIdError(EC2ClientError): class InvalidNetworkAttachmentIdError(EC2ClientError): - def __init__(self, attachment_id): + def __init__(self, attachment_id: str): super().__init__( "InvalidAttachmentID.NotFound", f"The network interface attachment ID '{attachment_id}' does not exist", @@ -274,7 +274,7 @@ class UnvailableAMIIdError(EC2ClientError): class InvalidAMIAttributeItemValueError(EC2ClientError): - def __init__(self, attribute: str, value: str): + def __init__(self, attribute: str, value: Union[str, Iterable[str], None]): super().__init__( "InvalidAMIAttributeItemValue", f'Invalid attribute item value "{value}" for {attribute} item type.', @@ -289,13 +289,13 @@ class MalformedAMIIdError(EC2ClientError): class InvalidSnapshotIdError(EC2ClientError): - def __init__(self): + def __init__(self) -> None: # Note: AWS returns empty message for this, as of 2014.08.22. super().__init__("InvalidSnapshot.NotFound", "") class InvalidSnapshotInUse(EC2ClientError): - def __init__(self, snapshot_id, ami_id): + def __init__(self, snapshot_id: str, ami_id: str): super().__init__( "InvalidSnapshot.InUse", f"The snapshot {snapshot_id} is currently in use by {ami_id}", @@ -303,14 +303,14 @@ class InvalidSnapshotInUse(EC2ClientError): class InvalidVolumeIdError(EC2ClientError): - def __init__(self, volume_id): + def __init__(self, volume_id: Any): super().__init__( "InvalidVolume.NotFound", f"The volume '{volume_id}' does not exist." ) class InvalidVolumeAttachmentError(EC2ClientError): - def __init__(self, volume_id, instance_id): + def __init__(self, volume_id: str, instance_id: str): super().__init__( "InvalidAttachment.NotFound", f"Volume {volume_id} can not be detached from {instance_id} because it is not attached", @@ -318,7 +318,7 @@ class InvalidVolumeAttachmentError(EC2ClientError): class InvalidVolumeDetachmentError(EC2ClientError): - def __init__(self, volume_id, instance_id, device): + def __init__(self, volume_id: str, instance_id: str, device: str): super().__init__( "InvalidAttachment.NotFound", f"The volume {volume_id} is not attached to instance {instance_id} as device {device}", @@ -326,7 +326,7 @@ class InvalidVolumeDetachmentError(EC2ClientError): class VolumeInUseError(EC2ClientError): - def __init__(self, volume_id, instance_id): + def __init__(self, volume_id: str, instance_id: str): super().__init__( "VolumeInUse", f"Volume {volume_id} is currently attached to {instance_id}", @@ -334,7 +334,7 @@ class VolumeInUseError(EC2ClientError): class InvalidAddressError(EC2ClientError): - def __init__(self, ip): + def __init__(self, ip: Any): super().__init__("InvalidAddress.NotFound", f"Address '{ip}' not found.") @@ -347,7 +347,7 @@ class LogDestinationNotFoundError(EC2ClientError): class InvalidAllocationIdError(EC2ClientError): - def __init__(self, allocation_id): + def __init__(self, allocation_id: Any): super().__init__( "InvalidAllocationID.NotFound", f"Allocation ID '{allocation_id}' not found.", @@ -355,7 +355,7 @@ class InvalidAllocationIdError(EC2ClientError): class InvalidAssociationIdError(EC2ClientError): - def __init__(self, association_id): + def __init__(self, association_id: Any): super().__init__( "InvalidAssociationID.NotFound", f"Association ID '{association_id}' not found.", @@ -426,7 +426,7 @@ class InvalidAggregationIntervalParameterError(EC2ClientError): class InvalidParameterValueError(EC2ClientError): - def __init__(self, parameter_value): + def __init__(self, parameter_value: str): super().__init__( "InvalidParameterValue", f"Value {parameter_value} is invalid for parameter.", @@ -480,7 +480,7 @@ class GatewayNotAttachedError(EC2ClientError): class ResourceAlreadyAssociatedError(EC2ClientError): - def __init__(self, resource_id): + def __init__(self, resource_id: str): super().__init__( "Resource.AlreadyAssociated", f"Resource {resource_id} is already associated.", @@ -662,7 +662,7 @@ class InvalidLaunchTemplateNameNotFoundWithNameError(EC2ClientError): class InvalidParameterDependency(EC2ClientError): - def __init__(self, param, param_needed): + def __init__(self, param: str, param_needed: str): super().__init__( "InvalidParameterDependency", f"The parameter [{param}] requires the parameter {param_needed} to be set.", diff --git a/moto/ec2/models/__init__.py b/moto/ec2/models/__init__.py index 29a39a74f..f003d5fc1 100644 --- a/moto/ec2/models/__init__.py +++ b/moto/ec2/models/__init__.py @@ -186,7 +186,7 @@ class EC2Backend( def raise_error(self, code, message): raise EC2ClientError(code, message) - def raise_not_implemented_error(self, blurb): + def raise_not_implemented_error(self, blurb: str): raise MotoNotImplementedError(blurb) def do_resources_exist(self, resource_ids): diff --git a/moto/ec2/models/dhcp_options.py b/moto/ec2/models/dhcp_options.py index 923e09d29..8f8e2ebe7 100644 --- a/moto/ec2/models/dhcp_options.py +++ b/moto/ec2/models/dhcp_options.py @@ -1,4 +1,5 @@ import itertools +from typing import Any, Dict, List, Optional from ..exceptions import ( DependencyViolationError, InvalidDHCPOptionsIdError, @@ -12,12 +13,12 @@ from ..utils import random_dhcp_option_id, generic_filter class DHCPOptionsSet(TaggedEC2Resource): def __init__( self, - ec2_backend, - domain_name_servers=None, - domain_name=None, - ntp_servers=None, - netbios_name_servers=None, - netbios_node_type=None, + ec2_backend: Any, + domain_name_servers: Optional[List[str]] = None, + domain_name: Optional[str] = None, + ntp_servers: Optional[List[str]] = None, + netbios_name_servers: Optional[List[str]] = None, + netbios_node_type: Optional[str] = None, ): self.ec2_backend = ec2_backend self._options = { @@ -30,7 +31,9 @@ class DHCPOptionsSet(TaggedEC2Resource): self.id = random_dhcp_option_id() self.vpc = None - def get_filter_value(self, filter_name): + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: """ API Version 2015-10-01 defines the following filters for DescribeDhcpOptions: @@ -54,26 +57,26 @@ class DHCPOptionsSet(TaggedEC2Resource): return super().get_filter_value(filter_name, "DescribeDhcpOptions") @property - def options(self): + def options(self) -> Dict[str, Any]: # type: ignore[misc] return self._options class DHCPOptionsSetBackend: - def __init__(self): - self.dhcp_options_sets = {} + def __init__(self) -> None: + self.dhcp_options_sets: Dict[str, DHCPOptionsSet] = {} - def associate_dhcp_options(self, dhcp_options, vpc): + def associate_dhcp_options(self, dhcp_options: DHCPOptionsSet, vpc: Any) -> None: dhcp_options.vpc = vpc vpc.dhcp_options = dhcp_options def create_dhcp_options( self, - domain_name_servers=None, - domain_name=None, - ntp_servers=None, - netbios_name_servers=None, - netbios_node_type=None, - ): + domain_name_servers: Optional[List[str]] = None, + domain_name: Optional[str] = None, + ntp_servers: Optional[List[str]] = None, + netbios_name_servers: Optional[List[str]] = None, + netbios_node_type: Optional[str] = None, + ) -> DHCPOptionsSet: NETBIOS_NODE_TYPES = [1, 2, 4, 8] @@ -95,7 +98,7 @@ class DHCPOptionsSetBackend: self.dhcp_options_sets[options.id] = options return options - def delete_dhcp_options_set(self, options_id): + def delete_dhcp_options_set(self, options_id: Optional[str]) -> None: if not (options_id and options_id.startswith("dopt-")): raise MalformedDHCPOptionsIdError(options_id) @@ -105,10 +108,11 @@ class DHCPOptionsSetBackend: self.dhcp_options_sets.pop(options_id) else: raise InvalidDHCPOptionsIdError(options_id) - return True - def describe_dhcp_options(self, dhcp_options_ids=None, filters=None): - dhcp_options_sets = self.dhcp_options_sets.copy().values() + def describe_dhcp_options( + self, dhcp_options_ids: Optional[List[str]] = None, filters: Any = None + ) -> List[DHCPOptionsSet]: + dhcp_options_sets = list(self.dhcp_options_sets.copy().values()) if dhcp_options_ids: dhcp_options_sets = [ diff --git a/moto/ec2/models/elastic_block_store.py b/moto/ec2/models/elastic_block_store.py index c2cff2478..020330d1b 100644 --- a/moto/ec2/models/elastic_block_store.py +++ b/moto/ec2/models/elastic_block_store.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Any, Dict, List, Optional, Set, Iterable from moto.core import CloudFormationModel from moto.packages.boto.ec2.blockdevicemapping import BlockDeviceType @@ -26,8 +26,13 @@ IOPS_SUPPORTED_VOLUME_TYPES = ["gp3", "io1", "io2"] GP3_DEFAULT_IOPS = 3000 -class VolumeModification(object): - def __init__(self, volume, target_size=None, target_volume_type=None): +class VolumeModification: + def __init__( + self, + volume: "Volume", + target_size: Optional[int] = None, + target_volume_type: Optional[str] = None, + ): if not any([target_size, target_volume_type]): raise InvalidParameterValueError( "Invalid input: Must specify at least one of size or type" @@ -42,7 +47,7 @@ class VolumeModification(object): self.start_time = utc_date_and_time() self.end_time = utc_date_and_time() - def get_filter_value(self, filter_name): + def get_filter_value(self, filter_name: str) -> Any: if filter_name == "original-size": return self.original_size elif filter_name == "original-volume-type": @@ -56,7 +61,7 @@ class VolumeModification(object): class VolumeAttachment(CloudFormationModel): - def __init__(self, volume, instance, device, status): + def __init__(self, volume: "Volume", instance: Any, device: str, status: str): self.volume = volume self.attach_time = utc_date_and_time() self.instance = instance @@ -64,18 +69,23 @@ class VolumeAttachment(CloudFormationModel): self.status = status @staticmethod - def cloudformation_name_type(): - return None + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-volumeattachment.html return "AWS::EC2::VolumeAttachment" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any + ) -> "VolumeAttachment": from ..models import ec2_backends properties = cloudformation_json["Properties"] @@ -95,30 +105,34 @@ class VolumeAttachment(CloudFormationModel): class Volume(TaggedEC2Resource, CloudFormationModel): def __init__( self, - ec2_backend, - volume_id, - size, - zone, - snapshot_id=None, - encrypted=False, - kms_key_id=None, - volume_type=None, - iops=None, + ec2_backend: Any, + volume_id: str, + size: int, + zone: Any, + snapshot_id: Optional[str] = None, + encrypted: bool = False, + kms_key_id: Optional[str] = None, + volume_type: Optional[str] = None, + iops: Optional[int] = None, ): self.id = volume_id self.volume_type = volume_type or "gp2" self.size = size self.zone = zone self.create_time = utc_date_and_time() - self.attachment = None + self.attachment: Optional[VolumeAttachment] = None self.snapshot_id = snapshot_id self.ec2_backend = ec2_backend self.encrypted = encrypted self.kms_key_id = kms_key_id - self.modifications = [] + self.modifications: List[VolumeModification] = [] self.iops = iops - def modify(self, target_size=None, target_volume_type=None): + def modify( + self, + target_size: Optional[int] = None, + target_volume_type: Optional[str] = None, + ) -> None: modification = VolumeModification( volume=self, target_size=target_size, target_volume_type=target_volume_type ) @@ -128,18 +142,23 @@ class Volume(TaggedEC2Resource, CloudFormationModel): self.volume_type = modification.target_volume_type @staticmethod - def cloudformation_name_type(): - return None + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-volume.html return "AWS::EC2::Volume" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any + ) -> "Volume": from ..models import ec2_backends properties = cloudformation_json["Properties"] @@ -151,27 +170,29 @@ class Volume(TaggedEC2Resource, CloudFormationModel): return volume @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.id @property - def status(self): + def status(self) -> str: if self.attachment: return "in-use" else: return "available" - def get_filter_value(self, filter_name): + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: if filter_name.startswith("attachment") and not self.attachment: return None elif filter_name == "attachment.attach-time": - return self.attachment.attach_time + return self.attachment.attach_time # type: ignore[union-attr] elif filter_name == "attachment.device": - return self.attachment.device + return self.attachment.device # type: ignore[union-attr] elif filter_name == "attachment.instance-id": - return self.attachment.instance.id + return self.attachment.instance.id # type: ignore[union-attr] elif filter_name == "attachment.status": - return self.attachment.status + return self.attachment.status # type: ignore[union-attr] elif filter_name == "create-time": return self.create_time elif filter_name == "size": @@ -193,27 +214,29 @@ class Volume(TaggedEC2Resource, CloudFormationModel): class Snapshot(TaggedEC2Resource): def __init__( self, - ec2_backend, - snapshot_id, - volume, - description, - encrypted=False, - owner_id=None, - from_ami=None, + ec2_backend: Any, + snapshot_id: str, + volume: Any, + description: str, + encrypted: bool = False, + owner_id: Optional[str] = None, + from_ami: Optional[str] = None, ): self.id = snapshot_id self.volume = volume self.description = description self.start_time = utc_date_and_time() - self.create_volume_permission_groups = set() - self.create_volume_permission_userids = set() + self.create_volume_permission_groups: Set[str] = set() + self.create_volume_permission_userids: Set[str] = set() self.ec2_backend = ec2_backend self.status = "completed" self.encrypted = encrypted self.owner_id = owner_id or ec2_backend.account_id self.from_ami = from_ami - def get_filter_value(self, filter_name): + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: if filter_name == "description": return self.description elif filter_name == "snapshot-id": @@ -235,20 +258,20 @@ class Snapshot(TaggedEC2Resource): class EBSBackend: - def __init__(self): - self.volumes = {} - self.attachments = {} - self.snapshots = {} + def __init__(self) -> None: + self.volumes: Dict[str, Volume] = {} + self.attachments: Dict[str, VolumeAttachment] = {} + self.snapshots: Dict[str, Snapshot] = {} def create_volume( self, - size: str, + size: int, zone_name: str, snapshot_id: Optional[str] = None, encrypted: bool = False, kms_key_id: Optional[str] = None, volume_type: Optional[str] = None, - iops: Optional[str] = None, + iops: Optional[int] = None, ) -> Volume: if kms_key_id and not encrypted: raise InvalidParameterDependency("KmsKeyId", "Encrypted") @@ -262,7 +285,7 @@ class EBSBackend: raise InvalidParameterDependency("VolumeType", "Iops") volume_id = random_volume_id() - zone = self.get_zone_by_name(zone_name) + zone = self.get_zone_by_name(zone_name) # type: ignore[attr-defined] if snapshot_id: snapshot = self.get_snapshot(snapshot_id) if size is None: @@ -283,23 +306,32 @@ class EBSBackend: self.volumes[volume_id] = volume return volume - def describe_volumes(self, volume_ids=None, filters=None): - matches = self.volumes.copy().values() + def describe_volumes( + self, volume_ids: Optional[List[str]] = None, filters: Any = None + ) -> List[Volume]: + matches = list(self.volumes.values()) if volume_ids: matches = [vol for vol in matches if vol.id in volume_ids] if len(volume_ids) > len(matches): - unknown_ids = set(volume_ids) - set(matches) + unknown_ids = set(volume_ids) - set(matches) # type: ignore[arg-type] raise InvalidVolumeIdError(unknown_ids) if filters: matches = generic_filter(filters, matches) return matches - def modify_volume(self, volume_id, target_size=None, target_volume_type=None): + def modify_volume( + self, + volume_id: str, + target_size: Optional[int] = None, + target_volume_type: Optional[str] = None, + ) -> Volume: volume = self.get_volume(volume_id) volume.modify(target_size=target_size, target_volume_type=target_volume_type) return volume - def describe_volumes_modifications(self, volume_ids=None, filters=None): + def describe_volumes_modifications( + self, volume_ids: Optional[List[str]] = None, filters: Any = None + ) -> List[VolumeModification]: volumes = self.describe_volumes(volume_ids) modifications = [] for volume in volumes: @@ -308,13 +340,13 @@ class EBSBackend: modifications = generic_filter(filters, modifications) return modifications - def get_volume(self, volume_id): + def get_volume(self, volume_id: str) -> Volume: volume = self.volumes.get(volume_id, None) if not volume: raise InvalidVolumeIdError(volume_id) return volume - def delete_volume(self, volume_id): + def delete_volume(self, volume_id: str) -> Volume: if volume_id in self.volumes: volume = self.volumes[volume_id] if volume.attachment: @@ -323,13 +355,17 @@ class EBSBackend: raise InvalidVolumeIdError(volume_id) def attach_volume( - self, volume_id, instance_id, device_path, delete_on_termination=False - ): + self, + volume_id: str, + instance_id: str, + device_path: str, + delete_on_termination: bool = False, + ) -> Optional[VolumeAttachment]: volume = self.get_volume(volume_id) - instance = self.get_instance(instance_id) + instance = self.get_instance(instance_id) # type: ignore[attr-defined] if not volume or not instance: - return False + return None volume.attachment = VolumeAttachment(volume, instance, device_path, "attached") # Modify instance to capture mount of block device. @@ -343,9 +379,11 @@ class EBSBackend: instance.block_device_mapping[device_path] = bdt return volume.attachment - def detach_volume(self, volume_id, instance_id, device_path): + def detach_volume( + self, volume_id: str, instance_id: str, device_path: str + ) -> VolumeAttachment: volume = self.get_volume(volume_id) - instance = self.get_instance(instance_id) + instance = self.get_instance(instance_id) # type: ignore[attr-defined] old_attachment = volume.attachment if not old_attachment: @@ -376,15 +414,17 @@ class EBSBackend: params.append(owner_id) if from_ami: params.append(from_ami) - snapshot = Snapshot(*params) + snapshot = Snapshot(*params) # type: ignore[arg-type] self.snapshots[snapshot_id] = snapshot return snapshot - def create_snapshots(self, instance_spec, description, tags): + def create_snapshots( + self, instance_spec: Dict[str, Any], description: str, tags: Dict[str, str] + ) -> List[Snapshot]: """ The CopyTagsFromSource-parameter is not yet implemented. """ - instance = self.get_instance(instance_spec["InstanceId"]) + instance = self.get_instance(instance_spec["InstanceId"]) # type: ignore[attr-defined] block_device_mappings = instance.block_device_mapping if str(instance_spec.get("ExcludeBootVolume", False)).lower() == "true": @@ -403,8 +443,10 @@ class EBSBackend: snapshot.add_tags(tags) return snapshots - def describe_snapshots(self, snapshot_ids=None, filters=None): - matches = self.snapshots.copy().values() + def describe_snapshots( + self, snapshot_ids: Optional[List[str]] = None, filters: Any = None + ) -> List[Snapshot]: + matches = list(self.snapshots.values()) if snapshot_ids: matches = [snap for snap in matches if snap.id in snapshot_ids] if len(snapshot_ids) > len(matches): @@ -413,12 +455,15 @@ class EBSBackend: matches = generic_filter(filters, matches) return matches - def copy_snapshot(self, source_snapshot_id, source_region, description=None): + def copy_snapshot( + self, source_snapshot_id: str, source_region: str, description: str + ) -> Snapshot: from ..models import ec2_backends - source_snapshot = ec2_backends[self.account_id][ - source_region - ].describe_snapshots(snapshot_ids=[source_snapshot_id])[0] + backend = ec2_backends[self.account_id][source_region] # type: ignore[attr-defined] + source_snapshot = backend.describe_snapshots(snapshot_ids=[source_snapshot_id])[ + 0 + ] snapshot_id = random_snapshot_id() snapshot = Snapshot( self, @@ -430,30 +475,32 @@ class EBSBackend: self.snapshots[snapshot_id] = snapshot return snapshot - def get_snapshot(self, snapshot_id): + def get_snapshot(self, snapshot_id: str) -> Snapshot: snapshot = self.snapshots.get(snapshot_id, None) if not snapshot: raise InvalidSnapshotIdError() return snapshot - def delete_snapshot(self, snapshot_id): - if snapshot_id in self.snapshots: + def delete_snapshot(self, snapshot_id: str) -> Snapshot: + if snapshot_id in self.snapshots: # type: ignore[attr-defined] snapshot = self.snapshots[snapshot_id] - if snapshot.from_ami and snapshot.from_ami in self.amis: + if snapshot.from_ami and snapshot.from_ami in self.amis: # type: ignore[attr-defined] raise InvalidSnapshotInUse(snapshot_id, snapshot.from_ami) return self.snapshots.pop(snapshot_id) raise InvalidSnapshotIdError() - def get_create_volume_permission_groups(self, snapshot_id): - snapshot = self.get_snapshot(snapshot_id) + def get_create_volume_permission_groups(self, snapshot_id: str) -> Set[str]: + snapshot = self.get_snapshot(snapshot_id) # type: ignore[attr-defined] return snapshot.create_volume_permission_groups - def get_create_volume_permission_userids(self, snapshot_id): - snapshot = self.get_snapshot(snapshot_id) + def get_create_volume_permission_userids(self, snapshot_id: str) -> Set[str]: + snapshot = self.get_snapshot(snapshot_id) # type: ignore[attr-defined] return snapshot.create_volume_permission_userids - def add_create_volume_permission(self, snapshot_id, user_ids=None, groups=None): - snapshot = self.get_snapshot(snapshot_id) + def add_create_volume_permission( + self, snapshot_id: str, user_ids: List[str], groups: List[str] + ) -> None: + snapshot = self.get_snapshot(snapshot_id) # type: ignore[attr-defined] if user_ids: snapshot.create_volume_permission_userids.update(user_ids) @@ -462,27 +509,28 @@ class EBSBackend: else: snapshot.create_volume_permission_groups.update(groups) - return True - - def remove_create_volume_permission(self, snapshot_id, user_ids=None, groups=None): - snapshot = self.get_snapshot(snapshot_id) + def remove_create_volume_permission( + self, + snapshot_id: str, + user_ids: Optional[List[str]] = None, + groups: Optional[Iterable[str]] = None, + ) -> None: + snapshot = self.get_snapshot(snapshot_id) # type: ignore[attr-defined] if user_ids: snapshot.create_volume_permission_userids.difference_update(user_ids) if groups and groups != ["all"]: raise InvalidAMIAttributeItemValueError("UserGroup", groups) else: - snapshot.create_volume_permission_groups.difference_update(groups) + snapshot.create_volume_permission_groups.difference_update(groups) # type: ignore[arg-type] - return True - - def _get_default_encryption_key(self): + def _get_default_encryption_key(self) -> str: # https://aws.amazon.com/kms/features/#AWS_Service_Integration # An AWS managed CMK is created automatically when you first create # an encrypted resource using an AWS service integrated with KMS. from moto.kms import kms_backends - kms = kms_backends[self.account_id][self.region_name] + kms = kms_backends[self.account_id][self.region_name] # type: ignore[attr-defined] ebs_alias = "alias/aws/ebs" if not kms.alias_exists(ebs_alias): key = kms.create_key( diff --git a/moto/ec2/models/elastic_ip_addresses.py b/moto/ec2/models/elastic_ip_addresses.py index 1e1d31ad2..3b2b57133 100644 --- a/moto/ec2/models/elastic_ip_addresses.py +++ b/moto/ec2/models/elastic_ip_addresses.py @@ -1,3 +1,4 @@ +from typing import Any, Dict, List, Optional from moto.core import CloudFormationModel from .core import TaggedEC2Resource from ..exceptions import ( @@ -16,7 +17,13 @@ from ..utils import ( class ElasticAddress(TaggedEC2Resource, CloudFormationModel): - def __init__(self, ec2_backend, domain, address=None, tags=None): + def __init__( + self, + ec2_backend: Any, + domain: str, + address: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + ): self.ec2_backend = ec2_backend if address: self.public_ip = address @@ -27,22 +34,27 @@ class ElasticAddress(TaggedEC2Resource, CloudFormationModel): self.domain = domain self.instance = None self.eni = None - self.association_id = None + self.association_id: Optional[str] = None self.add_tags(tags or {}) @staticmethod - def cloudformation_name_type(): - return None + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-eip.html return "AWS::EC2::EIP" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any + ) -> "ElasticAddress": from ..models import ec2_backends ec2_backend = ec2_backends[account_id][region_name] @@ -64,21 +76,23 @@ class ElasticAddress(TaggedEC2Resource, CloudFormationModel): return eip @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.public_ip @classmethod - def has_cfn_attr(cls, attr): + def has_cfn_attr(cls, attr: str) -> bool: return attr in ["AllocationId"] - def get_cfn_attribute(self, attribute_name): + def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "AllocationId": return self.allocation_id raise UnformattedGetAttTemplateException() - def get_filter_value(self, filter_name): + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: if filter_name == "allocation-id": return self.allocation_id elif filter_name == "association-id": @@ -107,20 +121,27 @@ class ElasticAddress(TaggedEC2Resource, CloudFormationModel): class ElasticAddressBackend: - def __init__(self): - self.addresses = [] + def __init__(self) -> None: + self.addresses: List[ElasticAddress] = [] - def allocate_address(self, domain, address=None, tags=None): + def allocate_address( + self, + domain: str, + address: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + ) -> ElasticAddress: if domain not in ["standard", "vpc"]: domain = "vpc" if address: - address = ElasticAddress(self, domain=domain, address=address, tags=tags) + ea = ElasticAddress(self, domain=domain, address=address, tags=tags) else: - address = ElasticAddress(self, domain=domain, tags=tags) - self.addresses.append(address) - return address + ea = ElasticAddress(self, domain=domain, tags=tags) + self.addresses.append(ea) + return ea - def address_by_ip(self, ips, fail_if_not_found=True): + def address_by_ip( + self, ips: List[str], fail_if_not_found: bool = True + ) -> List[ElasticAddress]: eips = [ address for address in self.addresses.copy() if address.public_ip in ips ] @@ -131,7 +152,7 @@ class ElasticAddressBackend: return eips - def address_by_allocation(self, allocation_ids): + def address_by_allocation(self, allocation_ids: List[str]) -> List[ElasticAddress]: eips = [ address for address in self.addresses @@ -144,7 +165,9 @@ class ElasticAddressBackend: return eips - def address_by_association(self, association_ids): + def address_by_association( + self, association_ids: List[str] + ) -> List[ElasticAddress]: eips = [ address for address in self.addresses @@ -159,12 +182,12 @@ class ElasticAddressBackend: def associate_address( self, - instance=None, - eni=None, - address=None, - allocation_id=None, - reassociate=False, - ): + instance: Any = None, + eni: Any = None, + address: Optional[str] = None, + allocation_id: Optional[str] = None, + reassociate: bool = False, + ) -> ElasticAddress: eips = [] if address: eips = self.address_by_ip([address]) @@ -192,24 +215,31 @@ class ElasticAddressBackend: raise ResourceAlreadyAssociatedError(eip.public_ip) - def describe_addresses(self, allocation_ids=None, public_ips=None, filters=None): + def describe_addresses( + self, + allocation_ids: Optional[List[str]] = None, + public_ips: Optional[List[str]] = None, + filters: Any = None, + ) -> List[ElasticAddress]: matches = self.addresses.copy() if allocation_ids: matches = [addr for addr in matches if addr.allocation_id in allocation_ids] if len(allocation_ids) > len(matches): - unknown_ids = set(allocation_ids) - set(matches) + unknown_ids = set(allocation_ids) - set(matches) # type: ignore[arg-type] raise InvalidAllocationIdError(unknown_ids) if public_ips: matches = [addr for addr in matches if addr.public_ip in public_ips] if len(public_ips) > len(matches): - unknown_ips = set(public_ips) - set(matches) + unknown_ips = set(public_ips) - set(matches) # type: ignore[arg-type] raise InvalidAddressError(unknown_ips) if filters: matches = generic_filter(filters, matches) return matches - def disassociate_address(self, address=None, association_id=None): + def disassociate_address( + self, address: Optional[str] = None, association_id: Optional[str] = None + ) -> None: eips = [] if address: eips = self.address_by_ip([address]) @@ -225,9 +255,10 @@ class ElasticAddressBackend: eip.instance = None eip.association_id = None - return True - def release_address(self, address=None, allocation_id=None): + def release_address( + self, address: Optional[str] = None, allocation_id: Optional[str] = None + ) -> None: eips = [] if address: eips = self.address_by_ip([address]) @@ -238,4 +269,3 @@ class ElasticAddressBackend: self.disassociate_address(address=eip.public_ip) eip.allocation_id = None self.addresses.remove(eip) - return True diff --git a/moto/ec2/models/elastic_network_interfaces.py b/moto/ec2/models/elastic_network_interfaces.py index 5c3be96e8..940761131 100644 --- a/moto/ec2/models/elastic_network_interfaces.py +++ b/moto/ec2/models/elastic_network_interfaces.py @@ -1,3 +1,4 @@ +from typing import Any, Dict, Optional, List, Union from moto.core import CloudFormationModel from ..exceptions import InvalidNetworkAttachmentIdError, InvalidNetworkInterfaceIdError from .core import TaggedEC2Resource @@ -15,16 +16,16 @@ from ..utils import ( class NetworkInterface(TaggedEC2Resource, CloudFormationModel): def __init__( self, - ec2_backend, - subnet, - private_ip_address, - private_ip_addresses=None, - device_index=0, - public_ip_auto_assign=False, - group_ids=None, - description=None, - tags=None, - **kwargs, + ec2_backend: Any, + subnet: Any, + private_ip_address: Union[List[str], str], + private_ip_addresses: Optional[List[Dict[str, Any]]] = None, + device_index: int = 0, + public_ip_auto_assign: bool = False, + group_ids: Optional[List[str]] = None, + description: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + **kwargs: Any, ): self.ec2_backend = ec2_backend self.id = random_eni_id() @@ -32,7 +33,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): if isinstance(private_ip_address, list) and private_ip_address: private_ip_address = private_ip_address[0] self.private_ip_address = private_ip_address or None - self.private_ip_addresses = private_ip_addresses or [] + self.private_ip_addresses: List[Dict[str, Any]] = private_ip_addresses or [] self.ipv6_addresses = kwargs.get("ipv6_addresses") or [] self.subnet = subnet @@ -45,7 +46,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): self.description = description self.source_dest_check = True - self.public_ip = None + self.public_ip: Optional[str] = None self.public_ip_auto_assign = public_ip_auto_assign self.start() self.add_tags(tags or {}) @@ -60,7 +61,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): association = list(self.subnet.ipv6_cidr_block_associations.values())[0] subnet_ipv6_cidr_block = association.get("ipv6CidrBlock") if kwargs.get("ipv6_address_count"): - while len(self.ipv6_addresses) < kwargs.get("ipv6_address_count"): + while len(self.ipv6_addresses) < kwargs["ipv6_address_count"]: ip = random_private_ip(subnet_ipv6_cidr_block, ipv6=True) if ip not in self.ipv6_addresses: self.ipv6_addresses.append(ip) @@ -80,9 +81,9 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): if not self.private_ip_address: if self.private_ip_addresses: - for ip in self.private_ip_addresses: - if isinstance(ip, dict) and ip.get("Primary"): - self.private_ip_address = ip.get("PrivateIpAddress") + for private_ip in self.private_ip_addresses: + if isinstance(private_ip, dict) and private_ip.get("Primary"): + self.private_ip_address = private_ip.get("PrivateIpAddress") break if not self.private_ip_addresses: self.private_ip_address = random_private_ip(self.subnet.cidr_block) @@ -133,12 +134,12 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): self._group_set.append(group) @property - def owner_id(self): + def owner_id(self) -> str: return self.ec2_backend.account_id @property - def association(self): - association = {} + def association(self) -> Dict[str, Any]: # type: ignore[misc] + association: Dict[str, Any] = {} if self.public_ip: eips = self.ec2_backend.address_by_ip( [self.public_ip], fail_if_not_found=False @@ -150,18 +151,23 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): return association @staticmethod - def cloudformation_name_type(): - return None + def cloudformation_name_type() -> str: + return "" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-networkinterface.html return "AWS::EC2::NetworkInterface" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "NetworkInterface": from ..models import ec2_backends properties = cloudformation_json["Properties"] @@ -186,14 +192,14 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): ) return network_interface - def stop(self): + def stop(self) -> None: if self.public_ip_auto_assign: self.public_ip = None - def start(self): + def start(self) -> None: self.check_auto_public_ip() - def check_auto_public_ip(self): + def check_auto_public_ip(self) -> None: if ( self.public_ip_auto_assign and str(self.public_ip_auto_assign).lower() == "true" @@ -201,17 +207,17 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): self.public_ip = random_public_ip() @property - def group_set(self): + def group_set(self) -> Any: # type: ignore[misc] if self.instance and self.instance.security_groups: return set(self._group_set) | set(self.instance.security_groups) else: return self._group_set @classmethod - def has_cfn_attr(cls, attr): + def has_cfn_attr(cls, attr: str) -> bool: return attr in ["PrimaryPrivateIpAddress", "SecondaryPrivateIpAddresses"] - def get_cfn_attribute(self, attribute_name): + def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "PrimaryPrivateIpAddress": @@ -223,10 +229,12 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): raise UnformattedGetAttTemplateException() @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.id - def get_filter_value(self, filter_name): + def get_filter_value( + self, filter_name: str, method_name: Optional[str] = None + ) -> Any: if filter_name == "network-interface-id": return self.id elif filter_name in ("addresses.private-ip-address", "private-ip-address"): @@ -242,7 +250,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): elif filter_name == "description": return self.description elif filter_name == "attachment.instance-id": - return self.instance.id if self.instance else None + return self.instance.id if self.instance else None # type: ignore[attr-defined] elif filter_name == "attachment.instance-owner-id": return self.owner_id else: @@ -250,19 +258,19 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel): class NetworkInterfaceBackend: - def __init__(self): - self.enis = {} + def __init__(self) -> None: + self.enis: Dict[str, NetworkInterface] = {} def create_network_interface( self, - subnet, - private_ip_address, - private_ip_addresses=None, - group_ids=None, - description=None, - tags=None, - **kwargs, - ): + subnet: Any, + private_ip_address: Union[str, List[str]], + private_ip_addresses: Optional[List[Dict[str, Any]]] = None, + group_ids: Optional[List[str]] = None, + description: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + **kwargs: Any, + ) -> NetworkInterface: eni = NetworkInterface( self, subnet, @@ -276,23 +284,24 @@ class NetworkInterfaceBackend: self.enis[eni.id] = eni return eni - def get_network_interface(self, eni_id): + def get_network_interface(self, eni_id: str) -> NetworkInterface: for eni in self.enis.values(): if eni_id == eni.id: return eni raise InvalidNetworkInterfaceIdError(eni_id) - def delete_network_interface(self, eni_id): + def delete_network_interface(self, eni_id: str) -> None: deleted = self.enis.pop(eni_id, None) if not deleted: raise InvalidNetworkInterfaceIdError(eni_id) - return deleted - def describe_network_interfaces(self, filters=None): + def describe_network_interfaces( + self, filters: Any = None + ) -> List[NetworkInterface]: # Note: This is only used in EC2Backend#do_resources_exist # Client-calls use #get_all_network_interfaces() # We should probably merge these at some point.. - enis = self.enis.values() + enis = list(self.enis.values()) if filters: for (_filter, _filter_value) in filters.items(): @@ -302,33 +311,34 @@ class NetworkInterfaceBackend: eni for eni in enis if getattr(eni, _filter) in _filter_value ] else: - self.raise_not_implemented_error( + self.raise_not_implemented_error( # type: ignore f"The filter '{_filter}' for DescribeNetworkInterfaces" ) return enis - def attach_network_interface(self, eni_id, instance_id, device_index): + def attach_network_interface( + self, eni_id: str, instance_id: str, device_index: int + ) -> str: eni = self.get_network_interface(eni_id) - instance = self.get_instance(instance_id) + instance = self.get_instance(instance_id) # type: ignore[attr-defined] return instance.attach_eni(eni, device_index) - def detach_network_interface(self, attachment_id): - found_eni = None - + def detach_network_interface(self, attachment_id: str) -> None: for eni in self.enis.values(): if eni.attachment_id == attachment_id: - found_eni = eni - break - else: - raise InvalidNetworkAttachmentIdError(attachment_id) - - found_eni.instance.detach_eni(found_eni) + eni.instance.detach_eni(eni) # type: ignore[attr-defined] + return + raise InvalidNetworkAttachmentIdError(attachment_id) def modify_network_interface_attribute( - self, eni_id, group_ids, source_dest_check=None, description=None - ): + self, + eni_id: str, + group_ids: List[str], + source_dest_check: Optional[bool] = None, + description: Optional[str] = None, + ) -> None: eni = self.get_network_interface(eni_id) - groups = [self.get_security_group_from_id(group_id) for group_id in group_ids] + groups = [self.get_security_group_from_id(group_id) for group_id in group_ids] # type: ignore[attr-defined] if groups: eni._group_set = groups if source_dest_check in [True, False]: @@ -337,8 +347,10 @@ class NetworkInterfaceBackend: if description: eni.description = description - def get_all_network_interfaces(self, eni_ids=None, filters=None): - enis = self.enis.copy().values() + def get_all_network_interfaces( + self, eni_ids: Optional[List[str]] = None, filters: Any = None + ) -> List[NetworkInterface]: + enis = list(self.enis.values()) if eni_ids: enis = [eni for eni in enis if eni.id in eni_ids] @@ -350,7 +362,9 @@ class NetworkInterfaceBackend: return generic_filter(filters, enis) - def unassign_private_ip_addresses(self, eni_id=None, private_ip_address=None): + def unassign_private_ip_addresses( + self, eni_id: str, private_ip_address: Optional[List[str]] = None + ) -> NetworkInterface: eni = self.get_network_interface(eni_id) if private_ip_address: for item in eni.private_ip_addresses.copy(): @@ -358,7 +372,9 @@ class NetworkInterfaceBackend: eni.private_ip_addresses.remove(item) return eni - def assign_private_ip_addresses(self, eni_id=None, secondary_ips_count=None): + def assign_private_ip_addresses( + self, eni_id: str, secondary_ips_count: Optional[int] = None + ) -> NetworkInterface: eni = self.get_network_interface(eni_id) eni_assigned_ips = [ item.get("PrivateIpAddress") for item in eni.private_ip_addresses @@ -372,7 +388,12 @@ class NetworkInterfaceBackend: secondary_ips_count -= 1 return eni - def assign_ipv6_addresses(self, eni_id=None, ipv6_addresses=None, ipv6_count=None): + def assign_ipv6_addresses( + self, + eni_id: str, + ipv6_addresses: Optional[List[str]] = None, + ipv6_count: Optional[int] = None, + ) -> NetworkInterface: eni = self.get_network_interface(eni_id) if ipv6_addresses: eni.ipv6_addresses.extend(ipv6_addresses) @@ -386,10 +407,12 @@ class NetworkInterfaceBackend: ipv6_count -= 1 return eni - def unassign_ipv6_addresses(self, eni_id=None, ips=None): + def unassign_ipv6_addresses( + self, eni_id: str, ips: Optional[List[str]] = None + ) -> NetworkInterface: eni = self.get_network_interface(eni_id) if ips: for ip in eni.ipv6_addresses.copy(): if ip in ips: eni.ipv6_addresses.remove(ip) - return eni, ips + return eni diff --git a/moto/ec2/models/security_groups.py b/moto/ec2/models/security_groups.py index c0ba1470d..e0ffa6cf8 100644 --- a/moto/ec2/models/security_groups.py +++ b/moto/ec2/models/security_groups.py @@ -2,7 +2,7 @@ import copy import itertools import json from collections import defaultdict -from typing import Optional +from typing import Any, Dict, Optional from moto.core import CloudFormationModel from moto.core.utils import aws_api_matches from ..exceptions import ( @@ -110,13 +110,13 @@ class SecurityRule(object): class SecurityGroup(TaggedEC2Resource, CloudFormationModel): def __init__( self, - ec2_backend, - group_id, - name, - description, - vpc_id=None, - tags=None, - is_default=None, + ec2_backend: Any, + group_id: str, + name: str, + description: str, + vpc_id: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + is_default: Optional[bool] = None, ): self.ec2_backend = ec2_backend self.id = group_id diff --git a/moto/ec2/responses/dhcp_options.py b/moto/ec2/responses/dhcp_options.py index 5d4b90c7e..76b7b1a60 100644 --- a/moto/ec2/responses/dhcp_options.py +++ b/moto/ec2/responses/dhcp_options.py @@ -39,9 +39,9 @@ class DHCPOptions(EC2BaseResponse): def delete_dhcp_options(self): dhcp_opt_id = self._get_param("DhcpOptionsId") - delete_status = self.ec2_backend.delete_dhcp_options_set(dhcp_opt_id) + self.ec2_backend.delete_dhcp_options_set(dhcp_opt_id) template = self.response_template(DELETE_DHCP_OPTIONS_RESPONSE) - return template.render(delete_status=delete_status) + return template.render(delete_status="true") def describe_dhcp_options(self): dhcp_opt_ids = self._get_multi_param("DhcpOptionsId") diff --git a/moto/ec2/responses/elastic_network_interfaces.py b/moto/ec2/responses/elastic_network_interfaces.py index 6a4f8cd4e..c436f5305 100644 --- a/moto/ec2/responses/elastic_network_interfaces.py +++ b/moto/ec2/responses/elastic_network_interfaces.py @@ -133,9 +133,9 @@ class ElasticNetworkInterfaces(EC2BaseResponse): def unassign_ipv6_addresses(self): eni_id = self._get_param("NetworkInterfaceId") ips = self._get_multi_param("Ipv6Addresses") - eni, unassigned_ips = self.ec2_backend.unassign_ipv6_addresses(eni_id, ips) + eni = self.ec2_backend.unassign_ipv6_addresses(eni_id, ips) template = self.response_template(UNASSIGN_IPV6_ADDRESSES) - return template.render(eni=eni, unassigned_ips=unassigned_ips) + return template.render(eni=eni, unassigned_ips=ips) ASSIGN_PRIVATE_IP_ADDRESSES = """ diff --git a/moto/ec2/utils.py b/moto/ec2/utils.py index 77d2e76cc..6d26530a7 100644 --- a/moto/ec2/utils.py +++ b/moto/ec2/utils.py @@ -100,7 +100,7 @@ def random_flow_log_id(): return random_id(prefix=EC2_RESOURCE_TO_PREFIX["flow-logs"]) -def random_snapshot_id(): +def random_snapshot_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["snapshot"]) @@ -146,7 +146,7 @@ def random_customer_gateway_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["customer-gateway"]) -def random_volume_id(): +def random_volume_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["volume"]) @@ -170,7 +170,7 @@ def random_vpc_peering_connection_id(): return random_id(prefix=EC2_RESOURCE_TO_PREFIX["vpc-peering-connection"]) -def random_eip_association_id(): +def random_eip_association_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["vpc-elastic-ip-association"]) @@ -188,23 +188,23 @@ def random_route_table_id(): return random_id(prefix=EC2_RESOURCE_TO_PREFIX["route-table"]) -def random_eip_allocation_id(): +def random_eip_allocation_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["vpc-elastic-ip"]) -def random_dhcp_option_id(): +def random_dhcp_option_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["dhcp-options"]) -def random_eni_id(): +def random_eni_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["network-interface"]) -def random_eni_attach_id(): +def random_eni_attach_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["network-interface-attachment"]) -def random_nat_gateway_id(): +def random_nat_gateway_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["nat-gateway"], size=17) @@ -236,7 +236,7 @@ def random_carrier_gateway_id() -> str: return random_id(prefix=EC2_RESOURCE_TO_PREFIX["carrier-gateway"], size=17) -def random_public_ip(): +def random_public_ip() -> str: return f"54.214.{random.choice(range(255))}.{random.choice(range(255))}" @@ -244,7 +244,7 @@ def random_dedicated_host_id(): return random_id(prefix=EC2_RESOURCE_TO_PREFIX["dedicated_host"], size=17) -def random_private_ip(cidr=None, ipv6=False): +def random_private_ip(cidr: str = None, ipv6: bool = False) -> str: # prefix - ula.prefixlen : get number of remaing length for the IP. # prefix will be 32 for IPv4 and 128 for IPv6. # random.getrandbits() will generate remaining bits for IPv6 or Ipv4 in decimal format @@ -259,16 +259,16 @@ def random_private_ip(cidr=None, ipv6=False): return f"10.{random.choice(range(255))}.{random.choice(range(255))}.{random.choice(range(255))}" -def random_ip(): +def random_ip() -> str: return f"127.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}" -def generate_dns_from_ip(ip, dns_type="internal"): +def generate_dns_from_ip(ip: Any, dns_type: str = "internal") -> str: splits = ip.split("/")[0].split(".") if "/" in ip else ip.split(".") return f"ip-{splits[0]}-{splits[1]}-{splits[2]}-{splits[3]}.ec2.{dns_type}" -def random_mac_address(): +def random_mac_address() -> str: return f"02:00:00:{random.randint(0, 255)}02x:{random.randint(0, 255)}02x:{random.randint(0, 255)}02x" diff --git a/moto/packages/boto/ec2/blockdevicemapping.py b/moto/packages/boto/ec2/blockdevicemapping.py index 775d29460..c88f8053d 100644 --- a/moto/packages/boto/ec2/blockdevicemapping.py +++ b/moto/packages/boto/ec2/blockdevicemapping.py @@ -38,7 +38,7 @@ class BlockDeviceType(object): status: Optional[str] = None, attach_time: Optional[str] = None, delete_on_termination: bool = False, - size: Optional[str] = None, + size: Optional[int] = None, volume_type: Optional[str] = None, iops: Optional[str] = None, encrypted: Optional[str] = None, diff --git a/setup.cfg b/setup.cfg index fda491699..1bdd2075c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -229,7 +229,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/moto_api +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/moto_api show_column_numbers=True show_error_codes = True disable_error_code=abstract