moto/moto/ec2/models/subnets.py
2023-02-12 16:53:36 -01:00

460 lines
16 KiB
Python

import ipaddress
import itertools
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Tuple, TYPE_CHECKING, Set
from moto.core import CloudFormationModel
if TYPE_CHECKING:
from moto.ec2.models.instances import Instance
from moto.ec2.models.availability_zones_and_regions import Zone
from ..exceptions import (
GenericInvalidParameterValueError,
InvalidAvailabilityZoneError,
InvalidCIDRBlockParameterError,
InvalidParameterValueError,
InvalidSubnetConflictError,
InvalidSubnetIdError,
InvalidSubnetRangeError,
InvalidSubnetCidrBlockAssociationID,
)
from .core import TaggedEC2Resource
from .availability_zones_and_regions import RegionsAndZonesBackend
from ..utils import (
random_subnet_id,
generic_filter,
random_subnet_ipv6_cidr_block_association_id,
)
class Subnet(TaggedEC2Resource, CloudFormationModel):
    """Model of a single EC2 subnet.

    Besides the descriptive attributes (CIDR, zone, flags, IPv6 associations)
    the object keeps the bookkeeping for private IPv4 addresses handed out to
    instances: a lazy host generator, a map of addresses in use and a pool of
    released addresses that can be reused.
    """

    def __init__(
        self,
        ec2_backend: Any,
        subnet_id: str,
        vpc_id: str,
        cidr_block: str,
        ipv6_cidr_block: Optional[str],
        availability_zone: Zone,
        default_for_az: str,
        map_public_ip_on_launch: str,
        assign_ipv6_address_on_creation: bool = False,
    ):
        """Create the subnet and reserve its first host addresses.

        :param ec2_backend: backend that owns this subnet (used for the
            account id and for looking up network interfaces)
        :param subnet_id: identifier of the subnet
        :param vpc_id: identifier of the VPC the subnet belongs to
        :param cidr_block: IPv4 CIDR of the subnet; host bits are tolerated
            because the network is parsed with ``strict=False``
        :param ipv6_cidr_block: optional IPv6 CIDR, associated immediately
        :param availability_zone: zone object providing ``name``/``zone_id``
        :param default_for_az: flag stored as given (a string)
        :param map_public_ip_on_launch: flag stored as given (a string)
        :param assign_ipv6_address_on_creation: flag stored as given
        """
        self.ec2_backend = ec2_backend
        self.id = subnet_id
        self.vpc_id = vpc_id
        self.cidr_block = cidr_block
        self.cidr = ipaddress.IPv4Network(str(self.cidr_block), strict=False)
        # AWS keeps five addresses of every subnet for itself.
        self._available_ip_addresses = self.cidr.num_addresses - 5
        self._availability_zone = availability_zone
        self.default_for_az = default_for_az
        self.map_public_ip_on_launch = map_public_ip_on_launch
        self.assign_ipv6_address_on_creation = assign_ipv6_address_on_creation
        self.ipv6_cidr_block_associations: Dict[str, Dict[str, Any]] = {}
        if ipv6_cidr_block:
            self.attach_ipv6_cidr_block_associations(ipv6_cidr_block)

        # Theory is we assign ip's as we go (as 16,777,214 usable IPs in a /8)
        self._subnet_ip_generator = self.cidr.hosts()
        # The first three host addresses are consumed here and never handed out.
        self.reserved_ips = [
            next(self._subnet_ip_generator) for _ in range(0, 3)
        ]  # Reserved by AWS
        # if instance is destroyed hold IP here for reuse
        self._unused_ips: Set[str] = set()
        # Addresses currently in use, keyed by their string form.
        self._subnet_ips: Dict[str, "Instance"] = {}
        self.state = "available"

        # Placeholder for response templates until Ipv6 support implemented.
        self.ipv6_native = False

    @property
    def owner_id(self) -> str:
        """Account id of the backend that owns this subnet."""
        return self.ec2_backend.account_id

    @staticmethod
    def cloudformation_name_type() -> str:
        """Return the CloudFormation name type (empty for subnets)."""
        return ""

    @staticmethod
    def cloudformation_type() -> str:
        """Return the CloudFormation resource type of this model."""
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnet.html
        return "AWS::EC2::Subnet"

    @classmethod
    def create_from_cloudformation_json(  # type: ignore[misc]
        cls,
        resource_name: str,
        cloudformation_json: Any,
        account_id: str,
        region_name: str,
        **kwargs: Any,
    ) -> "Subnet":
        """Create a subnet from a CloudFormation resource definition.

        Reads ``VpcId``, ``CidrBlock``, the optional ``AvailabilityZone`` and
        the optional ``Tags`` from the resource's ``Properties``.
        """
        # Imported lazily, as in the rest of this module's CloudFormation hooks.
        from ..models import ec2_backends

        properties = cloudformation_json["Properties"]

        vpc_id = properties["VpcId"]
        cidr_block = properties["CidrBlock"]
        availability_zone = properties.get("AvailabilityZone")
        ec2_backend = ec2_backends[account_id][region_name]
        subnet = ec2_backend.create_subnet(
            vpc_id=vpc_id, cidr_block=cidr_block, availability_zone=availability_zone
        )
        for tag in properties.get("Tags", []):
            subnet.add_tag(tag["Key"], tag["Value"])

        return subnet

    @property
    def available_ip_addresses(self) -> str:
        """Number of still-free addresses, as a string.

        Computed as the subnet capacity minus every private address held by a
        network interface that lives in this subnet.
        """
        addresses_taken = 0
        for eni in self.ec2_backend.get_all_network_interfaces():
            if eni.subnet.id == self.id and eni.private_ip_addresses:
                addresses_taken += len(eni.private_ip_addresses)
        return str(self._available_ip_addresses - addresses_taken)

    @property
    def availability_zone(self) -> str:
        """Name of the availability zone the subnet lives in."""
        return self._availability_zone.name

    @property
    def availability_zone_id(self) -> str:
        """Zone id of the availability zone the subnet lives in."""
        return self._availability_zone.zone_id

    @property
    def physical_resource_id(self) -> str:
        """CloudFormation physical id, which is the subnet id."""
        return self.id

    def get_filter_value(
        self, filter_name: str, method_name: Optional[str] = None
    ) -> Any:
        """
        API Version 2014-10-01 defines the following filters for DescribeSubnets:

        * availabilityZone
        * available-ip-address-count
        * cidrBlock
        * defaultForAz
        * state
        * subnet-id
        * tag:key=value
        * tag-key
        * tag-value
        * vpc-id

        Taken from: http://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html

        Filters not handled here are delegated to the parent class.
        """
        if filter_name in ("cidr", "cidrBlock", "cidr-block"):
            return self.cidr_block
        elif filter_name in ("vpc-id", "vpcId"):
            return self.vpc_id
        elif filter_name == "subnet-id":
            return self.id
        elif filter_name in ("availabilityZone", "availability-zone"):
            return self.availability_zone
        elif filter_name in ("defaultForAz", "default-for-az"):
            return self.default_for_az
        elif filter_name == "state":
            return self.state
        else:
            return super().get_filter_value(filter_name, "DescribeSubnets")

    @classmethod
    def has_cfn_attr(cls, attr: str) -> bool:
        """Tell whether ``attr`` is a known ``Fn::GetAtt`` attribute name."""
        return attr in ["AvailabilityZone"]

    def get_cfn_attribute(self, attribute_name: str) -> None:
        """Resolve a ``Fn::GetAtt`` attribute; this method always raises.

        :raises NotImplementedError: for ``AvailabilityZone``
        :raises UnformattedGetAttTemplateException: for any other attribute
        """
        from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

        if attribute_name == "AvailabilityZone":
            raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "AvailabilityZone" ]"')
        raise UnformattedGetAttTemplateException()

    def get_available_subnet_ip(self, instance: "Instance") -> str:
        """Hand out a free private address and record ``instance`` as its user.

        Released addresses are reused first; otherwise the next address from
        the host generator that has not been claimed manually is taken.

        :raises StopIteration: when the generator has no address left
        """
        try:
            new_ip = str(self._unused_ips.pop())
        except KeyError:
            new_ip_v4 = next(self._subnet_ip_generator)

            # Skips any IP's if they've been manually specified
            while str(new_ip_v4) in self._subnet_ips:
                new_ip_v4 = next(self._subnet_ip_generator)

            if new_ip_v4 == self.cidr.broadcast_address:
                raise StopIteration()  # Broadcast address cant be used obviously
            new_ip = str(new_ip_v4)
        # TODO StopIteration will be raised if no ip's available, not sure how aws handles this.

        self._subnet_ips[new_ip] = instance

        return new_ip

    def request_ip(self, ip: str, instance: "Instance") -> None:
        """Claim a specific address for ``instance``.

        :raises Exception: if ``ip`` is outside the subnet CIDR or already used
        """
        if ipaddress.ip_address(ip) not in self.cidr:
            raise Exception(f"IP does not fall in the subnet CIDR of {self.cidr}")

        if ip in self._subnet_ips:
            raise Exception("IP already in use")

        # The address may sit in the reuse pool; it must not be handed out twice.
        self._unused_ips.discard(ip)
        self._subnet_ips[ip] = instance

    def del_subnet_ip(self, ip: str) -> None:
        """Release ``ip`` into the reuse pool; unknown addresses are ignored."""
        try:
            del self._subnet_ips[ip]
        except KeyError:
            return  # Unknown IP
        self._unused_ips.add(ip)

    def attach_ipv6_cidr_block_associations(
        self, ipv6_cidr_block: str
    ) -> Dict[str, str]:
        """Register ``ipv6_cidr_block`` under a new association id.

        :return: the association record that was stored
        """
        association = {
            "associationId": random_subnet_ipv6_cidr_block_association_id(),
            "ipv6CidrBlock": ipv6_cidr_block,
            "ipv6CidrBlockState": "associated",
        }
        self.ipv6_cidr_block_associations[association["associationId"]] = association
        return association

    def detach_subnet_cidr_block(self, association_id: str) -> Dict[str, Any]:
        """Mark an IPv6 association as disassociated and return its record.

        The record stays in ``ipv6_cidr_block_associations``; only its state
        changes.

        :raises InvalidSubnetCidrBlockAssociationID: if the id is not known
            to this subnet
        """
        association = self.ipv6_cidr_block_associations.get(association_id)
        if association is None:
            raise InvalidSubnetCidrBlockAssociationID(association_id)
        association["ipv6CidrBlockState"] = "disassociated"
        return association
