Techdebt: MyPy EC2 (d-e-models) (#5898)

This commit is contained in:
Bert Blommers 2023-02-03 12:06:38 -01:00 committed by GitHub
parent 7d7663d703
commit b41533d42d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 373 additions and 268 deletions

View File

@ -71,7 +71,7 @@ class EBSBackend(BaseBackend):
return ec2_backends[self.account_id][self.region_name]
def start_snapshot(
self, volume_size: str, tags: Optional[List[Dict[str, str]]], description: str
self, volume_size: int, tags: Optional[List[Dict[str, str]]], description: str
) -> EBSSnapshot:
zone_name = f"{self.region_name}a"
vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name)

View File

@ -1,5 +1,5 @@
from moto.core.exceptions import RESTError
from typing import List, Optional, Union
from typing import Any, List, Optional, Union, Iterable
# EC2 has a custom root-tag - <Response> vs <ErrorResponse>
@ -51,7 +51,7 @@ class MissingParameterError(EC2ClientError):
class InvalidDHCPOptionsIdError(EC2ClientError):
def __init__(self, dhcp_options_id):
def __init__(self, dhcp_options_id: str):
super().__init__(
"InvalidDhcpOptionID.NotFound",
f"DhcpOptionID {dhcp_options_id} does not exist.",
@ -69,7 +69,7 @@ class InvalidParameterCombination(EC2ClientError):
class MalformedDHCPOptionsIdError(EC2ClientError):
def __init__(self, dhcp_options_id):
def __init__(self, dhcp_options_id: Optional[str]):
super().__init__(
"InvalidDhcpOptionsId.Malformed",
f'Invalid id: "{dhcp_options_id}" (expecting "dopt-...")',
@ -166,7 +166,7 @@ class InvalidCustomerGatewayIdError(EC2ClientError):
class InvalidNetworkInterfaceIdError(EC2ClientError):
def __init__(self, eni_id):
def __init__(self, eni_id: str):
super().__init__(
"InvalidNetworkInterfaceID.NotFound",
f"The network interface ID '{eni_id}' does not exist",
@ -174,7 +174,7 @@ class InvalidNetworkInterfaceIdError(EC2ClientError):
class InvalidNetworkAttachmentIdError(EC2ClientError):
def __init__(self, attachment_id):
def __init__(self, attachment_id: str):
super().__init__(
"InvalidAttachmentID.NotFound",
f"The network interface attachment ID '{attachment_id}' does not exist",
@ -274,7 +274,7 @@ class UnvailableAMIIdError(EC2ClientError):
class InvalidAMIAttributeItemValueError(EC2ClientError):
def __init__(self, attribute: str, value: str):
def __init__(self, attribute: str, value: Union[str, Iterable[str], None]):
super().__init__(
"InvalidAMIAttributeItemValue",
f'Invalid attribute item value "{value}" for {attribute} item type.',
@ -289,13 +289,13 @@ class MalformedAMIIdError(EC2ClientError):
class InvalidSnapshotIdError(EC2ClientError):
def __init__(self):
def __init__(self) -> None:
# Note: AWS returns empty message for this, as of 2014.08.22.
super().__init__("InvalidSnapshot.NotFound", "")
class InvalidSnapshotInUse(EC2ClientError):
def __init__(self, snapshot_id, ami_id):
def __init__(self, snapshot_id: str, ami_id: str):
super().__init__(
"InvalidSnapshot.InUse",
f"The snapshot {snapshot_id} is currently in use by {ami_id}",
@ -303,14 +303,14 @@ class InvalidSnapshotInUse(EC2ClientError):
class InvalidVolumeIdError(EC2ClientError):
def __init__(self, volume_id):
def __init__(self, volume_id: Any):
super().__init__(
"InvalidVolume.NotFound", f"The volume '{volume_id}' does not exist."
)
class InvalidVolumeAttachmentError(EC2ClientError):
def __init__(self, volume_id, instance_id):
def __init__(self, volume_id: str, instance_id: str):
super().__init__(
"InvalidAttachment.NotFound",
f"Volume {volume_id} can not be detached from {instance_id} because it is not attached",
@ -318,7 +318,7 @@ class InvalidVolumeAttachmentError(EC2ClientError):
class InvalidVolumeDetachmentError(EC2ClientError):
def __init__(self, volume_id, instance_id, device):
def __init__(self, volume_id: str, instance_id: str, device: str):
super().__init__(
"InvalidAttachment.NotFound",
f"The volume {volume_id} is not attached to instance {instance_id} as device {device}",
@ -326,7 +326,7 @@ class InvalidVolumeDetachmentError(EC2ClientError):
class VolumeInUseError(EC2ClientError):
def __init__(self, volume_id, instance_id):
def __init__(self, volume_id: str, instance_id: str):
super().__init__(
"VolumeInUse",
f"Volume {volume_id} is currently attached to {instance_id}",
@ -334,7 +334,7 @@ class VolumeInUseError(EC2ClientError):
class InvalidAddressError(EC2ClientError):
def __init__(self, ip):
def __init__(self, ip: Any):
super().__init__("InvalidAddress.NotFound", f"Address '{ip}' not found.")
@ -347,7 +347,7 @@ class LogDestinationNotFoundError(EC2ClientError):
class InvalidAllocationIdError(EC2ClientError):
def __init__(self, allocation_id):
def __init__(self, allocation_id: Any):
super().__init__(
"InvalidAllocationID.NotFound",
f"Allocation ID '{allocation_id}' not found.",
@ -355,7 +355,7 @@ class InvalidAllocationIdError(EC2ClientError):
class InvalidAssociationIdError(EC2ClientError):
def __init__(self, association_id):
def __init__(self, association_id: Any):
super().__init__(
"InvalidAssociationID.NotFound",
f"Association ID '{association_id}' not found.",
@ -426,7 +426,7 @@ class InvalidAggregationIntervalParameterError(EC2ClientError):
class InvalidParameterValueError(EC2ClientError):
def __init__(self, parameter_value):
def __init__(self, parameter_value: str):
super().__init__(
"InvalidParameterValue",
f"Value {parameter_value} is invalid for parameter.",
@ -480,7 +480,7 @@ class GatewayNotAttachedError(EC2ClientError):
class ResourceAlreadyAssociatedError(EC2ClientError):
def __init__(self, resource_id):
def __init__(self, resource_id: str):
super().__init__(
"Resource.AlreadyAssociated",
f"Resource {resource_id} is already associated.",
@ -662,7 +662,7 @@ class InvalidLaunchTemplateNameNotFoundWithNameError(EC2ClientError):
class InvalidParameterDependency(EC2ClientError):
def __init__(self, param, param_needed):
def __init__(self, param: str, param_needed: str):
super().__init__(
"InvalidParameterDependency",
f"The parameter [{param}] requires the parameter {param_needed} to be set.",

View File

@ -186,7 +186,7 @@ class EC2Backend(
def raise_error(self, code, message):
raise EC2ClientError(code, message)
def raise_not_implemented_error(self, blurb):
def raise_not_implemented_error(self, blurb: str):
raise MotoNotImplementedError(blurb)
def do_resources_exist(self, resource_ids):

View File

@ -1,4 +1,5 @@
import itertools
from typing import Any, Dict, List, Optional
from ..exceptions import (
DependencyViolationError,
InvalidDHCPOptionsIdError,
@ -12,12 +13,12 @@ from ..utils import random_dhcp_option_id, generic_filter
class DHCPOptionsSet(TaggedEC2Resource):
def __init__(
self,
ec2_backend,
domain_name_servers=None,
domain_name=None,
ntp_servers=None,
netbios_name_servers=None,
netbios_node_type=None,
ec2_backend: Any,
domain_name_servers: Optional[List[str]] = None,
domain_name: Optional[str] = None,
ntp_servers: Optional[List[str]] = None,
netbios_name_servers: Optional[List[str]] = None,
netbios_node_type: Optional[str] = None,
):
self.ec2_backend = ec2_backend
self._options = {
@ -30,7 +31,9 @@ class DHCPOptionsSet(TaggedEC2Resource):
self.id = random_dhcp_option_id()
self.vpc = None
def get_filter_value(self, filter_name):
def get_filter_value(
self, filter_name: str, method_name: Optional[str] = None
) -> Any:
"""
API Version 2015-10-01 defines the following filters for DescribeDhcpOptions:
@ -54,26 +57,26 @@ class DHCPOptionsSet(TaggedEC2Resource):
return super().get_filter_value(filter_name, "DescribeDhcpOptions")
@property
def options(self):
def options(self) -> Dict[str, Any]: # type: ignore[misc]
return self._options
class DHCPOptionsSetBackend:
def __init__(self):
self.dhcp_options_sets = {}
def __init__(self) -> None:
self.dhcp_options_sets: Dict[str, DHCPOptionsSet] = {}
def associate_dhcp_options(self, dhcp_options, vpc):
def associate_dhcp_options(self, dhcp_options: DHCPOptionsSet, vpc: Any) -> None:
dhcp_options.vpc = vpc
vpc.dhcp_options = dhcp_options
def create_dhcp_options(
self,
domain_name_servers=None,
domain_name=None,
ntp_servers=None,
netbios_name_servers=None,
netbios_node_type=None,
):
domain_name_servers: Optional[List[str]] = None,
domain_name: Optional[str] = None,
ntp_servers: Optional[List[str]] = None,
netbios_name_servers: Optional[List[str]] = None,
netbios_node_type: Optional[str] = None,
) -> DHCPOptionsSet:
NETBIOS_NODE_TYPES = [1, 2, 4, 8]
@ -95,7 +98,7 @@ class DHCPOptionsSetBackend:
self.dhcp_options_sets[options.id] = options
return options
def delete_dhcp_options_set(self, options_id):
def delete_dhcp_options_set(self, options_id: Optional[str]) -> None:
if not (options_id and options_id.startswith("dopt-")):
raise MalformedDHCPOptionsIdError(options_id)
@ -105,10 +108,11 @@ class DHCPOptionsSetBackend:
self.dhcp_options_sets.pop(options_id)
else:
raise InvalidDHCPOptionsIdError(options_id)
return True
def describe_dhcp_options(self, dhcp_options_ids=None, filters=None):
dhcp_options_sets = self.dhcp_options_sets.copy().values()
def describe_dhcp_options(
self, dhcp_options_ids: Optional[List[str]] = None, filters: Any = None
) -> List[DHCPOptionsSet]:
dhcp_options_sets = list(self.dhcp_options_sets.copy().values())
if dhcp_options_ids:
dhcp_options_sets = [

View File

@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Dict, List, Optional, Set, Iterable
from moto.core import CloudFormationModel
from moto.packages.boto.ec2.blockdevicemapping import BlockDeviceType
@ -26,8 +26,13 @@ IOPS_SUPPORTED_VOLUME_TYPES = ["gp3", "io1", "io2"]
GP3_DEFAULT_IOPS = 3000
class VolumeModification(object):
def __init__(self, volume, target_size=None, target_volume_type=None):
class VolumeModification:
def __init__(
self,
volume: "Volume",
target_size: Optional[int] = None,
target_volume_type: Optional[str] = None,
):
if not any([target_size, target_volume_type]):
raise InvalidParameterValueError(
"Invalid input: Must specify at least one of size or type"
@ -42,7 +47,7 @@ class VolumeModification(object):
self.start_time = utc_date_and_time()
self.end_time = utc_date_and_time()
def get_filter_value(self, filter_name):
def get_filter_value(self, filter_name: str) -> Any:
if filter_name == "original-size":
return self.original_size
elif filter_name == "original-volume-type":
@ -56,7 +61,7 @@ class VolumeModification(object):
class VolumeAttachment(CloudFormationModel):
def __init__(self, volume, instance, device, status):
def __init__(self, volume: "Volume", instance: Any, device: str, status: str):
self.volume = volume
self.attach_time = utc_date_and_time()
self.instance = instance
@ -64,18 +69,23 @@ class VolumeAttachment(CloudFormationModel):
self.status = status
@staticmethod
def cloudformation_name_type():
return None
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-volumeattachment.html
return "AWS::EC2::VolumeAttachment"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any
) -> "VolumeAttachment":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
@ -95,30 +105,34 @@ class VolumeAttachment(CloudFormationModel):
class Volume(TaggedEC2Resource, CloudFormationModel):
def __init__(
self,
ec2_backend,
volume_id,
size,
zone,
snapshot_id=None,
encrypted=False,
kms_key_id=None,
volume_type=None,
iops=None,
ec2_backend: Any,
volume_id: str,
size: int,
zone: Any,
snapshot_id: Optional[str] = None,
encrypted: bool = False,
kms_key_id: Optional[str] = None,
volume_type: Optional[str] = None,
iops: Optional[int] = None,
):
self.id = volume_id
self.volume_type = volume_type or "gp2"
self.size = size
self.zone = zone
self.create_time = utc_date_and_time()
self.attachment = None
self.attachment: Optional[VolumeAttachment] = None
self.snapshot_id = snapshot_id
self.ec2_backend = ec2_backend
self.encrypted = encrypted
self.kms_key_id = kms_key_id
self.modifications = []
self.modifications: List[VolumeModification] = []
self.iops = iops
def modify(self, target_size=None, target_volume_type=None):
def modify(
self,
target_size: Optional[int] = None,
target_volume_type: Optional[str] = None,
) -> None:
modification = VolumeModification(
volume=self, target_size=target_size, target_volume_type=target_volume_type
)
@ -128,18 +142,23 @@ class Volume(TaggedEC2Resource, CloudFormationModel):
self.volume_type = modification.target_volume_type
@staticmethod
def cloudformation_name_type():
return None
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-volume.html
return "AWS::EC2::Volume"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any
) -> "Volume":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
@ -151,27 +170,29 @@ class Volume(TaggedEC2Resource, CloudFormationModel):
return volume
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.id
@property
def status(self):
def status(self) -> str:
if self.attachment:
return "in-use"
else:
return "available"
def get_filter_value(self, filter_name):
def get_filter_value(
self, filter_name: str, method_name: Optional[str] = None
) -> Any:
if filter_name.startswith("attachment") and not self.attachment:
return None
elif filter_name == "attachment.attach-time":
return self.attachment.attach_time
return self.attachment.attach_time # type: ignore[union-attr]
elif filter_name == "attachment.device":
return self.attachment.device
return self.attachment.device # type: ignore[union-attr]
elif filter_name == "attachment.instance-id":
return self.attachment.instance.id
return self.attachment.instance.id # type: ignore[union-attr]
elif filter_name == "attachment.status":
return self.attachment.status
return self.attachment.status # type: ignore[union-attr]
elif filter_name == "create-time":
return self.create_time
elif filter_name == "size":
@ -193,27 +214,29 @@ class Volume(TaggedEC2Resource, CloudFormationModel):
class Snapshot(TaggedEC2Resource):
def __init__(
self,
ec2_backend,
snapshot_id,
volume,
description,
encrypted=False,
owner_id=None,
from_ami=None,
ec2_backend: Any,
snapshot_id: str,
volume: Any,
description: str,
encrypted: bool = False,
owner_id: Optional[str] = None,
from_ami: Optional[str] = None,
):
self.id = snapshot_id
self.volume = volume
self.description = description
self.start_time = utc_date_and_time()
self.create_volume_permission_groups = set()
self.create_volume_permission_userids = set()
self.create_volume_permission_groups: Set[str] = set()
self.create_volume_permission_userids: Set[str] = set()
self.ec2_backend = ec2_backend
self.status = "completed"
self.encrypted = encrypted
self.owner_id = owner_id or ec2_backend.account_id
self.from_ami = from_ami
def get_filter_value(self, filter_name):
def get_filter_value(
self, filter_name: str, method_name: Optional[str] = None
) -> Any:
if filter_name == "description":
return self.description
elif filter_name == "snapshot-id":
@ -235,20 +258,20 @@ class Snapshot(TaggedEC2Resource):
class EBSBackend:
def __init__(self):
self.volumes = {}
self.attachments = {}
self.snapshots = {}
def __init__(self) -> None:
self.volumes: Dict[str, Volume] = {}
self.attachments: Dict[str, VolumeAttachment] = {}
self.snapshots: Dict[str, Snapshot] = {}
def create_volume(
self,
size: str,
size: int,
zone_name: str,
snapshot_id: Optional[str] = None,
encrypted: bool = False,
kms_key_id: Optional[str] = None,
volume_type: Optional[str] = None,
iops: Optional[str] = None,
iops: Optional[int] = None,
) -> Volume:
if kms_key_id and not encrypted:
raise InvalidParameterDependency("KmsKeyId", "Encrypted")
@ -262,7 +285,7 @@ class EBSBackend:
raise InvalidParameterDependency("VolumeType", "Iops")
volume_id = random_volume_id()
zone = self.get_zone_by_name(zone_name)
zone = self.get_zone_by_name(zone_name) # type: ignore[attr-defined]
if snapshot_id:
snapshot = self.get_snapshot(snapshot_id)
if size is None:
@ -283,23 +306,32 @@ class EBSBackend:
self.volumes[volume_id] = volume
return volume
def describe_volumes(self, volume_ids=None, filters=None):
matches = self.volumes.copy().values()
def describe_volumes(
self, volume_ids: Optional[List[str]] = None, filters: Any = None
) -> List[Volume]:
matches = list(self.volumes.values())
if volume_ids:
matches = [vol for vol in matches if vol.id in volume_ids]
if len(volume_ids) > len(matches):
unknown_ids = set(volume_ids) - set(matches)
unknown_ids = set(volume_ids) - set(matches) # type: ignore[arg-type]
raise InvalidVolumeIdError(unknown_ids)
if filters:
matches = generic_filter(filters, matches)
return matches
def modify_volume(self, volume_id, target_size=None, target_volume_type=None):
def modify_volume(
self,
volume_id: str,
target_size: Optional[int] = None,
target_volume_type: Optional[str] = None,
) -> Volume:
volume = self.get_volume(volume_id)
volume.modify(target_size=target_size, target_volume_type=target_volume_type)
return volume
def describe_volumes_modifications(self, volume_ids=None, filters=None):
def describe_volumes_modifications(
self, volume_ids: Optional[List[str]] = None, filters: Any = None
) -> List[VolumeModification]:
volumes = self.describe_volumes(volume_ids)
modifications = []
for volume in volumes:
@ -308,13 +340,13 @@ class EBSBackend:
modifications = generic_filter(filters, modifications)
return modifications
def get_volume(self, volume_id):
def get_volume(self, volume_id: str) -> Volume:
volume = self.volumes.get(volume_id, None)
if not volume:
raise InvalidVolumeIdError(volume_id)
return volume
def delete_volume(self, volume_id):
def delete_volume(self, volume_id: str) -> Volume:
if volume_id in self.volumes:
volume = self.volumes[volume_id]
if volume.attachment:
@ -323,13 +355,17 @@ class EBSBackend:
raise InvalidVolumeIdError(volume_id)
def attach_volume(
self, volume_id, instance_id, device_path, delete_on_termination=False
):
self,
volume_id: str,
instance_id: str,
device_path: str,
delete_on_termination: bool = False,
) -> Optional[VolumeAttachment]:
volume = self.get_volume(volume_id)
instance = self.get_instance(instance_id)
instance = self.get_instance(instance_id) # type: ignore[attr-defined]
if not volume or not instance:
return False
return None
volume.attachment = VolumeAttachment(volume, instance, device_path, "attached")
# Modify instance to capture mount of block device.
@ -343,9 +379,11 @@ class EBSBackend:
instance.block_device_mapping[device_path] = bdt
return volume.attachment
def detach_volume(self, volume_id, instance_id, device_path):
def detach_volume(
self, volume_id: str, instance_id: str, device_path: str
) -> VolumeAttachment:
volume = self.get_volume(volume_id)
instance = self.get_instance(instance_id)
instance = self.get_instance(instance_id) # type: ignore[attr-defined]
old_attachment = volume.attachment
if not old_attachment:
@ -376,15 +414,17 @@ class EBSBackend:
params.append(owner_id)
if from_ami:
params.append(from_ami)
snapshot = Snapshot(*params)
snapshot = Snapshot(*params) # type: ignore[arg-type]
self.snapshots[snapshot_id] = snapshot
return snapshot
def create_snapshots(self, instance_spec, description, tags):
def create_snapshots(
self, instance_spec: Dict[str, Any], description: str, tags: Dict[str, str]
) -> List[Snapshot]:
"""
The CopyTagsFromSource-parameter is not yet implemented.
"""
instance = self.get_instance(instance_spec["InstanceId"])
instance = self.get_instance(instance_spec["InstanceId"]) # type: ignore[attr-defined]
block_device_mappings = instance.block_device_mapping
if str(instance_spec.get("ExcludeBootVolume", False)).lower() == "true":
@ -403,8 +443,10 @@ class EBSBackend:
snapshot.add_tags(tags)
return snapshots
def describe_snapshots(self, snapshot_ids=None, filters=None):
matches = self.snapshots.copy().values()
def describe_snapshots(
self, snapshot_ids: Optional[List[str]] = None, filters: Any = None
) -> List[Snapshot]:
matches = list(self.snapshots.values())
if snapshot_ids:
matches = [snap for snap in matches if snap.id in snapshot_ids]
if len(snapshot_ids) > len(matches):
@ -413,12 +455,15 @@ class EBSBackend:
matches = generic_filter(filters, matches)
return matches
def copy_snapshot(self, source_snapshot_id, source_region, description=None):
def copy_snapshot(
self, source_snapshot_id: str, source_region: str, description: str
) -> Snapshot:
from ..models import ec2_backends
source_snapshot = ec2_backends[self.account_id][
source_region
].describe_snapshots(snapshot_ids=[source_snapshot_id])[0]
backend = ec2_backends[self.account_id][source_region] # type: ignore[attr-defined]
source_snapshot = backend.describe_snapshots(snapshot_ids=[source_snapshot_id])[
0
]
snapshot_id = random_snapshot_id()
snapshot = Snapshot(
self,
@ -430,30 +475,32 @@ class EBSBackend:
self.snapshots[snapshot_id] = snapshot
return snapshot
def get_snapshot(self, snapshot_id):
def get_snapshot(self, snapshot_id: str) -> Snapshot:
snapshot = self.snapshots.get(snapshot_id, None)
if not snapshot:
raise InvalidSnapshotIdError()
return snapshot
def delete_snapshot(self, snapshot_id):
if snapshot_id in self.snapshots:
def delete_snapshot(self, snapshot_id: str) -> Snapshot:
if snapshot_id in self.snapshots: # type: ignore[attr-defined]
snapshot = self.snapshots[snapshot_id]
if snapshot.from_ami and snapshot.from_ami in self.amis:
if snapshot.from_ami and snapshot.from_ami in self.amis: # type: ignore[attr-defined]
raise InvalidSnapshotInUse(snapshot_id, snapshot.from_ami)
return self.snapshots.pop(snapshot_id)
raise InvalidSnapshotIdError()
def get_create_volume_permission_groups(self, snapshot_id):
snapshot = self.get_snapshot(snapshot_id)
def get_create_volume_permission_groups(self, snapshot_id: str) -> Set[str]:
snapshot = self.get_snapshot(snapshot_id) # type: ignore[attr-defined]
return snapshot.create_volume_permission_groups
def get_create_volume_permission_userids(self, snapshot_id):
snapshot = self.get_snapshot(snapshot_id)
def get_create_volume_permission_userids(self, snapshot_id: str) -> Set[str]:
snapshot = self.get_snapshot(snapshot_id) # type: ignore[attr-defined]
return snapshot.create_volume_permission_userids
def add_create_volume_permission(self, snapshot_id, user_ids=None, groups=None):
snapshot = self.get_snapshot(snapshot_id)
def add_create_volume_permission(
self, snapshot_id: str, user_ids: List[str], groups: List[str]
) -> None:
snapshot = self.get_snapshot(snapshot_id) # type: ignore[attr-defined]
if user_ids:
snapshot.create_volume_permission_userids.update(user_ids)
@ -462,27 +509,28 @@ class EBSBackend:
else:
snapshot.create_volume_permission_groups.update(groups)
return True
def remove_create_volume_permission(self, snapshot_id, user_ids=None, groups=None):
snapshot = self.get_snapshot(snapshot_id)
def remove_create_volume_permission(
self,
snapshot_id: str,
user_ids: Optional[List[str]] = None,
groups: Optional[Iterable[str]] = None,
) -> None:
snapshot = self.get_snapshot(snapshot_id) # type: ignore[attr-defined]
if user_ids:
snapshot.create_volume_permission_userids.difference_update(user_ids)
if groups and groups != ["all"]:
raise InvalidAMIAttributeItemValueError("UserGroup", groups)
else:
snapshot.create_volume_permission_groups.difference_update(groups)
snapshot.create_volume_permission_groups.difference_update(groups) # type: ignore[arg-type]
return True
def _get_default_encryption_key(self):
def _get_default_encryption_key(self) -> str:
# https://aws.amazon.com/kms/features/#AWS_Service_Integration
# An AWS managed CMK is created automatically when you first create
# an encrypted resource using an AWS service integrated with KMS.
from moto.kms import kms_backends
kms = kms_backends[self.account_id][self.region_name]
kms = kms_backends[self.account_id][self.region_name] # type: ignore[attr-defined]
ebs_alias = "alias/aws/ebs"
if not kms.alias_exists(ebs_alias):
key = kms.create_key(

View File

@ -1,3 +1,4 @@
from typing import Any, Dict, List, Optional
from moto.core import CloudFormationModel
from .core import TaggedEC2Resource
from ..exceptions import (
@ -16,7 +17,13 @@ from ..utils import (
class ElasticAddress(TaggedEC2Resource, CloudFormationModel):
def __init__(self, ec2_backend, domain, address=None, tags=None):
def __init__(
self,
ec2_backend: Any,
domain: str,
address: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
):
self.ec2_backend = ec2_backend
if address:
self.public_ip = address
@ -27,22 +34,27 @@ class ElasticAddress(TaggedEC2Resource, CloudFormationModel):
self.domain = domain
self.instance = None
self.eni = None
self.association_id = None
self.association_id: Optional[str] = None
self.add_tags(tags or {})
@staticmethod
def cloudformation_name_type():
return None
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-eip.html
return "AWS::EC2::EIP"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any
) -> "ElasticAddress":
from ..models import ec2_backends
ec2_backend = ec2_backends[account_id][region_name]
@ -64,21 +76,23 @@ class ElasticAddress(TaggedEC2Resource, CloudFormationModel):
return eip
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.public_ip
@classmethod
def has_cfn_attr(cls, attr):
def has_cfn_attr(cls, attr: str) -> bool:
return attr in ["AllocationId"]
def get_cfn_attribute(self, attribute_name):
def get_cfn_attribute(self, attribute_name: str) -> Any:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "AllocationId":
return self.allocation_id
raise UnformattedGetAttTemplateException()
def get_filter_value(self, filter_name):
def get_filter_value(
self, filter_name: str, method_name: Optional[str] = None
) -> Any:
if filter_name == "allocation-id":
return self.allocation_id
elif filter_name == "association-id":
@ -107,20 +121,27 @@ class ElasticAddress(TaggedEC2Resource, CloudFormationModel):
class ElasticAddressBackend:
def __init__(self):
self.addresses = []
def __init__(self) -> None:
self.addresses: List[ElasticAddress] = []
def allocate_address(self, domain, address=None, tags=None):
def allocate_address(
self,
domain: str,
address: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
) -> ElasticAddress:
if domain not in ["standard", "vpc"]:
domain = "vpc"
if address:
address = ElasticAddress(self, domain=domain, address=address, tags=tags)
ea = ElasticAddress(self, domain=domain, address=address, tags=tags)
else:
address = ElasticAddress(self, domain=domain, tags=tags)
self.addresses.append(address)
return address
ea = ElasticAddress(self, domain=domain, tags=tags)
self.addresses.append(ea)
return ea
def address_by_ip(self, ips, fail_if_not_found=True):
def address_by_ip(
self, ips: List[str], fail_if_not_found: bool = True
) -> List[ElasticAddress]:
eips = [
address for address in self.addresses.copy() if address.public_ip in ips
]
@ -131,7 +152,7 @@ class ElasticAddressBackend:
return eips
def address_by_allocation(self, allocation_ids):
def address_by_allocation(self, allocation_ids: List[str]) -> List[ElasticAddress]:
eips = [
address
for address in self.addresses
@ -144,7 +165,9 @@ class ElasticAddressBackend:
return eips
def address_by_association(self, association_ids):
def address_by_association(
self, association_ids: List[str]
) -> List[ElasticAddress]:
eips = [
address
for address in self.addresses
@ -159,12 +182,12 @@ class ElasticAddressBackend:
def associate_address(
self,
instance=None,
eni=None,
address=None,
allocation_id=None,
reassociate=False,
):
instance: Any = None,
eni: Any = None,
address: Optional[str] = None,
allocation_id: Optional[str] = None,
reassociate: bool = False,
) -> ElasticAddress:
eips = []
if address:
eips = self.address_by_ip([address])
@ -192,24 +215,31 @@ class ElasticAddressBackend:
raise ResourceAlreadyAssociatedError(eip.public_ip)
def describe_addresses(self, allocation_ids=None, public_ips=None, filters=None):
def describe_addresses(
self,
allocation_ids: Optional[List[str]] = None,
public_ips: Optional[List[str]] = None,
filters: Any = None,
) -> List[ElasticAddress]:
matches = self.addresses.copy()
if allocation_ids:
matches = [addr for addr in matches if addr.allocation_id in allocation_ids]
if len(allocation_ids) > len(matches):
unknown_ids = set(allocation_ids) - set(matches)
unknown_ids = set(allocation_ids) - set(matches) # type: ignore[arg-type]
raise InvalidAllocationIdError(unknown_ids)
if public_ips:
matches = [addr for addr in matches if addr.public_ip in public_ips]
if len(public_ips) > len(matches):
unknown_ips = set(public_ips) - set(matches)
unknown_ips = set(public_ips) - set(matches) # type: ignore[arg-type]
raise InvalidAddressError(unknown_ips)
if filters:
matches = generic_filter(filters, matches)
return matches
def disassociate_address(self, address=None, association_id=None):
def disassociate_address(
self, address: Optional[str] = None, association_id: Optional[str] = None
) -> None:
eips = []
if address:
eips = self.address_by_ip([address])
@ -225,9 +255,10 @@ class ElasticAddressBackend:
eip.instance = None
eip.association_id = None
return True
def release_address(self, address=None, allocation_id=None):
def release_address(
self, address: Optional[str] = None, allocation_id: Optional[str] = None
) -> None:
eips = []
if address:
eips = self.address_by_ip([address])
@ -238,4 +269,3 @@ class ElasticAddressBackend:
self.disassociate_address(address=eip.public_ip)
eip.allocation_id = None
self.addresses.remove(eip)
return True

View File

@ -1,3 +1,4 @@
from typing import Any, Dict, Optional, List, Union
from moto.core import CloudFormationModel
from ..exceptions import InvalidNetworkAttachmentIdError, InvalidNetworkInterfaceIdError
from .core import TaggedEC2Resource
@ -15,16 +16,16 @@ from ..utils import (
class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
def __init__(
self,
ec2_backend,
subnet,
private_ip_address,
private_ip_addresses=None,
device_index=0,
public_ip_auto_assign=False,
group_ids=None,
description=None,
tags=None,
**kwargs,
ec2_backend: Any,
subnet: Any,
private_ip_address: Union[List[str], str],
private_ip_addresses: Optional[List[Dict[str, Any]]] = None,
device_index: int = 0,
public_ip_auto_assign: bool = False,
group_ids: Optional[List[str]] = None,
description: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
**kwargs: Any,
):
self.ec2_backend = ec2_backend
self.id = random_eni_id()
@ -32,7 +33,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
if isinstance(private_ip_address, list) and private_ip_address:
private_ip_address = private_ip_address[0]
self.private_ip_address = private_ip_address or None
self.private_ip_addresses = private_ip_addresses or []
self.private_ip_addresses: List[Dict[str, Any]] = private_ip_addresses or []
self.ipv6_addresses = kwargs.get("ipv6_addresses") or []
self.subnet = subnet
@ -45,7 +46,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
self.description = description
self.source_dest_check = True
self.public_ip = None
self.public_ip: Optional[str] = None
self.public_ip_auto_assign = public_ip_auto_assign
self.start()
self.add_tags(tags or {})
@ -60,7 +61,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
association = list(self.subnet.ipv6_cidr_block_associations.values())[0]
subnet_ipv6_cidr_block = association.get("ipv6CidrBlock")
if kwargs.get("ipv6_address_count"):
while len(self.ipv6_addresses) < kwargs.get("ipv6_address_count"):
while len(self.ipv6_addresses) < kwargs["ipv6_address_count"]:
ip = random_private_ip(subnet_ipv6_cidr_block, ipv6=True)
if ip not in self.ipv6_addresses:
self.ipv6_addresses.append(ip)
@ -80,9 +81,9 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
if not self.private_ip_address:
if self.private_ip_addresses:
for ip in self.private_ip_addresses:
if isinstance(ip, dict) and ip.get("Primary"):
self.private_ip_address = ip.get("PrivateIpAddress")
for private_ip in self.private_ip_addresses:
if isinstance(private_ip, dict) and private_ip.get("Primary"):
self.private_ip_address = private_ip.get("PrivateIpAddress")
break
if not self.private_ip_addresses:
self.private_ip_address = random_private_ip(self.subnet.cidr_block)
@ -133,12 +134,12 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
self._group_set.append(group)
@property
def owner_id(self):
def owner_id(self) -> str:
return self.ec2_backend.account_id
@property
def association(self):
association = {}
def association(self) -> Dict[str, Any]: # type: ignore[misc]
association: Dict[str, Any] = {}
if self.public_ip:
eips = self.ec2_backend.address_by_ip(
[self.public_ip], fail_if_not_found=False
@ -150,18 +151,23 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
return association
@staticmethod
def cloudformation_name_type():
return None
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-networkinterface.html
return "AWS::EC2::NetworkInterface"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "NetworkInterface":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
@ -186,14 +192,14 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
)
return network_interface
def stop(self):
def stop(self) -> None:
if self.public_ip_auto_assign:
self.public_ip = None
def start(self):
def start(self) -> None:
self.check_auto_public_ip()
def check_auto_public_ip(self):
def check_auto_public_ip(self) -> None:
if (
self.public_ip_auto_assign
and str(self.public_ip_auto_assign).lower() == "true"
@ -201,17 +207,17 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
self.public_ip = random_public_ip()
@property
def group_set(self):
def group_set(self) -> Any: # type: ignore[misc]
if self.instance and self.instance.security_groups:
return set(self._group_set) | set(self.instance.security_groups)
else:
return self._group_set
@classmethod
def has_cfn_attr(cls, attr):
def has_cfn_attr(cls, attr: str) -> bool:
return attr in ["PrimaryPrivateIpAddress", "SecondaryPrivateIpAddresses"]
def get_cfn_attribute(self, attribute_name):
def get_cfn_attribute(self, attribute_name: str) -> Any:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "PrimaryPrivateIpAddress":
@ -223,10 +229,12 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
raise UnformattedGetAttTemplateException()
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.id
def get_filter_value(self, filter_name):
def get_filter_value(
self, filter_name: str, method_name: Optional[str] = None
) -> Any:
if filter_name == "network-interface-id":
return self.id
elif filter_name in ("addresses.private-ip-address", "private-ip-address"):
@ -242,7 +250,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
elif filter_name == "description":
return self.description
elif filter_name == "attachment.instance-id":
return self.instance.id if self.instance else None
return self.instance.id if self.instance else None # type: ignore[attr-defined]
elif filter_name == "attachment.instance-owner-id":
return self.owner_id
else:
@ -250,19 +258,19 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
class NetworkInterfaceBackend:
def __init__(self):
self.enis = {}
def __init__(self) -> None:
self.enis: Dict[str, NetworkInterface] = {}
def create_network_interface(
self,
subnet,
private_ip_address,
private_ip_addresses=None,
group_ids=None,
description=None,
tags=None,
**kwargs,
):
subnet: Any,
private_ip_address: Union[str, List[str]],
private_ip_addresses: Optional[List[Dict[str, Any]]] = None,
group_ids: Optional[List[str]] = None,
description: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> NetworkInterface:
eni = NetworkInterface(
self,
subnet,
@ -276,23 +284,24 @@ class NetworkInterfaceBackend:
self.enis[eni.id] = eni
return eni
def get_network_interface(self, eni_id):
def get_network_interface(self, eni_id: str) -> NetworkInterface:
for eni in self.enis.values():
if eni_id == eni.id:
return eni
raise InvalidNetworkInterfaceIdError(eni_id)
def delete_network_interface(self, eni_id):
def delete_network_interface(self, eni_id: str) -> None:
deleted = self.enis.pop(eni_id, None)
if not deleted:
raise InvalidNetworkInterfaceIdError(eni_id)
return deleted
def describe_network_interfaces(self, filters=None):
def describe_network_interfaces(
self, filters: Any = None
) -> List[NetworkInterface]:
# Note: This is only used in EC2Backend#do_resources_exist
# Client-calls use #get_all_network_interfaces()
# We should probably merge these at some point..
enis = self.enis.values()
enis = list(self.enis.values())
if filters:
for (_filter, _filter_value) in filters.items():
@ -302,33 +311,34 @@ class NetworkInterfaceBackend:
eni for eni in enis if getattr(eni, _filter) in _filter_value
]
else:
self.raise_not_implemented_error(
self.raise_not_implemented_error( # type: ignore
f"The filter '{_filter}' for DescribeNetworkInterfaces"
)
return enis
def attach_network_interface(self, eni_id, instance_id, device_index):
def attach_network_interface(
self, eni_id: str, instance_id: str, device_index: int
) -> str:
eni = self.get_network_interface(eni_id)
instance = self.get_instance(instance_id)
instance = self.get_instance(instance_id) # type: ignore[attr-defined]
return instance.attach_eni(eni, device_index)
def detach_network_interface(self, attachment_id):
found_eni = None
def detach_network_interface(self, attachment_id: str) -> None:
for eni in self.enis.values():
if eni.attachment_id == attachment_id:
found_eni = eni
break
else:
eni.instance.detach_eni(eni) # type: ignore[attr-defined]
return
raise InvalidNetworkAttachmentIdError(attachment_id)
found_eni.instance.detach_eni(found_eni)
def modify_network_interface_attribute(
self, eni_id, group_ids, source_dest_check=None, description=None
):
self,
eni_id: str,
group_ids: List[str],
source_dest_check: Optional[bool] = None,
description: Optional[str] = None,
) -> None:
eni = self.get_network_interface(eni_id)
groups = [self.get_security_group_from_id(group_id) for group_id in group_ids]
groups = [self.get_security_group_from_id(group_id) for group_id in group_ids] # type: ignore[attr-defined]
if groups:
eni._group_set = groups
if source_dest_check in [True, False]:
@ -337,8 +347,10 @@ class NetworkInterfaceBackend:
if description:
eni.description = description
def get_all_network_interfaces(self, eni_ids=None, filters=None):
enis = self.enis.copy().values()
def get_all_network_interfaces(
self, eni_ids: Optional[List[str]] = None, filters: Any = None
) -> List[NetworkInterface]:
enis = list(self.enis.values())
if eni_ids:
enis = [eni for eni in enis if eni.id in eni_ids]
@ -350,7 +362,9 @@ class NetworkInterfaceBackend:
return generic_filter(filters, enis)
def unassign_private_ip_addresses(self, eni_id=None, private_ip_address=None):
def unassign_private_ip_addresses(
self, eni_id: str, private_ip_address: Optional[List[str]] = None
) -> NetworkInterface:
eni = self.get_network_interface(eni_id)
if private_ip_address:
for item in eni.private_ip_addresses.copy():
@ -358,7 +372,9 @@ class NetworkInterfaceBackend:
eni.private_ip_addresses.remove(item)
return eni
def assign_private_ip_addresses(self, eni_id=None, secondary_ips_count=None):
def assign_private_ip_addresses(
self, eni_id: str, secondary_ips_count: Optional[int] = None
) -> NetworkInterface:
eni = self.get_network_interface(eni_id)
eni_assigned_ips = [
item.get("PrivateIpAddress") for item in eni.private_ip_addresses
@ -372,7 +388,12 @@ class NetworkInterfaceBackend:
secondary_ips_count -= 1
return eni
def assign_ipv6_addresses(self, eni_id=None, ipv6_addresses=None, ipv6_count=None):
def assign_ipv6_addresses(
self,
eni_id: str,
ipv6_addresses: Optional[List[str]] = None,
ipv6_count: Optional[int] = None,
) -> NetworkInterface:
eni = self.get_network_interface(eni_id)
if ipv6_addresses:
eni.ipv6_addresses.extend(ipv6_addresses)
@ -386,10 +407,12 @@ class NetworkInterfaceBackend:
ipv6_count -= 1
return eni
def unassign_ipv6_addresses(self, eni_id=None, ips=None):
def unassign_ipv6_addresses(
self, eni_id: str, ips: Optional[List[str]] = None
) -> NetworkInterface:
eni = self.get_network_interface(eni_id)
if ips:
for ip in eni.ipv6_addresses.copy():
if ip in ips:
eni.ipv6_addresses.remove(ip)
return eni, ips
return eni

View File

@ -2,7 +2,7 @@ import copy
import itertools
import json
from collections import defaultdict
from typing import Optional
from typing import Any, Dict, Optional
from moto.core import CloudFormationModel
from moto.core.utils import aws_api_matches
from ..exceptions import (
@ -110,13 +110,13 @@ class SecurityRule(object):
class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
def __init__(
self,
ec2_backend,
group_id,
name,
description,
vpc_id=None,
tags=None,
is_default=None,
ec2_backend: Any,
group_id: str,
name: str,
description: str,
vpc_id: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
is_default: Optional[bool] = None,
):
self.ec2_backend = ec2_backend
self.id = group_id

View File

@ -39,9 +39,9 @@ class DHCPOptions(EC2BaseResponse):
def delete_dhcp_options(self):
dhcp_opt_id = self._get_param("DhcpOptionsId")
delete_status = self.ec2_backend.delete_dhcp_options_set(dhcp_opt_id)
self.ec2_backend.delete_dhcp_options_set(dhcp_opt_id)
template = self.response_template(DELETE_DHCP_OPTIONS_RESPONSE)
return template.render(delete_status=delete_status)
return template.render(delete_status="true")
def describe_dhcp_options(self):
dhcp_opt_ids = self._get_multi_param("DhcpOptionsId")

View File

@ -133,9 +133,9 @@ class ElasticNetworkInterfaces(EC2BaseResponse):
def unassign_ipv6_addresses(self):
eni_id = self._get_param("NetworkInterfaceId")
ips = self._get_multi_param("Ipv6Addresses")
eni, unassigned_ips = self.ec2_backend.unassign_ipv6_addresses(eni_id, ips)
eni = self.ec2_backend.unassign_ipv6_addresses(eni_id, ips)
template = self.response_template(UNASSIGN_IPV6_ADDRESSES)
return template.render(eni=eni, unassigned_ips=unassigned_ips)
return template.render(eni=eni, unassigned_ips=ips)
ASSIGN_PRIVATE_IP_ADDRESSES = """<AssignPrivateIpAddressesResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">

View File

@ -100,7 +100,7 @@ def random_flow_log_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["flow-logs"])
def random_snapshot_id():
def random_snapshot_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["snapshot"])
@ -146,7 +146,7 @@ def random_customer_gateway_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["customer-gateway"])
def random_volume_id():
def random_volume_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["volume"])
@ -170,7 +170,7 @@ def random_vpc_peering_connection_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["vpc-peering-connection"])
def random_eip_association_id():
def random_eip_association_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["vpc-elastic-ip-association"])
@ -188,23 +188,23 @@ def random_route_table_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["route-table"])
def random_eip_allocation_id():
def random_eip_allocation_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["vpc-elastic-ip"])
def random_dhcp_option_id():
def random_dhcp_option_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["dhcp-options"])
def random_eni_id():
def random_eni_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["network-interface"])
def random_eni_attach_id():
def random_eni_attach_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["network-interface-attachment"])
def random_nat_gateway_id():
def random_nat_gateway_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["nat-gateway"], size=17)
@ -236,7 +236,7 @@ def random_carrier_gateway_id() -> str:
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["carrier-gateway"], size=17)
def random_public_ip():
def random_public_ip() -> str:
return f"54.214.{random.choice(range(255))}.{random.choice(range(255))}"
@ -244,7 +244,7 @@ def random_dedicated_host_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["dedicated_host"], size=17)
def random_private_ip(cidr=None, ipv6=False):
def random_private_ip(cidr: str = None, ipv6: bool = False) -> str:
# prefix - ula.prefixlen : get number of remaing length for the IP.
# prefix will be 32 for IPv4 and 128 for IPv6.
# random.getrandbits() will generate remaining bits for IPv6 or Ipv4 in decimal format
@ -259,16 +259,16 @@ def random_private_ip(cidr=None, ipv6=False):
return f"10.{random.choice(range(255))}.{random.choice(range(255))}.{random.choice(range(255))}"
def random_ip():
def random_ip() -> str:
return f"127.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
def generate_dns_from_ip(ip, dns_type="internal"):
def generate_dns_from_ip(ip: Any, dns_type: str = "internal") -> str:
splits = ip.split("/")[0].split(".") if "/" in ip else ip.split(".")
return f"ip-{splits[0]}-{splits[1]}-{splits[2]}-{splits[3]}.ec2.{dns_type}"
def random_mac_address():
def random_mac_address() -> str:
return f"02:00:00:{random.randint(0, 255)}02x:{random.randint(0, 255)}02x:{random.randint(0, 255)}02x"

View File

@ -38,7 +38,7 @@ class BlockDeviceType(object):
status: Optional[str] = None,
attach_time: Optional[str] = None,
delete_on_termination: bool = False,
size: Optional[str] = None,
size: Optional[int] = None,
volume_type: Optional[str] = None,
iops: Optional[str] = None,
encrypted: Optional[str] = None,

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/moto_api
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/moto_api
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract