diff --git a/moto/ec2/models.py b/moto/ec2/models.py
index 852175eaf..f80a02e03 100644
--- a/moto/ec2/models.py
+++ b/moto/ec2/models.py
@@ -135,6 +135,7 @@ from .utils import (
random_internet_gateway_id,
random_egress_only_internet_gateway_id,
random_ip,
+ generate_dns_from_ip,
random_mac_address,
random_ipv6_cidr,
random_transit_gateway_attachment_id,
@@ -287,13 +288,20 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
group_ids=None,
description=None,
tags=None,
+ **kwargs,
):
self.ec2_backend = ec2_backend
self.id = random_eni_id()
self.device_index = device_index
+ if isinstance(private_ip_address, list) and private_ip_address:
+ private_ip_address = private_ip_address[0]
self.private_ip_address = private_ip_address or None
self.private_ip_addresses = private_ip_addresses or []
+ self.ipv6_addresses = kwargs.get("ipv6_addresses") or []
+
self.subnet = subnet
+ if isinstance(subnet, str):
+ self.subnet = self.ec2_backend.get_subnet(subnet)
self.instance = None
self.attachment_id = None
self.description = description
@@ -311,26 +319,63 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
# returns groups for both self and the attached instance.
self._group_set = []
+ if self.subnet.ipv6_cidr_block_associations:
+ association = list(self.subnet.ipv6_cidr_block_associations.values())[0]
+ subnet_ipv6_cidr_block = association.get("ipv6CidrBlock")
+ if kwargs.get("ipv6_address_count"):
+ while len(self.ipv6_addresses) < kwargs.get("ipv6_address_count"):
+ ip = random_private_ip(subnet_ipv6_cidr_block, ipv6=True)
+ if ip not in self.ipv6_addresses:
+ self.ipv6_addresses.append(ip)
+
+ if self.private_ip_addresses:
+ primary_selected = True if private_ip_address else False
+ for item in self.private_ip_addresses.copy():
+ if isinstance(item, str):
+ self.private_ip_addresses.remove(item)
+ self.private_ip_addresses.append(
+ {
+ "Primary": True if not primary_selected else False,
+ "PrivateIpAddress": item,
+ }
+ )
+ primary_selected = True
+
if not self.private_ip_address:
if self.private_ip_addresses:
for ip in self.private_ip_addresses:
- if isinstance(ip, list) and ip.get("Primary", False) in [
- "true",
- True,
- "True",
- ]:
+ if isinstance(ip, dict) and ip.get("Primary"):
self.private_ip_address = ip.get("PrivateIpAddress")
- if isinstance(ip, str):
- self.private_ip_address = self.private_ip_addresses[0]
break
- else:
- self.private_ip_address = random_private_ip()
+ if not self.private_ip_addresses:
+ self.private_ip_address = random_private_ip(self.subnet.cidr_block)
- if not self.private_ip_addresses and self.private_ip_address:
+ if not self.private_ip_addresses:
self.private_ip_addresses.append(
{"Primary": True, "PrivateIpAddress": self.private_ip_address}
)
+ secondary_ips = kwargs.get("secondary_ips_count", None)
+ if secondary_ips:
+ ips = [
+ random_private_ip(self.subnet.cidr_block)
+ for index in range(0, int(secondary_ips))
+ ]
+ if ips:
+ self.private_ip_addresses.extend(
+ [{"Primary": False, "PrivateIpAddress": ip} for ip in ips]
+ )
+
+ if self.subnet:
+ vpc = self.ec2_backend.get_vpc(self.subnet.vpc_id)
+ if vpc and vpc.enable_dns_hostnames:
+ self.private_dns_name = generate_dns_from_ip(
+ self.private_ip_address, type="internal"
+ )
+ for address in self.private_ip_addresses:
+ if address.get("Primary", None):
+ address["PrivateDnsName"] = self.private_dns_name
+
group = None
if group_ids:
for group_id in group_ids:
@@ -347,6 +392,10 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
self.ec2_backend.groups[subnet.vpc_id][group_id] = group
if group:
self._group_set.append(group)
+ if not group_ids:
+ group = self.ec2_backend.get_default_security_group(vpc.id)
+ if group:
+ self._group_set.append(group)
@property
def owner_id(self):
@@ -536,14 +585,18 @@ class NetworkInterfaceBackend(object):
found_eni.instance.detach_eni(found_eni)
def modify_network_interface_attribute(
- self, eni_id, group_ids, source_dest_check=None
+ self, eni_id, group_ids, source_dest_check=None, description=None
):
eni = self.get_network_interface(eni_id)
groups = [self.get_security_group_from_id(group_id) for group_id in group_ids]
- eni._group_set = groups
- if source_dest_check:
+ if groups:
+ eni._group_set = groups
+ if source_dest_check in [True, False]:
eni.source_dest_check = source_dest_check
+ if description:
+ eni.description = description
+
def get_all_network_interfaces(self, eni_ids=None, filters=None):
enis = self.enis.copy().values()
@@ -557,6 +610,50 @@ class NetworkInterfaceBackend(object):
return generic_filter(filters, enis)
+ def unassign_private_ip_addresses(self, eni_id=None, private_ip_address=None):
+ eni = self.get_network_interface(eni_id)
+ if private_ip_address:
+ for item in eni.private_ip_addresses.copy():
+ if item.get("PrivateIpAddress") in private_ip_address:
+ eni.private_ip_addresses.remove(item)
+ return eni
+
+ def assign_private_ip_addresses(self, eni_id=None, secondary_ips_count=None):
+ eni = self.get_network_interface(eni_id)
+ eni_assigned_ips = [
+ item.get("PrivateIpAddress") for item in eni.private_ip_addresses
+ ]
+ while secondary_ips_count:
+ ip = random_private_ip(eni.subnet.cidr_block)
+ if ip not in eni_assigned_ips:
+ eni.private_ip_addresses.append(
+ {"Primary": False, "PrivateIpAddress": ip}
+ )
+ secondary_ips_count -= 1
+ return eni
+
+ def assign_ipv6_addresses(self, eni_id=None, ipv6_addresses=None, ipv6_count=None):
+ eni = self.get_network_interface(eni_id)
+ if ipv6_addresses:
+ eni.ipv6_addresses.extend(ipv6_addresses)
+
+ while ipv6_count:
+ association = list(eni.subnet.ipv6_cidr_block_associations.values())[0]
+ subnet_ipv6_cidr_block = association.get("ipv6CidrBlock")
+ ip = random_private_ip(subnet_ipv6_cidr_block, ipv6=True)
+ if ip not in eni.ipv6_addresses:
+ eni.ipv6_addresses.append(ip)
+ ipv6_count -= 1
+ return eni
+
+ def unassign_ipv6_addresses(self, eni_id=None, ips=None):
+ eni = self.get_network_interface(eni_id)
+ if ips:
+ for ip in eni.ipv6_addresses.copy():
+ if ip in ips:
+ eni.ipv6_addresses.remove(ip)
+ return eni, ips
+
class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
VALID_ATTRIBUTES = {
@@ -666,6 +763,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
kwargs.get("nics", {}),
private_ip=kwargs.get("private_ip"),
associate_public_ip=self.associate_public_ip,
+ security_groups=self.security_groups,
)
@property
@@ -901,16 +999,11 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
@property
def dynamic_group_list(self):
- if self.nics:
- groups = []
- for nic in self.nics.values():
- for group in nic.group_set:
- groups.append(group)
- return groups
- else:
- return self.security_groups
+ return self.security_groups
- def prep_nics(self, nic_spec, private_ip=None, associate_public_ip=None):
+ def prep_nics(
+ self, nic_spec, private_ip=None, associate_public_ip=None, security_groups=None
+ ):
self.nics = {}
if self.subnet_id:
@@ -969,6 +1062,8 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
group_id = nic.get("SecurityGroupId")
group_ids = [group_id] if group_id else []
+ if security_groups:
+ group_ids.extend([group.id for group in security_groups])
use_nic = self.ec2_backend.create_network_interface(
subnet,
@@ -1056,10 +1151,13 @@ class InstanceBackend(object):
security_groups = [
self.get_security_group_by_name_or_id(name) for name in security_group_names
]
- security_groups.extend(
- self.get_security_group_from_id(sg_id)
- for sg_id in kwargs.pop("security_group_ids", [])
- )
+
+ for sg_id in kwargs.pop("security_group_ids", []):
+ if isinstance(sg_id, str):
+ security_groups.append(self.get_security_group_from_id(sg_id))
+ else:
+ security_groups.append(sg_id)
+
self.reservations[new_reservation.id] = new_reservation
tags = kwargs.pop("tags", {})
@@ -2636,6 +2734,11 @@ class SecurityGroupBackend(object):
group = self.get_security_group_from_name(group_name_or_id, vpc_id)
return group
+ def get_default_security_group(self, vpc_id=None):
+ for group_id, group in self.groups[vpc_id or self.default_vpc.id].items():
+ if group.is_default:
+ return group
+
def authorize_security_group_ingress(
self,
group_name_or_id,
diff --git a/moto/ec2/responses/elastic_network_interfaces.py b/moto/ec2/responses/elastic_network_interfaces.py
index 0f1c61aa1..7f88b534b 100644
--- a/moto/ec2/responses/elastic_network_interfaces.py
+++ b/moto/ec2/responses/elastic_network_interfaces.py
@@ -1,5 +1,9 @@
from moto.core.responses import BaseResponse
-from moto.ec2.utils import filters_from_querystring, add_tag_specification
+from moto.ec2.utils import (
+ filters_from_querystring,
+ get_attribute_value,
+ add_tag_specification,
+)
class ElasticNetworkInterfaces(BaseResponse):
@@ -7,6 +11,9 @@ class ElasticNetworkInterfaces(BaseResponse):
subnet_id = self._get_param("SubnetId")
private_ip_address = self._get_param("PrivateIpAddress")
private_ip_addresses = self._get_multi_param("PrivateIpAddresses")
+ ipv6_addresses = self._get_multi_param("Ipv6Addresses")
+ ipv6_address_count = self._get_int_param("Ipv6AddressCount", 0)
+ secondary_ips_count = self._get_param("SecondaryPrivateIpAddressCount")
groups = self._get_multi_param("SecurityGroupId")
subnet = self.ec2_backend.get_subnet(subnet_id)
description = self._get_param("Description")
@@ -21,6 +28,9 @@ class ElasticNetworkInterfaces(BaseResponse):
groups,
description,
tags,
+ secondary_ips_count=secondary_ips_count,
+ ipv6_addresses=ipv6_addresses,
+ ipv6_address_count=ipv6_address_count,
)
template = self.response_template(CREATE_NETWORK_INTERFACE_RESPONSE)
return template.render(eni=eni)
@@ -65,10 +75,11 @@ class ElasticNetworkInterfaces(BaseResponse):
def modify_network_interface_attribute(self):
eni_id = self._get_param("NetworkInterfaceId")
group_ids = self._get_multi_param("SecurityGroupId")
- source_dest_check = self._get_param("SourceDestCheck")
+ source_dest_check = get_attribute_value("SourceDestCheck", self.querystring)
+ description = get_attribute_value("Description", self.querystring)
if self.is_not_dryrun("ModifyNetworkInterface"):
self.ec2_backend.modify_network_interface_attribute(
- eni_id, group_ids, source_dest_check
+ eni_id, group_ids, source_dest_check, description
)
return MODIFY_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE
@@ -78,13 +89,92 @@ class ElasticNetworkInterfaces(BaseResponse):
"ElasticNetworkInterfaces(AmazonVPC).reset_network_interface_attribute is not yet implemented"
)
+ def assign_private_ip_addresses(self):
+ eni_id = self._get_param("NetworkInterfaceId")
+ secondary_ips_count = self._get_int_param("SecondaryPrivateIpAddressCount", 0)
+ eni = self.ec2_backend.assign_private_ip_addresses(eni_id, secondary_ips_count)
+ template = self.response_template(ASSIGN_PRIVATE_IP_ADDRESSES)
+ return template.render(eni=eni)
+
+ def unassign_private_ip_addresses(self):
+ eni_id = self._get_param("NetworkInterfaceId")
+ private_ip_address = self._get_multi_param("PrivateIpAddress")
+ eni = self.ec2_backend.unassign_private_ip_addresses(eni_id, private_ip_address)
+ template = self.response_template(UNASSIGN_PRIVATE_IP_ADDRESSES)
+ return template.render(eni=eni)
+
+ def assign_ipv6_addresses(self):
+ eni_id = self._get_param("NetworkInterfaceId")
+ ipv6_count = self._get_int_param("Ipv6AddressCount", 0)
+ ipv6_addresses = self._get_multi_param("Ipv6Addresses")
+ eni = self.ec2_backend.assign_ipv6_addresses(
+ eni_id, ipv6_addresses, ipv6_count,
+ )
+ template = self.response_template(ASSIGN_IPV6_ADDRESSES)
+ return template.render(eni=eni)
+
+ def unassign_ipv6_addresses(self):
+ eni_id = self._get_param("NetworkInterfaceId")
+ ips = self._get_multi_param("Ipv6Addresses")
+ eni, unassigned_ips = self.ec2_backend.unassign_ipv6_addresses(eni_id, ips)
+ template = self.response_template(UNASSIGN_IPV6_ADDRESSES)
+ return template.render(eni=eni, unassigned_ips=unassigned_ips)
+
+
+ASSIGN_PRIVATE_IP_ADDRESSES = """
+ 3fb591ba-558c-48f8-ae6b-c2f9d6d06425
+ {{ eni.id }}
+
+ {% for address in eni.private_ip_addresses %}
+ -
+ {{ address.PrivateIpAddress }}
+
+ {% endfor %}
+
+ true
+"""
+
+
+UNASSIGN_PRIVATE_IP_ADDRESSES = """
+ 3fb591ba-558c-48f8-ae6b-c2f9d6d06425
+ {{ eni.id }}
+
+ {% for address in eni.private_ip_addresses %}
+ -
+ {{ address.PrivateIpAddress }}
+
+ {% endfor %}
+
+ true
+"""
+
+
+ASSIGN_IPV6_ADDRESSES = """
+ c36d17eb-a0ba-4d38-8727-example
+ {{ eni.id }}
+
+ {% for address in eni.ipv6_addresses %}
+ - {{address}}
+ {% endfor %}
+
+
+"""
+
+UNASSIGN_IPV6_ADDRESSES = """
+ 94d446d7-fc8e-4918-94f9-example
+ {{ eni.id }}
+
+ {% for address in unassigned_ips %}
+ - {{address}}
+ {% endfor %}
+
+"""
+
CREATE_NETWORK_INTERFACE_RESPONSE = """
2c6021ec-d705-445a-9780-420d0c7ab793
-
-
{{ eni.id }}
{{ eni.subnet.id }}
{{ eni.subnet.vpc_id }}
@@ -100,7 +190,10 @@ CREATE_NETWORK_INTERFACE_RESPONSE = """
{% if eni.private_ip_address %}
{{ eni.private_ip_address }}
{% endif %}
- {{ eni.source_dest_check }}
+ {% if eni.private_dns_name %}
+ {{ eni.private_dns_name }}
+ {% endif %}
+ {{ "true" if eni.source_dest_check == True else "false" }}
{% for group in eni.group_set %}
-
@@ -131,13 +224,19 @@ CREATE_NETWORK_INTERFACE_RESPONSE = """
-
{{ address.PrivateIpAddress }}
{% if address.privateDnsName %}
- {{ address.privateDnsName }}
+ {{ address.PrivateDnsName }}
{% endif %}
- {{ address.Primary }}
+ {{ "true" if address.Primary == True else "false" }}
{% endfor %}
-
+
+ {% for address in eni.ipv6_addresses %}
+ -
+ {{address}}
+
+ {% endfor %}
+
{{ eni.interface_type }}
@@ -148,60 +247,69 @@ DESCRIBE_NETWORK_INTERFACES_RESPONSE = """
{% for eni in enis %}
-
- {{ eni.id }}
- {{ eni.subnet.id }}
- {{ eni.subnet.vpc_id }}
- {{ eni.subnet.availability_zone }}
- {% if eni.description %}
- {{ eni.description }}
- {% endif %}
- {{ eni.owner_id }}
- AIDARCSPW2WNREUEN7XFM
- False
- {{ eni.status }}
- {{ eni.mac_address }}
- {% if eni.private_ip_address %}
+ {{ eni.id }}
+ {{ eni.subnet.id }}
+ {{ eni.subnet.vpc_id }}
+ {{ eni.subnet.availability_zone }}
+ {% if eni.description %}
+ {{ eni.description }}
+ {% endif %}
+ {{ eni.owner_id }}
+ AIDARCSPW2WNREUEN7XFM
+ False
+ {{ eni.status }}
+ {{ eni.mac_address }}
+ {% if eni.private_ip_address %}
{{ eni.private_ip_address }}
- {% endif %}
- {{ eni.source_dest_check }}
-
- {% for group in eni.group_set %}
-
-
- {{ group.id }}
- {{ group.name }}
-
- {% endfor %}
-
- {% if eni.association %}
-
- {{ eni.public_ip }}
- {{ eni.owner_id }}
- {{ eni.association.allocationId }}
- {{ eni.association.associationId }}
- true
-
- {% endif %}
-
+ {% endif %}
+ {% if eni.private_dns_name %}
+ {{ eni.private_dns_name }}
+ {% endif %}
+ {{ "true" if eni.source_dest_check == True else "false" }}
+
+ {% for group in eni.group_set %}
+ -
+ {{ group.id }}
+ {{ group.name }}
+
+ {% endfor %}
+
+ {% if eni.association %}
+
+ {{ eni.public_ip }}
+ {{ eni.owner_id }}
+ {{ eni.association.allocationId }}
+ {{ eni.association.associationId }}
+ true
+
+ {% endif %}
+
{% for tag in eni.get_tags() %}
-
{{ tag.key }}
{{ tag.value }}
{% endfor %}
-
-
+
+
{% for address in eni.private_ip_addresses %}
-
- {{ address.PrivateIpAddress }}
- {% if address.privateDnsName %}
- {{ address.privateDnsName }}
- {% endif %}
- {{ address.Primary }}
+ {{ address.PrivateIpAddress }}
+ {% if address.privateDnsName %}
+ {{ address.PrivateDnsName }}
+ {% endif %}
+ {{ "true" if address.Primary == True else "false" }}
{% endfor %}
-
-
- {{ eni.interface_type }}
+
+
+ {% for address in eni.ipv6_addresses %}
+ -
+ {{address}}
+
+ {% endfor %}
+
+ {{ eni.interface_type }}
{% endfor %}
diff --git a/moto/ec2/utils.py b/moto/ec2/utils.py
index c80cf3328..c8e4e2f11 100644
--- a/moto/ec2/utils.py
+++ b/moto/ec2/utils.py
@@ -3,6 +3,7 @@ import hashlib
import fnmatch
import random
import re
+import ipaddress
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
@@ -227,7 +228,18 @@ def random_public_ip():
return "54.214.{0}.{1}".format(random.choice(range(255)), random.choice(range(255)))
-def random_private_ip():
+def random_private_ip(cidr=None, ipv6=False):
+ # prefix - ula.prefixlen : get number of remaing length for the IP.
+ # prefix will be 32 for IPv4 and 128 for IPv6.
+ # random.getrandbits() will generate remaining bits for IPv6 or Ipv4 in decimal format
+ if cidr:
+ if ipv6:
+ ula = ipaddress.IPv6Network(cidr)
+ return str(ula.network_address + (random.getrandbits(128 - ula.prefixlen)))
+ ula = ipaddress.IPv4Network(cidr)
+ return str(ula.network_address + (random.getrandbits(32 - ula.prefixlen)))
+ if ipv6:
+ return "2001::cafe:%x/64" % random.getrandbits(16)
return "10.{0}.{1}.{2}".format(
random.choice(range(255)), random.choice(range(255)), random.choice(range(255))
)
@@ -239,6 +251,13 @@ def random_ip():
)
+def generate_dns_from_ip(ip, type="internal"):
+ splits = ip.split("/")[0].split(".") if "/" in ip else ip.split(".")
+ return "ip-{}-{}-{}-{}.ec2.{}".format(
+ splits[0], splits[1], splits[2], splits[3], type
+ )
+
+
def random_mac_address():
return "02:00:00:%02x:%02x:%02x" % (
random.randint(0, 255),
@@ -357,6 +376,16 @@ def dict_from_querystring(parameter, querystring_dict):
return use_dict
+def get_attribute_value(parameter, querystring_dict):
+ for key, value in querystring_dict.items():
+ match = re.search(r"{0}.Value".format(parameter), key)
+ if match:
+ if value[0].lower() in ["true", "false"]:
+ return True if value[0].lower() in ["true"] else False
+ return value[0]
+ return None
+
+
def get_object_value(obj, attr):
keys = attr.split(".")
val = obj
diff --git a/tests/terraform-tests.success.txt b/tests/terraform-tests.success.txt
index 4d5d3fa9e..59ed5e56a 100644
--- a/tests/terraform-tests.success.txt
+++ b/tests/terraform-tests.success.txt
@@ -111,4 +111,10 @@ TestAccAWSNatGateway
TestAccAWSRouteTable_
TestAccAWSRouteTableAssociation_
TestAccAWSS3Bucket_forceDestroyWithObjectLockEnabled
+TestAccAWSENI_PrivateIpsCount
+TestAccAWSENI_SourceDestCheck
+TestAccAWSENI_Tags
+TestAccAWSENI_basic
+TestAccAWSENI_IPv6
+TestAccAWSENI_disappears
TestAccAWSS3BucketObject_
diff --git a/tests/test_ec2/test_elastic_network_interfaces.py b/tests/test_ec2/test_elastic_network_interfaces.py
index d5dc03244..717e0f118 100644
--- a/tests/test_ec2/test_elastic_network_interfaces.py
+++ b/tests/test_ec2/test_elastic_network_interfaces.py
@@ -33,7 +33,7 @@ def test_elastic_network_interfaces():
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
- eni.groups.should.have.length_of(0)
+ eni.groups.should.have.length_of(1)
eni.private_ip_addresses.should.have.length_of(1)
eni.private_ip_addresses[0].private_ip_address.startswith("10.").should.be.true
@@ -80,7 +80,7 @@ def test_elastic_network_interfaces_boto3():
]
my_enis.should.have.length_of(1)
eni = my_enis[0]
- eni["Groups"].should.have.length_of(0)
+ eni["Groups"].should.have.length_of(1)
eni["PrivateIpAddresses"].should.have.length_of(1)
eni["PrivateIpAddresses"][0]["PrivateIpAddress"].startswith("10.").should.be.true
@@ -150,7 +150,7 @@ def test_elastic_network_interfaces_with_private_ip():
all_enis.should.have.length_of(1)
eni = all_enis[0]
- eni.groups.should.have.length_of(0)
+ eni.groups.should.have.length_of(1)
eni.private_ip_addresses.should.have.length_of(1)
eni.private_ip_addresses[0].private_ip_address.should.equal(private_ip)
@@ -175,7 +175,7 @@ def test_elastic_network_interfaces_with_private_ip_boto3():
]
eni = my_enis[0]
- eni["Groups"].should.have.length_of(0)
+ eni["Groups"].should.have.length_of(1)
eni["PrivateIpAddresses"].should.have.length_of(1)
eni["PrivateIpAddresses"][0]["PrivateIpAddress"].should.equal(private_ip)