Techdebt: MyPy EC2 (n-models) (#5907)
This commit is contained in:
parent
bd96e42915
commit
8cc9518662
@ -126,7 +126,7 @@ class FlowLogAlreadyExists(EC2ClientError):
|
||||
|
||||
|
||||
class InvalidNetworkAclIdError(EC2ClientError):
|
||||
def __init__(self, network_acl_id):
|
||||
def __init__(self, network_acl_id: str):
|
||||
super().__init__(
|
||||
"InvalidNetworkAclID.NotFound",
|
||||
f"The network acl ID '{network_acl_id}' does not exist",
|
||||
@ -212,7 +212,7 @@ class InvalidPermissionDuplicateError(EC2ClientError):
|
||||
|
||||
|
||||
class InvalidRouteTableIdError(EC2ClientError):
|
||||
def __init__(self, route_table_id):
|
||||
def __init__(self, route_table_id: str):
|
||||
super().__init__(
|
||||
"InvalidRouteTableID.NotFound",
|
||||
f"The routeTable ID '{route_table_id}' does not exist",
|
||||
@ -572,7 +572,7 @@ class AvailabilityZoneNotFromRegionError(EC2ClientError):
|
||||
|
||||
|
||||
class NetworkAclEntryAlreadyExistsError(EC2ClientError):
|
||||
def __init__(self, rule_number):
|
||||
def __init__(self, rule_number: str):
|
||||
super().__init__(
|
||||
"NetworkAclEntryAlreadyExists",
|
||||
f"The network acl entry identified by {rule_number} already exists.",
|
||||
|
@ -205,7 +205,7 @@ class EC2Backend(
|
||||
elif resource_prefix == EC2_RESOURCE_TO_PREFIX["launch-template"]:
|
||||
self.get_launch_template(resource_id)
|
||||
elif resource_prefix == EC2_RESOURCE_TO_PREFIX["network-acl"]:
|
||||
self.get_all_network_acls()
|
||||
self.describe_network_acls()
|
||||
elif resource_prefix == EC2_RESOURCE_TO_PREFIX["network-interface"]:
|
||||
self.describe_network_interfaces(
|
||||
filters={"network-interface-id": resource_id}
|
||||
|
@ -1,4 +1,5 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from moto.core import CloudFormationModel
|
||||
from moto.core.utils import iso_8601_datetime_with_milliseconds
|
||||
@ -9,17 +10,16 @@ from ..utils import random_nat_gateway_id, random_private_ip
|
||||
class NatGateway(CloudFormationModel, TaggedEC2Resource):
|
||||
def __init__(
|
||||
self,
|
||||
backend,
|
||||
subnet_id,
|
||||
allocation_id,
|
||||
tags=None,
|
||||
connectivity_type="public",
|
||||
address_set=None,
|
||||
backend: Any,
|
||||
subnet_id: str,
|
||||
allocation_id: str,
|
||||
tags: Optional[Dict[str, str]] = None,
|
||||
connectivity_type: str = "public",
|
||||
):
|
||||
# public properties
|
||||
self.id = random_nat_gateway_id()
|
||||
self.subnet_id = subnet_id
|
||||
self.address_set = address_set or []
|
||||
self.address_set: List[Dict[str, Any]] = []
|
||||
self.state = "available"
|
||||
self.private_ip = random_private_ip()
|
||||
self.connectivity_type = connectivity_type
|
||||
@ -41,36 +41,31 @@ class NatGateway(CloudFormationModel, TaggedEC2Resource):
|
||||
self.vpc_id = self.ec2_backend.get_subnet(subnet_id).vpc_id
|
||||
|
||||
@property
|
||||
def physical_resource_id(self):
|
||||
def physical_resource_id(self) -> str:
|
||||
return self.id
|
||||
|
||||
@property
|
||||
def create_time(self):
|
||||
def create_time(self) -> str:
|
||||
return iso_8601_datetime_with_milliseconds(self._created_at)
|
||||
|
||||
@property
|
||||
def network_interface_id(self):
|
||||
return self._eni.id
|
||||
|
||||
@property
|
||||
def public_ip(self):
|
||||
if self.allocation_id:
|
||||
eips = self._backend.address_by_allocation([self.allocation_id])
|
||||
return eips[0].public_ip if self.allocation_id else None
|
||||
@staticmethod
|
||||
def cloudformation_name_type() -> str:
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def cloudformation_name_type():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def cloudformation_type():
|
||||
def cloudformation_type() -> str:
|
||||
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-natgateway.html
|
||||
return "AWS::EC2::NatGateway"
|
||||
|
||||
@classmethod
|
||||
def create_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
|
||||
):
|
||||
def create_from_cloudformation_json( # type: ignore[misc]
|
||||
cls,
|
||||
resource_name: str,
|
||||
cloudformation_json: Any,
|
||||
account_id: str,
|
||||
region_name: str,
|
||||
**kwargs: Any
|
||||
) -> "NatGateway":
|
||||
from ..models import ec2_backends
|
||||
|
||||
ec2_backend = ec2_backends[account_id][region_name]
|
||||
@ -82,10 +77,12 @@ class NatGateway(CloudFormationModel, TaggedEC2Resource):
|
||||
|
||||
|
||||
class NatGatewayBackend:
|
||||
def __init__(self):
|
||||
self.nat_gateways = {}
|
||||
def __init__(self) -> None:
|
||||
self.nat_gateways: Dict[str, NatGateway] = {}
|
||||
|
||||
def describe_nat_gateways(self, filters, nat_gateway_ids):
|
||||
def describe_nat_gateways(
|
||||
self, filters: Any, nat_gateway_ids: Optional[List[str]]
|
||||
) -> List[NatGateway]:
|
||||
nat_gateways = list(self.nat_gateways.values())
|
||||
|
||||
if nat_gateway_ids:
|
||||
@ -120,14 +117,18 @@ class NatGatewayBackend:
|
||||
return nat_gateways
|
||||
|
||||
def create_nat_gateway(
|
||||
self, subnet_id, allocation_id, tags=None, connectivity_type="public"
|
||||
):
|
||||
self,
|
||||
subnet_id: str,
|
||||
allocation_id: str,
|
||||
tags: Optional[Dict[str, str]] = None,
|
||||
connectivity_type: str = "public",
|
||||
) -> NatGateway:
|
||||
nat_gateway = NatGateway(
|
||||
self, subnet_id, allocation_id, tags, connectivity_type
|
||||
)
|
||||
address_set = {}
|
||||
address_set: Dict[str, Any] = {}
|
||||
if allocation_id:
|
||||
eips = self.address_by_allocation([allocation_id])
|
||||
eips = self.address_by_allocation([allocation_id]) # type: ignore[attr-defined]
|
||||
eip = eips[0] if len(eips) > 0 else None
|
||||
if eip:
|
||||
address_set["allocationId"] = allocation_id
|
||||
@ -138,7 +139,7 @@ class NatGatewayBackend:
|
||||
self.nat_gateways[nat_gateway.id] = nat_gateway
|
||||
return nat_gateway
|
||||
|
||||
def delete_nat_gateway(self, nat_gateway_id):
|
||||
nat_gw = self.nat_gateways.get(nat_gateway_id)
|
||||
def delete_nat_gateway(self, nat_gateway_id: str) -> NatGateway:
|
||||
nat_gw: NatGateway = self.nat_gateways.get(nat_gateway_id) # type: ignore
|
||||
nat_gw.state = "deleted"
|
||||
return nat_gw
|
||||
|
@ -1,3 +1,5 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from ..exceptions import (
|
||||
InvalidNetworkAclIdError,
|
||||
InvalidRouteTableIdError,
|
||||
@ -12,27 +14,32 @@ from ..utils import (
|
||||
|
||||
|
||||
class NetworkAclBackend:
|
||||
def __init__(self):
|
||||
self.network_acls = {}
|
||||
def __init__(self) -> None:
|
||||
self.network_acls: Dict[str, "NetworkAcl"] = {}
|
||||
|
||||
def get_network_acl(self, network_acl_id):
|
||||
def get_network_acl(self, network_acl_id: str) -> "NetworkAcl":
|
||||
network_acl = self.network_acls.get(network_acl_id, None)
|
||||
if not network_acl:
|
||||
raise InvalidNetworkAclIdError(network_acl_id)
|
||||
return network_acl
|
||||
|
||||
def create_network_acl(self, vpc_id, tags=None, default=False):
|
||||
def create_network_acl(
|
||||
self,
|
||||
vpc_id: str,
|
||||
tags: Optional[List[Dict[str, str]]] = None,
|
||||
default: bool = False,
|
||||
) -> "NetworkAcl":
|
||||
network_acl_id = random_network_acl_id()
|
||||
self.get_vpc(vpc_id)
|
||||
self.get_vpc(vpc_id) # type: ignore[attr-defined]
|
||||
network_acl = NetworkAcl(self, network_acl_id, vpc_id, default)
|
||||
for tag in tags or []:
|
||||
network_acl.add_tag(tag.get("Key"), tag.get("Value"))
|
||||
network_acl.add_tag(tag["Key"], tag["Value"])
|
||||
self.network_acls[network_acl_id] = network_acl
|
||||
if default:
|
||||
self.add_default_entries(network_acl_id)
|
||||
return network_acl
|
||||
|
||||
def add_default_entries(self, network_acl_id):
|
||||
def add_default_entries(self, network_acl_id: str) -> None:
|
||||
default_acl_entries = [
|
||||
{"rule_number": "100", "rule_action": "allow", "egress": "true"},
|
||||
{"rule_number": "32767", "rule_action": "deny", "egress": "true"},
|
||||
@ -53,10 +60,7 @@ class NetworkAclBackend:
|
||||
port_range_to=None,
|
||||
)
|
||||
|
||||
def get_all_network_acls(self, network_acl_ids=None, filters=None):
|
||||
self.describe_network_acls(network_acl_ids, filters)
|
||||
|
||||
def delete_network_acl(self, network_acl_id):
|
||||
def delete_network_acl(self, network_acl_id: str) -> "NetworkAcl":
|
||||
deleted = self.network_acls.pop(network_acl_id, None)
|
||||
if not deleted:
|
||||
raise InvalidNetworkAclIdError(network_acl_id)
|
||||
@ -64,17 +68,17 @@ class NetworkAclBackend:
|
||||
|
||||
def create_network_acl_entry(
|
||||
self,
|
||||
network_acl_id,
|
||||
rule_number,
|
||||
protocol,
|
||||
rule_action,
|
||||
egress,
|
||||
cidr_block,
|
||||
icmp_code,
|
||||
icmp_type,
|
||||
port_range_from,
|
||||
port_range_to,
|
||||
):
|
||||
network_acl_id: str,
|
||||
rule_number: str,
|
||||
protocol: str,
|
||||
rule_action: str,
|
||||
egress: str,
|
||||
cidr_block: str,
|
||||
icmp_code: Optional[int],
|
||||
icmp_type: Optional[int],
|
||||
port_range_from: Optional[int],
|
||||
port_range_to: Optional[int],
|
||||
) -> "NetworkAclEntry":
|
||||
|
||||
network_acl = self.get_network_acl(network_acl_id)
|
||||
if any(
|
||||
@ -99,7 +103,9 @@ class NetworkAclBackend:
|
||||
network_acl.network_acl_entries.append(network_acl_entry)
|
||||
return network_acl_entry
|
||||
|
||||
def delete_network_acl_entry(self, network_acl_id, rule_number, egress):
|
||||
def delete_network_acl_entry(
|
||||
self, network_acl_id: str, rule_number: str, egress: str
|
||||
) -> "NetworkAclEntry":
|
||||
network_acl = self.get_network_acl(network_acl_id)
|
||||
entry = next(
|
||||
entry
|
||||
@ -112,17 +118,17 @@ class NetworkAclBackend:
|
||||
|
||||
def replace_network_acl_entry(
|
||||
self,
|
||||
network_acl_id,
|
||||
rule_number,
|
||||
protocol,
|
||||
rule_action,
|
||||
egress,
|
||||
cidr_block,
|
||||
icmp_code,
|
||||
icmp_type,
|
||||
port_range_from,
|
||||
port_range_to,
|
||||
):
|
||||
network_acl_id: str,
|
||||
rule_number: str,
|
||||
protocol: str,
|
||||
rule_action: str,
|
||||
egress: str,
|
||||
cidr_block: str,
|
||||
icmp_code: int,
|
||||
icmp_type: int,
|
||||
port_range_from: int,
|
||||
port_range_to: int,
|
||||
) -> "NetworkAclEntry":
|
||||
|
||||
self.delete_network_acl_entry(network_acl_id, rule_number, egress)
|
||||
network_acl_entry = self.create_network_acl_entry(
|
||||
@ -139,7 +145,9 @@ class NetworkAclBackend:
|
||||
)
|
||||
return network_acl_entry
|
||||
|
||||
def replace_network_acl_association(self, association_id, network_acl_id):
|
||||
def replace_network_acl_association(
|
||||
self, association_id: str, network_acl_id: str
|
||||
) -> "NetworkAclAssociation":
|
||||
|
||||
# lookup existing association for subnet and delete it
|
||||
default_acl = next(
|
||||
@ -163,7 +171,9 @@ class NetworkAclBackend:
|
||||
new_acl.associations[new_assoc_id] = association
|
||||
return association
|
||||
|
||||
def associate_default_network_acl_with_subnet(self, subnet_id, vpc_id):
|
||||
def associate_default_network_acl_with_subnet(
|
||||
self, subnet_id: str, vpc_id: str
|
||||
) -> None:
|
||||
association_id = random_network_acl_subnet_association_id()
|
||||
acl = next(
|
||||
acl
|
||||
@ -174,8 +184,10 @@ class NetworkAclBackend:
|
||||
self, association_id, subnet_id, acl.id
|
||||
)
|
||||
|
||||
def describe_network_acls(self, network_acl_ids=None, filters=None):
|
||||
network_acls = self.network_acls.copy().values()
|
||||
def describe_network_acls(
|
||||
self, network_acl_ids: Optional[List[str]] = None, filters: Any = None
|
||||
) -> List["NetworkAcl"]:
|
||||
network_acls = list(self.network_acls.values())
|
||||
|
||||
if network_acl_ids:
|
||||
network_acls = [
|
||||
@ -194,29 +206,41 @@ class NetworkAclBackend:
|
||||
return generic_filter(filters, network_acls)
|
||||
|
||||
|
||||
class NetworkAclAssociation(object):
|
||||
def __init__(self, ec2_backend, new_association_id, subnet_id, network_acl_id):
|
||||
class NetworkAclAssociation:
|
||||
def __init__(
|
||||
self,
|
||||
ec2_backend: Any,
|
||||
new_association_id: str,
|
||||
subnet_id: Optional[str],
|
||||
network_acl_id: str,
|
||||
):
|
||||
self.ec2_backend = ec2_backend
|
||||
self.id = new_association_id
|
||||
self.new_association_id = new_association_id
|
||||
self.subnet_id = subnet_id
|
||||
self.network_acl_id = network_acl_id
|
||||
super().__init__()
|
||||
|
||||
|
||||
class NetworkAcl(TaggedEC2Resource):
|
||||
def __init__(
|
||||
self, ec2_backend, network_acl_id, vpc_id, default=False, owner_id=None
|
||||
self,
|
||||
ec2_backend: Any,
|
||||
network_acl_id: str,
|
||||
vpc_id: str,
|
||||
default: bool = False,
|
||||
owner_id: Optional[str] = None,
|
||||
):
|
||||
self.ec2_backend = ec2_backend
|
||||
self.id = network_acl_id
|
||||
self.vpc_id = vpc_id
|
||||
self.owner_id = owner_id or ec2_backend.account_id
|
||||
self.network_acl_entries = []
|
||||
self.associations = {}
|
||||
self.network_acl_entries: List[NetworkAclEntry] = []
|
||||
self.associations: Dict[str, NetworkAclAssociation] = {}
|
||||
self.default = "true" if default is True else "false"
|
||||
|
||||
def get_filter_value(self, filter_name):
|
||||
def get_filter_value(
|
||||
self, filter_name: str, method_name: Optional[str] = None
|
||||
) -> Any:
|
||||
if filter_name == "default":
|
||||
return self.default
|
||||
elif filter_name == "vpc-id":
|
||||
@ -244,17 +268,17 @@ class NetworkAcl(TaggedEC2Resource):
|
||||
class NetworkAclEntry(TaggedEC2Resource):
|
||||
def __init__(
|
||||
self,
|
||||
ec2_backend,
|
||||
network_acl_id,
|
||||
rule_number,
|
||||
protocol,
|
||||
rule_action,
|
||||
egress,
|
||||
cidr_block,
|
||||
icmp_code,
|
||||
icmp_type,
|
||||
port_range_from,
|
||||
port_range_to,
|
||||
ec2_backend: Any,
|
||||
network_acl_id: str,
|
||||
rule_number: str,
|
||||
protocol: str,
|
||||
rule_action: str,
|
||||
egress: str,
|
||||
cidr_block: str,
|
||||
icmp_code: Optional[int],
|
||||
icmp_type: Optional[int],
|
||||
port_range_from: Optional[int],
|
||||
port_range_to: Optional[int],
|
||||
):
|
||||
self.ec2_backend = ec2_backend
|
||||
self.network_acl_id = network_acl_id
|
||||
|
@ -126,11 +126,11 @@ def random_subnet_association_id():
|
||||
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["route-table-association"])
|
||||
|
||||
|
||||
def random_network_acl_id():
|
||||
def random_network_acl_id() -> str:
|
||||
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["network-acl"])
|
||||
|
||||
|
||||
def random_network_acl_subnet_association_id():
|
||||
def random_network_acl_subnet_association_id() -> str:
|
||||
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["network-acl-subnet-assoc"])
|
||||
|
||||
|
||||
|
@ -229,7 +229,7 @@ disable = W,C,R,E
|
||||
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
|
||||
|
||||
[mypy]
|
||||
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/ec2/models/i*,moto/ec2/models/k*,moto/ec2/models/l*,moto/ec2/models/m*,moto/moto_api
|
||||
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2/models/a*,moto/ec2/models/c*,moto/ec2/models/d*,moto/ec2/models/e*,moto/ec2/models/f*,moto/ec2/models/h*,moto/ec2/models/i*,moto/ec2/models/k*,moto/ec2/models/l*,moto/ec2/models/m*,moto/ec2/models/n*,moto/moto_api
|
||||
show_column_numbers=True
|
||||
show_error_codes = True
|
||||
disable_error_code=abstract
|
||||
|
Loading…
Reference in New Issue
Block a user