class SubnetRouteTableAssociation(CloudFormationModel):
    """Pairing of a route table with a subnet, creatable via CloudFormation."""

    def __init__(self, route_table_id: str, subnet_id: str):
        """Remember which route table is attached to which subnet."""
        self.route_table_id, self.subnet_id = route_table_id, subnet_id

    @property
    def physical_resource_id(self) -> str:
        """CloudFormation physical id; the route table id is used for it."""
        return self.route_table_id

    @staticmethod
    def cloudformation_name_type() -> str:
        """Return the CloudFormation name type (empty for this resource)."""
        return ""

    @staticmethod
    def cloudformation_type() -> str:
        """Return the CloudFormation resource type of this model."""
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnetroutetableassociation.html
        return "AWS::EC2::SubnetRouteTableAssociation"

    @classmethod
    def create_from_cloudformation_json(  # type: ignore[misc]
        cls,
        resource_name: str,
        cloudformation_json: Any,
        account_id: str,
        region_name: str,
        **kwargs: Any,
    ) -> "SubnetRouteTableAssociation":
        """Create the association described by a CloudFormation resource.

        ``RouteTableId`` and ``SubnetId`` are read from the resource's
        ``Properties`` and passed to the backend of the given account/region.
        """
        from ..models import ec2_backends

        props = cloudformation_json["Properties"]
        # Both ids are read before the backend lookup, as in the original flow.
        association_args = {
            "route_table_id": props["RouteTableId"],
            "subnet_id": props["SubnetId"],
        }
        backend = ec2_backends[account_id][region_name]
        return backend.create_subnet_association(**association_args)
