2014-08-27 15:17:06 +00:00
|
|
|
from __future__ import unicode_literals
|
2014-08-25 22:09:38 +00:00
|
|
|
# Ensure 'assert_raises' context manager support for Python 2.6
|
|
|
|
import tests.backport_assert_raises
|
|
|
|
from nose.tools import assert_raises
|
|
|
|
|
2014-06-05 04:12:22 +00:00
|
|
|
import re
|
|
|
|
|
2013-02-22 04:13:01 +00:00
|
|
|
import boto
|
2014-06-05 04:12:22 +00:00
|
|
|
from boto.exception import EC2ResponseError
|
|
|
|
|
2013-08-03 21:21:25 +00:00
|
|
|
import sure # noqa
|
2013-02-22 04:13:01 +00:00
|
|
|
|
|
|
|
from moto import mock_ec2
|
|
|
|
|
|
|
|
|
2014-06-05 04:12:22 +00:00
|
|
|
VPC_CIDR="10.0.0.0/16"
|
|
|
|
BAD_VPC="vpc-deadbeef"
|
|
|
|
BAD_IGW="igw-deadbeef"
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_create():
|
|
|
|
""" internet gateway create """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
|
|
|
|
conn.get_all_internet_gateways().should.have.length_of(0)
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
conn.get_all_internet_gateways().should.have.length_of(1)
|
|
|
|
igw.id.should.match(r'igw-[0-9a-f]+')
|
|
|
|
|
|
|
|
igw = conn.get_all_internet_gateways()[0]
|
|
|
|
igw.attachments.should.have.length_of(0)
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_attach():
|
|
|
|
""" internet gateway attach """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
vpc = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.attach_internet_gateway(igw.id, vpc.id)
|
|
|
|
|
|
|
|
igw = conn.get_all_internet_gateways()[0]
|
|
|
|
igw.attachments[0].vpc_id.should.be.equal(vpc.id)
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_attach_bad_vpc():
|
|
|
|
""" internet gateway fail to attach w/ bad vpc """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
2014-08-25 17:54:47 +00:00
|
|
|
|
|
|
|
with assert_raises(EC2ResponseError) as cm:
|
|
|
|
conn.attach_internet_gateway(igw.id, BAD_VPC)
|
|
|
|
cm.exception.code.should.equal('InvalidVpcID.NotFound')
|
|
|
|
cm.exception.status.should.equal(400)
|
|
|
|
cm.exception.request_id.should_not.be.none
|
2014-06-05 04:12:22 +00:00
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_attach_twice():
|
|
|
|
""" internet gateway fail to attach twice """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
vpc1 = conn.create_vpc(VPC_CIDR)
|
|
|
|
vpc2 = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.attach_internet_gateway(igw.id, vpc1.id)
|
2014-08-25 17:54:47 +00:00
|
|
|
|
|
|
|
with assert_raises(EC2ResponseError) as cm:
|
|
|
|
conn.attach_internet_gateway(igw.id, vpc2.id)
|
|
|
|
cm.exception.code.should.equal('Resource.AlreadyAssociated')
|
|
|
|
cm.exception.status.should.equal(400)
|
|
|
|
cm.exception.request_id.should_not.be.none
|
2014-06-05 04:12:22 +00:00
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_detach():
|
|
|
|
""" internet gateway detach"""
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
vpc = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.attach_internet_gateway(igw.id, vpc.id)
|
|
|
|
conn.detach_internet_gateway(igw.id, vpc.id)
|
|
|
|
igw = conn.get_all_internet_gateways()[0]
|
|
|
|
igw.attachments.should.have.length_of(0)
|
|
|
|
|
|
|
|
@mock_ec2
|
2014-08-25 17:54:47 +00:00
|
|
|
def test_igw_detach_wrong_vpc():
|
|
|
|
""" internet gateway fail to detach w/ wrong vpc """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
vpc1 = conn.create_vpc(VPC_CIDR)
|
|
|
|
vpc2 = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.attach_internet_gateway(igw.id, vpc1.id)
|
|
|
|
|
|
|
|
with assert_raises(EC2ResponseError) as cm:
|
|
|
|
conn.detach_internet_gateway(igw.id, vpc2.id)
|
|
|
|
cm.exception.code.should.equal('Gateway.NotAttached')
|
|
|
|
cm.exception.status.should.equal(400)
|
|
|
|
cm.exception.request_id.should_not.be.none
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_detach_invalid_vpc():
|
|
|
|
""" internet gateway fail to detach w/ invalid vpc """
|
2014-06-05 04:12:22 +00:00
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
vpc = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.attach_internet_gateway(igw.id, vpc.id)
|
2014-08-25 17:54:47 +00:00
|
|
|
|
|
|
|
with assert_raises(EC2ResponseError) as cm:
|
|
|
|
conn.detach_internet_gateway(igw.id, BAD_VPC)
|
|
|
|
cm.exception.code.should.equal('Gateway.NotAttached')
|
|
|
|
cm.exception.status.should.equal(400)
|
|
|
|
cm.exception.request_id.should_not.be.none
|
2014-06-05 04:12:22 +00:00
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_detach_unattached():
|
|
|
|
""" internet gateway fail to detach unattached """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
2014-08-25 17:54:47 +00:00
|
|
|
vpc = conn.create_vpc(VPC_CIDR)
|
|
|
|
|
|
|
|
with assert_raises(EC2ResponseError) as cm:
|
|
|
|
conn.detach_internet_gateway(igw.id, vpc.id)
|
|
|
|
cm.exception.code.should.equal('Gateway.NotAttached')
|
|
|
|
cm.exception.status.should.equal(400)
|
|
|
|
cm.exception.request_id.should_not.be.none
|
2014-06-05 04:12:22 +00:00
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_delete():
|
|
|
|
""" internet gateway delete"""
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
vpc = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.get_all_internet_gateways().should.have.length_of(0)
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
conn.get_all_internet_gateways().should.have.length_of(1)
|
|
|
|
conn.delete_internet_gateway(igw.id)
|
|
|
|
conn.get_all_internet_gateways().should.have.length_of(0)
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_delete_attached():
|
|
|
|
""" internet gateway fail to delete attached """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
vpc = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.attach_internet_gateway(igw.id, vpc.id)
|
2014-08-25 17:54:47 +00:00
|
|
|
|
|
|
|
with assert_raises(EC2ResponseError) as cm:
|
|
|
|
conn.delete_internet_gateway(igw.id)
|
|
|
|
cm.exception.code.should.equal('DependencyViolation')
|
|
|
|
cm.exception.status.should.equal(400)
|
|
|
|
cm.exception.request_id.should_not.be.none
|
2014-06-05 04:12:22 +00:00
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_desribe():
|
|
|
|
""" internet gateway fetch by id """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
igw = conn.create_internet_gateway()
|
|
|
|
igw_by_search = conn.get_all_internet_gateways([igw.id])[0]
|
|
|
|
igw.id.should.equal(igw_by_search.id)
|
|
|
|
|
2013-02-22 04:13:01 +00:00
|
|
|
@mock_ec2
|
2014-06-05 04:12:22 +00:00
|
|
|
def test_igw_desribe_bad_id():
|
|
|
|
""" internet gateway fail to fetch by bad id """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
2014-08-25 17:54:47 +00:00
|
|
|
with assert_raises(EC2ResponseError) as cm:
|
|
|
|
conn.get_all_internet_gateways([BAD_IGW])
|
|
|
|
cm.exception.code.should.equal('InvalidInternetGatewayID.NotFound')
|
|
|
|
cm.exception.status.should.equal(400)
|
|
|
|
cm.exception.request_id.should_not.be.none
|
2014-11-05 17:11:56 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_filter_by_vpc_id():
|
|
|
|
""" internet gateway filter by vpc id """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
|
|
|
|
igw1 = conn.create_internet_gateway()
|
|
|
|
igw2 = conn.create_internet_gateway()
|
|
|
|
vpc = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.attach_internet_gateway(igw1.id, vpc.id)
|
|
|
|
|
|
|
|
result = conn.get_all_internet_gateways(filters={"attachment.vpc-id": vpc.id})
|
|
|
|
result.should.have.length_of(1)
|
|
|
|
result[0].id.should.equal(igw1.id)
|
|
|
|
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_filter_by_tags():
|
|
|
|
""" internet gateway filter by vpc id """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
|
|
|
|
igw1 = conn.create_internet_gateway()
|
|
|
|
igw2 = conn.create_internet_gateway()
|
|
|
|
igw1.add_tag("tests", "yes")
|
|
|
|
|
|
|
|
result = conn.get_all_internet_gateways(filters={"tag:tests": "yes"})
|
|
|
|
result.should.have.length_of(1)
|
|
|
|
result[0].id.should.equal(igw1.id)
|
|
|
|
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_filter_by_internet_gateway_id():
|
|
|
|
""" internet gateway filter by internet gateway id """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
|
|
|
|
igw1 = conn.create_internet_gateway()
|
|
|
|
igw2 = conn.create_internet_gateway()
|
|
|
|
|
|
|
|
result = conn.get_all_internet_gateways(filters={"internet-gateway-id": igw1.id})
|
|
|
|
result.should.have.length_of(1)
|
|
|
|
result[0].id.should.equal(igw1.id)
|
|
|
|
|
|
|
|
|
|
|
|
@mock_ec2
|
|
|
|
def test_igw_filter_by_attachment_state():
|
|
|
|
""" internet gateway filter by attachment state """
|
|
|
|
conn = boto.connect_vpc('the_key', 'the_secret')
|
|
|
|
|
|
|
|
igw1 = conn.create_internet_gateway()
|
|
|
|
igw2 = conn.create_internet_gateway()
|
|
|
|
vpc = conn.create_vpc(VPC_CIDR)
|
|
|
|
conn.attach_internet_gateway(igw1.id, vpc.id)
|
|
|
|
|
|
|
|
result = conn.get_all_internet_gateways(filters={"attachment.state": "available"})
|
|
|
|
result.should.have.length_of(1)
|
|
|
|
result[0].id.should.equal(igw1.id)
|