Add default Network ACL during VPC creation. Associate default Network ACL with a new subnet. Add support for associating a new Network ACL with a subnet.

This commit is contained in:
Tyler Sanders 2014-11-14 17:23:56 -06:00
parent efc8caaf48
commit 400d12b175
4 changed files with 198 additions and 14 deletions

View File

@ -88,7 +88,7 @@ from .utils import (
filter_internet_gateways,
filter_reservations,
random_network_acl_id,
)
random_network_acl_subnet_association_id)
def validate_resource_ids(resource_ids):
@ -1412,6 +1412,9 @@ class VPCBackend(object):
# AWS creates a default main route table and security group.
main_route_table = self.create_route_table(vpc_id, main=True)
# AWS creates a default Network ACL
default_network_acl = self.create_network_acl(vpc_id, default=True)
default = self.get_security_group_from_name('default', vpc_id=vpc_id)
if not default:
self.create_security_group('default', 'default VPC security group', vpc_id=vpc_id)
@ -1603,6 +1606,10 @@ class SubnetBackend(object):
def create_subnet(self, vpc_id, cidr_block):
subnet_id = random_subnet_id()
subnet = Subnet(self, subnet_id, vpc_id, cidr_block)
# AWS associates a new subnet with the default Network ACL
self.associate_default_network_acl_with_subnet(subnet_id)
vpc = self.get_vpc(vpc_id) # Validate VPC exists
self.subnets[subnet_id] = subnet
return subnet
@ -2281,15 +2288,23 @@ class NetworkAclBackend(object):
raise InvalidNetworkAclIdError(network_acl_id)
return network_acl
def create_network_acl(self, vpc_id):
def create_network_acl(self, vpc_id, default=False):
network_acl_id = random_network_acl_id()
network_acl = NetworkAcl(self, network_acl_id, vpc_id)
vpc = self.get_vpc(vpc_id)
network_acl = NetworkAcl(self, network_acl_id, vpc_id, default)
self.network_acls[network_acl_id] = network_acl
return network_acl
def get_all_network_acls(self, filters=None):
def get_all_network_acls(self, network_acl_ids=None, filters=None):
network_acls = self.network_acls.values()
if network_acl_ids:
network_acls = [network_acl for network_acl in network_acls
if network_acl.id in network_acl_ids ]
if len(network_acls) != len(network_acl_ids):
invalid_id = list(set(network_acl_ids).difference(set([network_acl.id for network_acl in network_acls])))[0]
raise InvalidRouteTableIdError(invalid_id)
return generic_filter(filters, network_acls)
def delete_network_acl(self, network_acl_id):
@ -2312,13 +2327,69 @@ class NetworkAclBackend(object):
network_acl.network_acl_entries.append(network_acl_entry)
return network_acl_entry
def replace_network_acl_association(self, association_id,
network_acl_id):
# lookup existing association for subnet and delete it
default_acl = next(value for key, value in self.network_acls.iteritems()
if association_id in value.associations.keys())
subnet_id = None
for key, value in default_acl.associations.iteritems():
if key == association_id:
subnet_id = default_acl.associations[key].subnet_id
del default_acl.associations[key]
break
new_assoc_id = random_network_acl_subnet_association_id()
association = NetworkAclAssociation(self,
new_assoc_id,
subnet_id,
network_acl_id)
new_acl = self.get_network_acl(network_acl_id)
new_acl.associations[new_assoc_id] = association
return association
def associate_default_network_acl_with_subnet(self, subnet_id):
association_id = random_network_acl_subnet_association_id()
acl = next(acl for acl in self.network_acls.values() if acl.default)
acl.associations[association_id] = NetworkAclAssociation(self, association_id,
subnet_id, acl.id)
class NetworkAclAssociation(object):
def __init__(self, ec2_backend, new_association_id,
subnet_id, network_acl_id):
self.ec2_backend = ec2_backend
self.id = new_association_id
self.subnet_id = subnet_id
self.network_acl_id = network_acl_id
super(NetworkAclAssociation, self).__init__()
class NetworkAcl(TaggedEC2Resource):
def __init__(self, ec2_backend, network_acl_id, vpc_id):
def __init__(self, ec2_backend, network_acl_id, vpc_id, default=False):
self.ec2_backend = ec2_backend
self.id = network_acl_id
self.vpc_id = vpc_id
self.network_acl_entries = []
self.associations = {}
self.default = default
def get_filter_value(self, filter_name):
if filter_name == "vpc-id":
return self.vpc_id
elif filter_name == "association.network-acl-id":
return self.id
elif filter_name == "association.subnet-id":
return [assoc.subnet_id for assoc in self.associations.values()]
filter_value = super(NetworkAcl, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeNetworkAcls".format(filter_name))
return filter_value
class NetworkAclEntry(TaggedEC2Resource):

View File

@ -1,7 +1,8 @@
from __future__ import unicode_literals
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.utils import filters_from_querystring
from moto.ec2.utils import filters_from_querystring, \
network_acl_ids_from_querystring
class NetworkACLs(BaseResponse):
@ -33,22 +34,32 @@ class NetworkACLs(BaseResponse):
return template.render(network_acl_entry=network_acl_entry)
def delete_network_acl(self):
raise NotImplementedError(
'NetworkACLs(AmazonVPC).delete_network_acl is not yet implemented')
network_acl_id = self.querystring.get('NetworkAclId')[0]
self.ec2_backend.delete_network_acl(network_acl_id)
template = Template(DELETE_NETWORK_ACL_ASSOCIATION)
return template.render()
def delete_network_acl_entry(self):
raise NotImplementedError(
'NetworkACLs(AmazonVPC).delete_network_acl_entry is not yet implemented')
def describe_network_acls(self):
network_acl_ids = network_acl_ids_from_querystring(self.querystring)
filters = filters_from_querystring(self.querystring)
network_acls = self.ec2_backend.get_all_network_acls(filters)
network_acls = self.ec2_backend.get_all_network_acls(network_acl_ids, filters)
template = Template(DESCRIBE_NETWORK_ACL_RESPONSE)
return template.render(network_acls=network_acls)
def replace_network_acl_association(self):
raise NotImplementedError(
'NetworkACLs(AmazonVPC).replace_network_acl_association is not yet implemented')
association_id = self.querystring.get('AssociationId')[0]
network_acl_id = self.querystring.get('NetworkAclId')[0]
association = self.ec2_backend.replace_network_acl_association(
association_id,
network_acl_id
)
template = Template(REPLACE_NETWORK_ACL_ASSOCIATION)
return template.render(association=association)
def replace_network_acl_entry(self):
raise NotImplementedError(
@ -95,7 +106,15 @@ DESCRIBE_NETWORK_ACL_RESPONSE = """
</item>
{% endfor %}
</entrySet>
<associationSet/>
<associationSet>
{% for association in network_acl.associations.values() %}
<item>
<networkAclAssociationId>{{ association.id }}</networkAclAssociationId>
<networkAclId>{{ association.network_acl_id }}</networkAclId>
<subnetId>{{ association.subnet_id }}</subnetId>
</item>
{% endfor %}
</associationSet>
<tagSet/>
</item>
{% endfor %}
@ -109,3 +128,17 @@ CREATE_NETWORK_ACL_ENTRY_RESPONSE = """
<return>true</return>
</CreateNetworkAclEntryResponse>
"""
REPLACE_NETWORK_ACL_ASSOCIATION = """
<ReplaceNetworkAclAssociationResponse xmlns="http://ec2.amazonaws.com/doc/2014-09-01/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<newAssociationId>{{ association.new_association_id }}</newAssociationId>
</ReplaceNetworkAclAssociationResponse>
"""
DELETE_NETWORK_ACL_ASSOCIATION = """
<DeleteNetworkAclResponse xmlns="http://ec2.amazonaws.com/doc/2014-10-01/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</DeleteNetworkAclResponse>
"""

View File

@ -10,6 +10,7 @@ EC2_RESOURCE_TO_PREFIX = {
'instance': 'i',
'internet-gateway': 'igw',
'network-acl': 'acl',
'network-acl-subnet-assoc': 'aclassoc',
'network-interface': 'eni',
'network-interface-attachment': 'eni-attach',
'reserved-instance': 'uuid4',
@ -76,6 +77,10 @@ def random_network_acl_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['network-acl'])
def random_network_acl_subnet_association_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['network-acl-subnet-assoc'])
def random_volume_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['volume'])
@ -162,6 +167,14 @@ def route_table_ids_from_querystring(querystring_dict):
return route_table_ids
def network_acl_ids_from_querystring(querystring_dict):
network_acl_ids = []
for key, value in querystring_dict.items():
if 'NetworkAclId' in key:
network_acl_ids.append(value[0])
return network_acl_ids
def vpc_ids_from_querystring(querystring_dict):
vpc_ids = []
for key, value in querystring_dict.items():

View File

@ -5,6 +5,15 @@ import sure # noqa
from moto import mock_ec2
@mock_ec2
def test_default_network_acl_created_with_vpc():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(1)
@mock_ec2
def test_network_acls():
@ -13,9 +22,22 @@ def test_network_acls():
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
@mock_ec2
def test_new_subnet_associates_with_default_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(1)
acl = all_network_acls[0]
acl.associations.should.have.length_of(1)
acl.associations[0].subnet_id.should.equal(subnet.id)
@mock_ec2
def test_network_acl_entries():
@ -33,12 +55,57 @@ def test_network_acl_entries():
)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(1)
all_network_acls.should.have.length_of(2)
entries = all_network_acls[0].network_acl_entries
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
entries = test_network_acl.network_acl_entries
entries.should.have.length_of(1)
entries[0].rule_number.should.equal('110')
entries[0].protocol.should.equal('6')
entries[0].rule_action.should.equal('ALLOW')
@mock_ec2
def test_associate_new_network_acl_with_subnet():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
network_acl = conn.create_network_acl(vpc.id)
conn.associate_network_acl(network_acl.id, subnet.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
test_network_acl.associations.should.have.length_of(1)
test_network_acl.associations[0].subnet_id.should.equal(subnet.id)
@mock_ec2
def test_delete_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
any(acl.id == network_acl.id for acl in all_network_acls).should.be.ok
conn.delete_network_acl(network_acl.id)
updated_network_acls = conn.get_all_network_acls()
updated_network_acls.should.have.length_of(1)
any(acl.id == network_acl.id for acl in updated_network_acls).shouldnt.be.ok