class SubnetBackend:
    """Backend mixin that stores subnets and subnet/route-table associations.

    The class relies on ``get_vpc`` and
    ``associate_default_network_acl_with_subnet`` being provided by the
    object it is mixed into.
    """

    def __init__(self) -> None:
        # maps availability zone to dict of (subnet_id, subnet)
        self.subnets: Dict[str, Dict[str, Subnet]] = defaultdict(dict)
        # keyed by "<route_table_id>:<subnet_id>"
        self.subnet_associations: Dict[str, SubnetRouteTableAssociation] = {}

    def get_subnet(self, subnet_id: str) -> Subnet:
        """Return the subnet with the given id.

        :raises InvalidSubnetIdError: if no zone holds such a subnet
        """
        for subnets_per_zone in self.subnets.values():
            if subnet_id in subnets_per_zone:
                return subnets_per_zone[subnet_id]
        raise InvalidSubnetIdError(subnet_id)

    def get_default_subnet(self, availability_zone: str) -> Subnet:
        """Return the default subnet of ``availability_zone``.

        ``default_for_az`` is stored as the string "true"/"false", so it is
        compared explicitly instead of being tested for truthiness. When no
        subnet in the zone is flagged as default, the first subnet of the zone
        is returned, which is what callers received before this check existed.

        :raises IndexError: if the zone has no subnet at all
        """
        candidates = list(
            self.describe_subnets(filters={"availabilityZone": availability_zone})
        )
        for subnet in candidates:
            if str(subnet.default_for_az).lower() == "true":
                return subnet
        return candidates[0]

    def create_subnet(
        self,
        vpc_id: str,
        cidr_block: str,
        ipv6_cidr_block: Optional[str] = None,
        availability_zone: Optional[str] = None,
        availability_zone_id: Optional[str] = None,
        tags: Optional[List[Dict[str, str]]] = None,
    ) -> Subnet:
        """Validate the request, create the subnet and register it.

        :param vpc_id: VPC that will contain the subnet
        :param cidr_block: IPv4 CIDR; must lie inside one of the VPC's CIDR
            blocks and must not overlap an existing subnet of that VPC
        :param ipv6_cidr_block: optional IPv6 CIDR; must contain "::/64"
        :param availability_zone: zone name; "us-east-1a" is used when neither
            a name nor a zone id is given
        :param availability_zone_id: zone id, used when no name is given
        :param tags: optional list of ``{"Key": ..., "Value": ...}`` dicts
        :raises InvalidCIDRBlockParameterError: ``cidr_block`` cannot be parsed
        :raises InvalidSubnetRangeError: the CIDR is outside the VPC's ranges
        :raises GenericInvalidParameterValueError: bad ``ipv6_cidr_block``
        :raises InvalidSubnetConflictError: the CIDR overlaps another subnet
        :raises InvalidAvailabilityZoneError: the zone cannot be resolved
        """
        subnet_id = random_subnet_id()
        # Validate VPC exists and the supplied CIDR block is a subnet of the VPC's
        vpc = self.get_vpc(vpc_id)  # type: ignore[attr-defined]
        vpc_cidr_blocks = [
            ipaddress.IPv4Network(
                str(cidr_block_association["cidr_block"]), strict=False
            )
            for cidr_block_association in vpc.get_cidr_block_association_set()
        ]
        try:
            subnet_cidr_block = ipaddress.IPv4Network(str(cidr_block), strict=False)
        except ValueError:
            raise InvalidCIDRBlockParameterError(cidr_block)

        subnet_in_vpc_cidr_range = any(
            vpc_cidr_block.network_address <= subnet_cidr_block.network_address
            and vpc_cidr_block.broadcast_address >= subnet_cidr_block.broadcast_address
            for vpc_cidr_block in vpc_cidr_blocks
        )
        if not subnet_in_vpc_cidr_range:
            raise InvalidSubnetRangeError(cidr_block)

        # The subnet size must use a /64 prefix length.
        if ipv6_cidr_block and "::/64" not in ipv6_cidr_block:
            raise GenericInvalidParameterValueError("ipv6-cidr-block", ipv6_cidr_block)

        for subnet in self.describe_subnets(filters={"vpc-id": vpc_id}):
            if subnet.cidr.overlaps(subnet_cidr_block):
                raise InvalidSubnetConflictError(cidr_block)

        # if this is the first subnet for an availability zone,
        # consider it the default
        # NOTE(review): the check uses the zone name exactly as supplied by the
        # caller, so a request without a zone name is always flagged as default.
        # Kept as is because existing callers may depend on the resulting flags.
        default_for_az = str(availability_zone not in self.subnets).lower()
        map_public_ip_on_launch = default_for_az

        if availability_zone is None and not availability_zone_id:
            availability_zone = "us-east-1a"

        all_zones = [
            zone for zones in RegionsAndZonesBackend.zones.values() for zone in zones
        ]
        if availability_zone:
            candidates = (zone for zone in all_zones if zone.name == availability_zone)
        else:
            candidates = (
                zone for zone in all_zones if zone.zone_id == availability_zone_id
            )
        availability_zone_data = next(candidates, None)
        if availability_zone_data is None:
            # Report whichever identifier the caller actually supplied.
            raise InvalidAvailabilityZoneError(
                availability_zone or availability_zone_id,
                ", ".join(zone.name for zone in all_zones),
            )

        subnet = Subnet(
            self,
            subnet_id,
            vpc_id,
            cidr_block,
            ipv6_cidr_block,
            availability_zone_data,
            default_for_az,
            map_public_ip_on_launch,
            assign_ipv6_address_on_creation=False,
        )

        for tag in tags or []:
            subnet.add_tag(tag["Key"], tag["Value"])

        # AWS associates a new subnet with the default Network ACL
        self.associate_default_network_acl_with_subnet(subnet_id, vpc_id)  # type: ignore[attr-defined]
        # Key by the resolved zone name so that subnets created through a zone
        # id land in the same bucket as those created through the zone name.
        self.subnets[availability_zone_data.name][subnet_id] = subnet
        return subnet

    def describe_subnets(
        self, subnet_ids: Optional[List[str]] = None, filters: Optional[Any] = None
    ) -> Iterable[Subnet]:
        """Return the subnets selected by ``subnet_ids`` and ``filters``.

        Both arguments are optional; without them every subnet is returned.
        Repeating an id in ``subnet_ids`` is harmless.

        :raises InvalidSubnetIdError: for the first requested id that does not
            belong to any subnet
        """
        # Extract a list of all subnets (iterating over copies of the dicts)
        matches = list(
            itertools.chain.from_iterable(
                [per_zone.copy().values() for per_zone in self.subnets.copy().values()]
            )
        )
        if subnet_ids:
            matches = [sn for sn in matches if sn.id in subnet_ids]
            # Compare ids with ids: the difference must be taken against the
            # ids of the matched subnets, not against the subnet objects.
            found_ids = {sn.id for sn in matches}
            unknown_ids = [sid for sid in subnet_ids if sid not in found_ids]
            if unknown_ids:
                raise InvalidSubnetIdError(unknown_ids[0])
        if filters:
            matches = generic_filter(filters, matches)

        return matches

    def delete_subnet(self, subnet_id: str) -> Subnet:
        """Remove the subnet with the given id and return it.

        :raises InvalidSubnetIdError: if no zone holds such a subnet
        """
        for subnets in self.subnets.values():
            if subnet_id in subnets:
                return subnets.pop(subnet_id)
        raise InvalidSubnetIdError(subnet_id)

    def modify_subnet_attribute(
        self, subnet_id: str, attr_name: str, attr_value: str
    ) -> None:
        """Set one of the modifiable subnet attributes.

        Only ``map_public_ip_on_launch`` and
        ``assign_ipv6_address_on_creation`` can be changed.

        :raises InvalidSubnetIdError: if the subnet does not exist
        :raises InvalidParameterValueError: for any other attribute name
        """
        subnet = self.get_subnet(subnet_id)
        if attr_name in ("map_public_ip_on_launch", "assign_ipv6_address_on_creation"):
            setattr(subnet, attr_name, attr_value)
        else:
            raise InvalidParameterValueError(attr_name)

    def get_subnet_from_ipv6_association(self, association_id: str) -> Optional[Subnet]:
        """Return the subnet holding ``association_id``, or ``None``."""
        return next(
            (
                subnet
                for subnet in self.describe_subnets()
                if association_id in subnet.ipv6_cidr_block_associations
            ),
            None,
        )

    def associate_subnet_cidr_block(
        self, subnet_id: str, ipv6_cidr_block: str
    ) -> Dict[str, str]:
        """Associate an IPv6 CIDR block with a subnet.

        :return: the association record created by the subnet
        :raises InvalidSubnetIdError: if the subnet does not exist
        """
        # get_subnet raises for unknown ids, so no further check is needed.
        subnet = self.get_subnet(subnet_id)
        return subnet.attach_ipv6_cidr_block_associations(ipv6_cidr_block)

    def disassociate_subnet_cidr_block(
        self, association_id: str
    ) -> Tuple[str, Dict[str, str]]:
        """Disassociate an IPv6 CIDR block from whichever subnet holds it.

        :return: tuple of the subnet id and the updated association record
        :raises InvalidSubnetCidrBlockAssociationID: if no subnet holds the id
        """
        subnet = self.get_subnet_from_ipv6_association(association_id)
        if not subnet:
            raise InvalidSubnetCidrBlockAssociationID(association_id)
        association = subnet.detach_subnet_cidr_block(association_id)
        return subnet.id, association

    def create_subnet_association(
        self, route_table_id: str, subnet_id: str
    ) -> SubnetRouteTableAssociation:
        """Create and store the association of a route table with a subnet."""
        subnet_association = SubnetRouteTableAssociation(route_table_id, subnet_id)
        self.subnet_associations[f"{route_table_id}:{subnet_id}"] = subnet_association
        return subnet_association