278 lines
8.6 KiB
Python
278 lines
8.6 KiB
Python
from __future__ import unicode_literals
|
|
import boto
|
|
import boto3
|
|
import sure # noqa
|
|
from nose.tools import assert_raises
|
|
from botocore.exceptions import ClientError
|
|
|
|
from moto import mock_ec2_deprecated, mock_ec2
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_default_network_acl_created_with_vpc():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.create_vpc("10.0.0.0/16")
|
|
all_network_acls = conn.get_all_network_acls()
|
|
all_network_acls.should.have.length_of(2)
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_network_acls():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.create_vpc("10.0.0.0/16")
|
|
network_acl = conn.create_network_acl(vpc.id)
|
|
all_network_acls = conn.get_all_network_acls()
|
|
all_network_acls.should.have.length_of(3)
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_new_subnet_associates_with_default_network_acl():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.get_all_vpcs()[0]
|
|
|
|
subnet = conn.create_subnet(vpc.id, "172.31.112.0/20")
|
|
all_network_acls = conn.get_all_network_acls()
|
|
all_network_acls.should.have.length_of(1)
|
|
|
|
acl = all_network_acls[0]
|
|
acl.associations.should.have.length_of(7)
|
|
[a.subnet_id for a in acl.associations].should.contain(subnet.id)
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_network_acl_entries():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.create_vpc("10.0.0.0/16")
|
|
|
|
network_acl = conn.create_network_acl(vpc.id)
|
|
|
|
network_acl_entry = conn.create_network_acl_entry(
|
|
network_acl.id,
|
|
110,
|
|
6,
|
|
"ALLOW",
|
|
"0.0.0.0/0",
|
|
False,
|
|
port_range_from="443",
|
|
port_range_to="443",
|
|
)
|
|
|
|
all_network_acls = conn.get_all_network_acls()
|
|
all_network_acls.should.have.length_of(3)
|
|
|
|
test_network_acl = next(na for na in all_network_acls if na.id == network_acl.id)
|
|
entries = test_network_acl.network_acl_entries
|
|
entries.should.have.length_of(1)
|
|
entries[0].rule_number.should.equal("110")
|
|
entries[0].protocol.should.equal("6")
|
|
entries[0].rule_action.should.equal("ALLOW")
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_delete_network_acl_entry():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.create_vpc("10.0.0.0/16")
|
|
|
|
network_acl = conn.create_network_acl(vpc.id)
|
|
|
|
conn.create_network_acl_entry(
|
|
network_acl.id,
|
|
110,
|
|
6,
|
|
"ALLOW",
|
|
"0.0.0.0/0",
|
|
False,
|
|
port_range_from="443",
|
|
port_range_to="443",
|
|
)
|
|
conn.delete_network_acl_entry(network_acl.id, 110, False)
|
|
|
|
all_network_acls = conn.get_all_network_acls()
|
|
|
|
test_network_acl = next(na for na in all_network_acls if na.id == network_acl.id)
|
|
entries = test_network_acl.network_acl_entries
|
|
entries.should.have.length_of(0)
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_replace_network_acl_entry():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.create_vpc("10.0.0.0/16")
|
|
|
|
network_acl = conn.create_network_acl(vpc.id)
|
|
|
|
conn.create_network_acl_entry(
|
|
network_acl.id,
|
|
110,
|
|
6,
|
|
"ALLOW",
|
|
"0.0.0.0/0",
|
|
False,
|
|
port_range_from="443",
|
|
port_range_to="443",
|
|
)
|
|
conn.replace_network_acl_entry(
|
|
network_acl.id,
|
|
110,
|
|
-1,
|
|
"DENY",
|
|
"0.0.0.0/0",
|
|
False,
|
|
port_range_from="22",
|
|
port_range_to="22",
|
|
)
|
|
|
|
all_network_acls = conn.get_all_network_acls()
|
|
|
|
test_network_acl = next(na for na in all_network_acls if na.id == network_acl.id)
|
|
entries = test_network_acl.network_acl_entries
|
|
entries.should.have.length_of(1)
|
|
entries[0].rule_number.should.equal("110")
|
|
entries[0].protocol.should.equal("-1")
|
|
entries[0].rule_action.should.equal("DENY")
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_associate_new_network_acl_with_subnet():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.create_vpc("10.0.0.0/16")
|
|
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
|
|
network_acl = conn.create_network_acl(vpc.id)
|
|
|
|
conn.associate_network_acl(network_acl.id, subnet.id)
|
|
|
|
all_network_acls = conn.get_all_network_acls()
|
|
all_network_acls.should.have.length_of(3)
|
|
|
|
test_network_acl = next(na for na in all_network_acls if na.id == network_acl.id)
|
|
|
|
test_network_acl.associations.should.have.length_of(1)
|
|
test_network_acl.associations[0].subnet_id.should.equal(subnet.id)
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_delete_network_acl():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.create_vpc("10.0.0.0/16")
|
|
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
|
|
network_acl = conn.create_network_acl(vpc.id)
|
|
|
|
all_network_acls = conn.get_all_network_acls()
|
|
all_network_acls.should.have.length_of(3)
|
|
|
|
any(acl.id == network_acl.id for acl in all_network_acls).should.be.ok
|
|
|
|
conn.delete_network_acl(network_acl.id)
|
|
|
|
updated_network_acls = conn.get_all_network_acls()
|
|
updated_network_acls.should.have.length_of(2)
|
|
|
|
any(acl.id == network_acl.id for acl in updated_network_acls).shouldnt.be.ok
|
|
|
|
|
|
@mock_ec2_deprecated
|
|
def test_network_acl_tagging():
|
|
conn = boto.connect_vpc("the_key", "the secret")
|
|
vpc = conn.create_vpc("10.0.0.0/16")
|
|
network_acl = conn.create_network_acl(vpc.id)
|
|
|
|
network_acl.add_tag("a key", "some value")
|
|
|
|
tag = conn.get_all_tags()[0]
|
|
tag.name.should.equal("a key")
|
|
tag.value.should.equal("some value")
|
|
|
|
all_network_acls = conn.get_all_network_acls()
|
|
test_network_acl = next(na for na in all_network_acls if na.id == network_acl.id)
|
|
test_network_acl.tags.should.have.length_of(1)
|
|
test_network_acl.tags["a key"].should.equal("some value")
|
|
|
|
|
|
@mock_ec2
|
|
def test_new_subnet_in_new_vpc_associates_with_default_network_acl():
|
|
ec2 = boto3.resource("ec2", region_name="us-west-1")
|
|
new_vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
|
|
new_vpc.reload()
|
|
|
|
subnet = ec2.create_subnet(VpcId=new_vpc.id, CidrBlock="10.0.0.0/24")
|
|
subnet.reload()
|
|
|
|
new_vpcs_default_network_acl = next(iter(new_vpc.network_acls.all()), None)
|
|
new_vpcs_default_network_acl.reload()
|
|
new_vpcs_default_network_acl.vpc_id.should.equal(new_vpc.id)
|
|
new_vpcs_default_network_acl.associations.should.have.length_of(1)
|
|
new_vpcs_default_network_acl.associations[0]["SubnetId"].should.equal(subnet.id)
|
|
|
|
|
|
@mock_ec2
|
|
def test_default_network_acl_default_entries():
|
|
ec2 = boto3.resource("ec2", region_name="us-west-1")
|
|
default_network_acl = next(iter(ec2.network_acls.all()), None)
|
|
default_network_acl.is_default.should.be.ok
|
|
|
|
default_network_acl.entries.should.have.length_of(4)
|
|
unique_entries = []
|
|
for entry in default_network_acl.entries:
|
|
entry["CidrBlock"].should.equal("0.0.0.0/0")
|
|
entry["Protocol"].should.equal("-1")
|
|
entry["RuleNumber"].should.be.within([100, 32767])
|
|
entry["RuleAction"].should.be.within(["allow", "deny"])
|
|
assert type(entry["Egress"]) is bool
|
|
if entry["RuleAction"] == "allow":
|
|
entry["RuleNumber"].should.be.equal(100)
|
|
else:
|
|
entry["RuleNumber"].should.be.equal(32767)
|
|
if entry not in unique_entries:
|
|
unique_entries.append(entry)
|
|
|
|
unique_entries.should.have.length_of(4)
|
|
|
|
|
|
@mock_ec2
|
|
def test_delete_default_network_acl_default_entry():
|
|
ec2 = boto3.resource("ec2", region_name="us-west-1")
|
|
default_network_acl = next(iter(ec2.network_acls.all()), None)
|
|
default_network_acl.is_default.should.be.ok
|
|
|
|
default_network_acl.entries.should.have.length_of(4)
|
|
first_default_network_acl_entry = default_network_acl.entries[0]
|
|
|
|
default_network_acl.delete_entry(
|
|
Egress=first_default_network_acl_entry["Egress"],
|
|
RuleNumber=first_default_network_acl_entry["RuleNumber"],
|
|
)
|
|
|
|
default_network_acl.entries.should.have.length_of(3)
|
|
|
|
|
|
@mock_ec2
|
|
def test_duplicate_network_acl_entry():
|
|
ec2 = boto3.resource("ec2", region_name="us-west-1")
|
|
default_network_acl = next(iter(ec2.network_acls.all()), None)
|
|
default_network_acl.is_default.should.be.ok
|
|
|
|
rule_number = 200
|
|
egress = True
|
|
default_network_acl.create_entry(
|
|
CidrBlock="0.0.0.0/0",
|
|
Egress=egress,
|
|
Protocol="-1",
|
|
RuleAction="allow",
|
|
RuleNumber=rule_number,
|
|
)
|
|
|
|
with assert_raises(ClientError) as ex:
|
|
default_network_acl.create_entry(
|
|
CidrBlock="10.0.0.0/0",
|
|
Egress=egress,
|
|
Protocol="-1",
|
|
RuleAction="deny",
|
|
RuleNumber=rule_number,
|
|
)
|
|
str(ex.exception).should.equal(
|
|
"An error occurred (NetworkAclEntryAlreadyExists) when calling the CreateNetworkAclEntry "
|
|
"operation: The network acl entry identified by {} already exists.".format(
|
|
rule_number
|
|
)
|
|
)
|