Add more availability regions and implement default VPC (#773)

Fix filter name for availability zone

Fix bug assuming dict keys are ordered

Fix tests

Fix tests

Fix bug
This commit is contained in:
Taro Sato 2017-01-11 17:37:57 -08:00 committed by Steve Pulec
parent 231d3cadcb
commit 02324ad708
13 changed files with 224 additions and 148 deletions

View File

@ -1,6 +1,6 @@
from __future__ import unicode_literals
import logging
logging.getLogger('boto').setLevel(logging.CRITICAL)
#logging.getLogger('boto').setLevel(logging.CRITICAL)
__title__ = 'moto'
__version__ = '0.4.30'

View File

@ -1086,33 +1086,34 @@ class Zone(object):
class RegionsAndZonesBackend(object):
regions = [
Region("ap-northeast-1", "ec2.ap-northeast-1.amazonaws.com"),
Region("ap-northeast-2", "ec2.ap-northeast-2.amazonaws.com"),
Region("ap-south-1", "ec2.ap-south-1.amazonaws.com"),
Region("ap-southeast-1", "ec2.ap-southeast-1.amazonaws.com"),
Region("ap-southeast-2", "ec2.ap-southeast-2.amazonaws.com"),
Region("cn-north-1", "ec2.cn-north-1.amazonaws.com.cn"),
Region("eu-central-1", "ec2.eu-central-1.amazonaws.com"),
Region("eu-west-1", "ec2.eu-west-1.amazonaws.com"),
Region("sa-east-1", "ec2.sa-east-1.amazonaws.com"),
Region("us-east-1", "ec2.us-east-1.amazonaws.com"),
Region("ap-northeast-1", "ec2.ap-northeast-1.amazonaws.com"),
Region("us-west-2", "ec2.us-west-2.amazonaws.com"),
Region("us-east-2", "ec2.us-east-2.amazonaws.com"),
Region("us-gov-west-1", "ec2.us-gov-west-1.amazonaws.com"),
Region("us-west-1", "ec2.us-west-1.amazonaws.com"),
Region("ap-southeast-1", "ec2.ap-southeast-1.amazonaws.com"),
Region("ap-southeast-2", "ec2.ap-southeast-2.amazonaws.com"),
Region("us-west-2", "ec2.us-west-2.amazonaws.com"),
]
# TODO: cleanup. For now, pretend everything is us-east-1. 'merica.
zones = [
Zone("us-east-1a", "us-east-1"),
Zone("us-east-1b", "us-east-1"),
Zone("us-east-1c", "us-east-1"),
Zone("us-east-1d", "us-east-1"),
Zone("us-east-1e", "us-east-1"),
]
zones = dict(
(region, [Zone(region + c, region) for c in 'abc'])
for region in [r.name for r in regions])
def describe_regions(self):
return self.regions
def describe_availability_zones(self):
return self.zones
return self.zones[self.region_name]
def get_zone_by_name(self, name):
for zone in self.zones:
for zone in self.zones[self.region_name]:
if zone.name == name:
return zone
@ -1794,15 +1795,15 @@ class VPC(TaggedEC2Resource):
return self.id
def get_filter_value(self, filter_name):
if filter_name == 'vpc-id':
if filter_name in ('vpc-id', 'vpcId'):
return self.id
elif filter_name == 'cidr':
elif filter_name in ('cidr', 'cidr-block', 'cidrBlock'):
return self.cidr_block
elif filter_name == 'isDefault':
elif filter_name in ('is-default', 'isDefault'):
return self.is_default
elif filter_name == 'state':
return self.state
elif filter_name == 'dhcp-options-id':
elif filter_name in ('dhcp-options-id', 'dhcpOptionsId'):
if not self.dhcp_options:
return None
@ -2013,12 +2014,7 @@ class Subnet(TaggedEC2Resource):
@property
def availability_zone(self):
if self._availability_zone is None:
# This could probably be smarter, but there doesn't appear to be a
# way to pull AZs for a region in boto
return self.ec2_backend.region_name + "a"
else:
return self._availability_zone
return self._availability_zone
@property
def physical_resource_id(self):
@ -2043,11 +2039,11 @@ class Subnet(TaggedEC2Resource):
"""
if filter_name in ('cidr', 'cidrBlock', 'cidr-block'):
return self.cidr_block
elif filter_name == 'vpc-id':
elif filter_name in ('vpc-id', 'vpcId'):
return self.vpc_id
elif filter_name == 'subnet-id':
return self.id
elif filter_name == 'availabilityZone':
elif filter_name in ('availabilityZone', 'availability-zone'):
return self.availability_zone
elif filter_name in ('defaultForAz', 'default-for-az'):
return self.default_for_az
@ -2068,37 +2064,49 @@ class Subnet(TaggedEC2Resource):
class SubnetBackend(object):
def __init__(self):
self.subnets = {}
# maps availability zone to dict of (subnet_id, subnet)
self.subnets = defaultdict(dict)
super(SubnetBackend, self).__init__()
def get_subnet(self, subnet_id):
subnet = self.subnets.get(subnet_id, None)
if not subnet:
raise InvalidSubnetIdError(subnet_id)
return subnet
for subnets in self.subnets.values():
if subnet_id in subnets:
return subnets[subnet_id]
raise InvalidSubnetIdError(subnet_id)
def create_subnet(self, vpc_id, cidr_block, availability_zone=None):
def create_subnet(self, vpc_id, cidr_block, availability_zone):
subnet_id = random_subnet_id()
vpc = self.get_vpc(vpc_id) # Validate VPC exists
default_for_az = vpc.is_default
map_public_ip_on_launch = vpc.is_default
subnet = Subnet(self, subnet_id, vpc_id, cidr_block, availability_zone, default_for_az, map_public_ip_on_launch)
# if this is the first subnet for an availability zone,
# consider it the default
default_for_az = str(availability_zone not in self.subnets).lower()
map_public_ip_on_launch = default_for_az
subnet = Subnet(self, subnet_id, vpc_id, cidr_block, availability_zone,
default_for_az, map_public_ip_on_launch)
# AWS associates a new subnet with the default Network ACL
self.associate_default_network_acl_with_subnet(subnet_id)
self.subnets[subnet_id] = subnet
self.subnets[availability_zone][subnet_id] = subnet
return subnet
def get_all_subnets(self, filters=None):
subnets = self.subnets.values()
def get_all_subnets(self, subnet_ids=None, filters=None):
subnets = []
if subnet_ids:
for subnet_id in subnet_ids:
for items in self.subnets.values():
if subnet_id in items:
subnets.append(items[subnet_id])
else:
for items in self.subnets.values():
subnets.extend(items.values())
return generic_filter(filters, subnets)
def delete_subnet(self, subnet_id):
deleted = self.subnets.pop(subnet_id, None)
if not deleted:
raise InvalidSubnetIdError(subnet_id)
return deleted
for subnets in self.subnets.values():
if subnet_id in subnets:
return subnets.pop(subnet_id, None)
raise InvalidSubnetIdError(subnet_id)
def modify_subnet_attribute(self, subnet_id, map_public_ip):
subnet = self.get_subnet(subnet_id)
@ -3377,6 +3385,29 @@ class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
super(EC2Backend, self).__init__()
self.region_name = region_name
# Default VPC exists by default, which is the current behavior
# of EC2-VPC. See for detail:
#
# docs.aws.amazon.com/AmazonVPC/latest/UserGuide/default-vpc.html
#
if not self.vpcs:
vpc = self.create_vpc('172.31.0.0/16')
else:
# For now this is included for potential
# backward-compatibility issues
vpc = self.vpcs.values()[0]
# Create default subnet for each availability zone
ip, _ = vpc.cidr_block.split('/')
ip = ip.split('.')
ip[2] = 0
for zone in self.describe_availability_zones():
az_name = zone.name
cidr_block = '.'.join(str(i) for i in ip) + '/20'
self.create_subnet(vpc.id, cidr_block, availability_zone=az_name)
ip[2] += 16
def reset(self):
region_name = self.region_name
self.__dict__ = {}
@ -3434,5 +3465,5 @@ class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
return True
ec2_backends = {}
for region in boto.ec2.regions():
for region in RegionsAndZonesBackend.regions:
ec2_backends[region.name] = EC2Backend(region.name)

View File

@ -1,4 +1,5 @@
from __future__ import unicode_literals
import random
from moto.core.responses import BaseResponse
from moto.ec2.utils import filters_from_querystring
@ -10,11 +11,12 @@ class Subnets(BaseResponse):
if 'AvailabilityZone' in self.querystring:
availability_zone = self.querystring['AvailabilityZone'][0]
else:
availability_zone = None
zone = random.choice(self.ec2_backend.describe_availability_zones())
availability_zone = zone.name
subnet = self.ec2_backend.create_subnet(
vpc_id,
cidr_block,
availability_zone,
vpc_id,
cidr_block,
availability_zone,
)
template = self.response_template(CREATE_SUBNET_RESPONSE)
return template.render(subnet=subnet)
@ -27,7 +29,17 @@ class Subnets(BaseResponse):
def describe_subnets(self):
filters = filters_from_querystring(self.querystring)
subnets = self.ec2_backend.get_all_subnets(filters)
subnet_ids = []
idx = 1
key = 'SubnetId.{0}'.format(idx)
while key in self.querystring:
v = self.querystring[key]
subnet_ids.append(v[0])
idx += 1
key = 'SubnetId.{0}'.format(idx)
subnets = self.ec2_backend.get_all_subnets(subnet_ids, filters)
template = self.response_template(DESCRIBE_SUBNETS_RESPONSE)
return template.render(subnets=subnets)

View File

@ -3,7 +3,7 @@ from .responses import EC2Response
url_bases = [
"https?://ec2.(.+).amazonaws.com",
"https?://ec2.(.+).amazonaws.com(|.cn)",
]
url_paths = {

View File

@ -19,6 +19,7 @@ from .exceptions import (
)
class FakeHealthCheck(object):
def __init__(self, timeout, healthy_threshold, unhealthy_threshold,
interval, target):
@ -337,5 +338,5 @@ class ELBBackend(BaseBackend):
elb_backends = {}
for region in boto.ec2.elb.regions():
elb_backends[region.name] = ELBBackend(region.name)
for region in ec2_backends.keys():
elb_backends[region] = ELBBackend(region)

View File

@ -661,13 +661,14 @@ def test_vpc_single_instance_in_subnet():
)
vpc_conn = boto.vpc.connect_to_region("us-west-1")
vpc = vpc_conn.get_all_vpcs()[0]
vpc = vpc_conn.get_all_vpcs(filters={'cidrBlock': '10.0.0.0/16'})[0]
vpc.cidr_block.should.equal("10.0.0.0/16")
# Add this once we implement the endpoint
# vpc_conn.get_all_internet_gateways().should.have.length_of(1)
subnet = vpc_conn.get_all_subnets()[0]
subnet = vpc_conn.get_all_subnets(filters={'vpcId': vpc.id})[0]
subnet.vpc_id.should.equal(vpc.id)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
@ -1355,7 +1356,7 @@ def test_vpc_gateway_attachment_creation_should_attach_itself_to_vpc():
)
vpc_conn = boto.vpc.connect_to_region("us-west-1")
vpc = vpc_conn.get_all_vpcs()[0]
vpc = vpc_conn.get_all_vpcs(filters={'cidrBlock': '10.0.0.0/16'})[0]
igws = vpc_conn.get_all_internet_gateways(
filters={'attachment.vpc-id': vpc.id}
)

View File

@ -1,5 +1,7 @@
from __future__ import unicode_literals
import boto
import boto.ec2
import boto3
import sure # noqa
from moto import mock_ec2
@ -9,16 +11,39 @@ from moto import mock_ec2
def test_describe_regions():
conn = boto.connect_ec2('the_key', 'the_secret')
regions = conn.get_all_regions()
regions.should.have.length_of(8)
regions[0].name.should.equal('eu-west-1')
regions[0].endpoint.should.equal('ec2.eu-west-1.amazonaws.com')
regions.should.have.length_of(14)
for region in regions:
region.endpoint.should.contain(region.name)
@mock_ec2
def test_availability_zones():
# Just testing us-east-1 for now
conn = boto.connect_ec2('the_key', 'the_secret')
zones = conn.get_all_zones()
zones.should.have.length_of(5)
zones[0].name.should.equal('us-east-1a')
zones[0].region_name.should.equal('us-east-1')
regions = conn.get_all_regions()
for region in regions:
conn = boto.ec2.connect_to_region(region.name)
if conn is None:
continue
for zone in conn.get_all_zones():
zone.name.should.contain(region.name)
@mock_ec2
def test_boto3_describe_regions():
ec2 = boto3.client('ec2', 'us-east-1')
resp = ec2.describe_regions()
resp['Regions'].should.have.length_of(14)
for rec in resp['Regions']:
rec['Endpoint'].should.contain(rec['RegionName'])
@mock_ec2
def test_boto3_availability_zones():
ec2 = boto3.client('ec2', 'us-east-1')
resp = ec2.describe_regions()
regions = [r['RegionName'] for r in resp['Regions']]
for region in regions:
conn = boto3.client('ec2', region)
resp = conn.describe_availability_zones()
for rec in resp['AvailabilityZones']:
rec['ZoneName'].should.contain(region)

View File

@ -52,15 +52,12 @@ def test_key_pairs_create_two():
kp = conn.create_key_pair('bar')
assert kp.material.startswith('---- BEGIN RSA PRIVATE KEY ----')
kps = conn.get_all_key_pairs()
assert len(kps) == 2
# on Python 3, these are reversed for some reason
if six.PY3:
return
assert kps[0].name == 'foo'
assert kps[1].name == 'bar'
kps.should.have.length_of(2)
[i.name for i in kps].should.contain('foo')
[i.name for i in kps].should.contain('bar')
kps = conn.get_all_key_pairs('foo')
assert len(kps) == 1
assert kps[0].name == 'foo'
kps.should.have.length_of(1)
kps[0].name.should.equal('foo')
@mock_ec2

View File

@ -7,41 +7,37 @@ from moto import mock_ec2
@mock_ec2
def test_default_network_acl_created_with_vpc():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(1)
@mock_ec2
def test_network_acls():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
@mock_ec2
def test_new_subnet_associates_with_default_network_acl():
@mock_ec2
def test_network_acls():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
@mock_ec2
def test_new_subnet_associates_with_default_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.get_all_vpcs()[0]
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(1)
acl = all_network_acls[0]
acl.associations.should.have.length_of(1)
acl.associations[0].subnet_id.should.equal(subnet.id)
acl.associations.should.have.length_of(4)
[a.subnet_id for a in acl.associations].should.contain(subnet.id)
@mock_ec2
def test_network_acl_entries():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
@ -55,7 +51,7 @@ def test_network_acl_entries():
)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
all_network_acls.should.have.length_of(3)
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
@ -68,7 +64,6 @@ def test_network_acl_entries():
@mock_ec2
def test_associate_new_network_acl_with_subnet():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
@ -77,7 +72,7 @@ def test_associate_new_network_acl_with_subnet():
conn.associate_network_acl(network_acl.id, subnet.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
all_network_acls.should.have.length_of(3)
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
@ -88,28 +83,26 @@ def test_associate_new_network_acl_with_subnet():
@mock_ec2
def test_delete_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
all_network_acls.should.have.length_of(3)
any(acl.id == network_acl.id for acl in all_network_acls).should.be.ok
conn.delete_network_acl(network_acl.id)
updated_network_acls = conn.get_all_network_acls()
updated_network_acls.should.have.length_of(1)
updated_network_acls.should.have.length_of(2)
any(acl.id == network_acl.id for acl in updated_network_acls).shouldnt.be.ok
@mock_ec2
def test_network_acl_tagging():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
@ -125,4 +118,3 @@ def test_network_acl_tagging():
if na.id == network_acl.id)
test_network_acl.tags.should.have.length_of(1)
test_network_acl.tags["a key"].should.equal("some value")

View File

@ -17,7 +17,7 @@ def test_route_tables_defaults():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
all_route_tables = conn.get_all_route_tables()
all_route_tables = conn.get_all_route_tables(filters={'vpc-id': vpc.id})
all_route_tables.should.have.length_of(1)
main_route_table = all_route_tables[0]
@ -33,7 +33,7 @@ def test_route_tables_defaults():
vpc.delete()
all_route_tables = conn.get_all_route_tables()
all_route_tables = conn.get_all_route_tables(filters={'vpc-id': vpc.id})
all_route_tables.should.have.length_of(0)
@ -43,7 +43,7 @@ def test_route_tables_additional():
vpc = conn.create_vpc("10.0.0.0/16")
route_table = conn.create_route_table(vpc.id)
all_route_tables = conn.get_all_route_tables()
all_route_tables = conn.get_all_route_tables(filters={'vpc-id': vpc.id})
all_route_tables.should.have.length_of(2)
all_route_tables[0].vpc_id.should.equal(vpc.id)
all_route_tables[1].vpc_id.should.equal(vpc.id)
@ -67,7 +67,7 @@ def test_route_tables_additional():
conn.delete_route_table(route_table.id)
all_route_tables = conn.get_all_route_tables()
all_route_tables = conn.get_all_route_tables(filters={'vpc-id': vpc.id})
all_route_tables.should.have.length_of(1)
with assert_raises(EC2ResponseError) as cm:
@ -88,11 +88,11 @@ def test_route_tables_filters_standard():
route_table2 = conn.create_route_table(vpc2.id)
all_route_tables = conn.get_all_route_tables()
all_route_tables.should.have.length_of(4)
all_route_tables.should.have.length_of(5)
# Filter by main route table
main_route_tables = conn.get_all_route_tables(filters={'association.main':'true'})
main_route_tables.should.have.length_of(2)
main_route_tables.should.have.length_of(3)
main_route_table_ids = [route_table.id for route_table in main_route_tables]
main_route_table_ids.should_not.contain(route_table1.id)
main_route_table_ids.should_not.contain(route_table2.id)
@ -131,7 +131,7 @@ def test_route_tables_filters_associations():
association_id3 = conn.associate_route_table(route_table2.id, subnet3.id)
all_route_tables = conn.get_all_route_tables()
all_route_tables.should.have.length_of(3)
all_route_tables.should.have.length_of(4)
# Filter by association ID
association1_route_tables = conn.get_all_route_tables(filters={'association.route-table-association-id':association_id1})
@ -160,7 +160,7 @@ def test_route_table_associations():
route_table = conn.create_route_table(vpc.id)
all_route_tables = conn.get_all_route_tables()
all_route_tables.should.have.length_of(2)
all_route_tables.should.have.length_of(3)
# Refresh
route_table = conn.get_all_route_tables(route_table.id)[0]
@ -232,7 +232,7 @@ def test_route_table_replace_route_table_association():
route_table2 = conn.create_route_table(vpc.id)
all_route_tables = conn.get_all_route_tables()
all_route_tables.should.have.length_of(3)
all_route_tables.should.have.length_of(4)
# Refresh
route_table1 = conn.get_all_route_tables(route_table1.id)[0]
@ -330,14 +330,14 @@ def test_route_table_get_by_tag_boto3():
def test_routes_additional():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
main_route_table = conn.get_all_route_tables()[0]
main_route_table = conn.get_all_route_tables(filters={'vpc-id': vpc.id})[0]
local_route = main_route_table.routes[0]
igw = conn.create_internet_gateway()
ROUTE_CIDR = "10.0.0.4/24"
conn.create_route(main_route_table.id, ROUTE_CIDR, gateway_id=igw.id)
main_route_table = conn.get_all_route_tables()[0] # Refresh route table
main_route_table = conn.get_all_route_tables(filters={'vpc-id': vpc.id})[0] # Refresh route table
main_route_table.routes.should.have.length_of(2)
new_routes = [route for route in main_route_table.routes if route.destination_cidr_block != vpc.cidr_block]
@ -351,7 +351,7 @@ def test_routes_additional():
conn.delete_route(main_route_table.id, ROUTE_CIDR)
main_route_table = conn.get_all_route_tables()[0] # Refresh route table
main_route_table = conn.get_all_route_tables(filters={'vpc-id': vpc.id})[0] # Refresh route table
main_route_table.routes.should.have.length_of(1)
new_routes = [route for route in main_route_table.routes if route.destination_cidr_block != vpc.cidr_block]

View File

@ -37,7 +37,7 @@ def test_create_and_describe_security_group():
cm.exception.request_id.should_not.be.none
all_groups = conn.get_all_security_groups()
all_groups.should.have.length_of(2) # The default group gets created automatically
all_groups.should.have.length_of(3) # The default group gets created automatically
group_names = [group.name for group in all_groups]
set(group_names).should.equal(set(["default", "test security group"]))
@ -57,7 +57,7 @@ def test_create_security_group_without_description_raises_error():
def test_default_security_group():
conn = boto.ec2.connect_to_region('us-east-1')
groups = conn.get_all_security_groups()
groups.should.have.length_of(1)
groups.should.have.length_of(2)
groups[0].name.should.equal("default")
@ -98,7 +98,7 @@ def test_create_two_security_groups_with_same_name_in_different_vpc():
all_groups = conn.get_all_security_groups()
all_groups.should.have.length_of(3)
all_groups.should.have.length_of(4)
group_names = [group.name for group in all_groups]
# The default group is created automatically
set(group_names).should.equal(set(["default", "test security group"]))
@ -110,7 +110,7 @@ def test_deleting_security_groups():
security_group1 = conn.create_security_group('test1', 'test1')
conn.create_security_group('test2', 'test2')
conn.get_all_security_groups().should.have.length_of(3) # We need to include the default security group
conn.get_all_security_groups().should.have.length_of(4)
# Deleting a group that doesn't exist should throw an error
with assert_raises(EC2ResponseError) as cm:
@ -127,11 +127,11 @@ def test_deleting_security_groups():
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DeleteSecurityGroup operation: Request would have succeeded, but DryRun flag is set')
conn.delete_security_group('test2')
conn.get_all_security_groups().should.have.length_of(2)
conn.get_all_security_groups().should.have.length_of(3)
# Delete by group id
conn.delete_security_group(group_id=security_group1.id)
conn.get_all_security_groups().should.have.length_of(1)
conn.get_all_security_groups().should.have.length_of(2)
@mock_ec2
@ -267,6 +267,7 @@ def test_authorize_other_group_egress_and_revoke():
sg01.revoke_egress(IpPermissions=[ip_permission])
sg01.ip_permissions_egress.should.have.length_of(1)
@mock_ec2
def test_authorize_group_in_vpc():
conn = boto.connect_ec2('the_key', 'the_secret')
@ -316,7 +317,7 @@ def test_get_all_security_groups():
resp[0].id.should.equal(sg1.id)
resp = conn.get_all_security_groups()
resp.should.have.length_of(3) # We need to include the default group here
resp.should.have.length_of(4)
@mock_ec2
@ -376,7 +377,7 @@ def test_authorize_all_protocols_with_no_port_specification():
sg.rules[0].from_port.should.equal(None)
sg.rules[0].to_port.should.equal(None)
'''
Boto3
'''

View File

@ -16,17 +16,18 @@ from moto import mock_cloudformation, mock_ec2
@mock_ec2
def test_subnets():
ec2 = boto.connect_ec2('the_key', 'the_secret')
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(1)
all_subnets.should.have.length_of(1 + len(ec2.get_all_zones()))
conn.delete_subnet(subnet.id)
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(0)
all_subnets.should.have.length_of(0 + len(ec2.get_all_zones()))
with assert_raises(EC2ResponseError) as cm:
conn.delete_subnet(subnet.id)
@ -59,7 +60,7 @@ def test_subnet_tagging():
tag.value.should.equal("some value")
# Refresh the subnet
subnet = conn.get_all_subnets()[0]
subnet = conn.get_all_subnets(subnet_ids=[subnet.id])[0]
subnet.tags.should.have.length_of(1)
subnet.tags["a key"].should.equal("some value")
@ -76,22 +77,32 @@ def test_subnet_should_have_proper_availability_zone_set():
def test_default_subnet():
ec2 = boto3.resource('ec2', region_name='us-west-1')
# Create the default VPC
default_vpc = ec2.create_vpc(CidrBlock='172.31.0.0/16')
default_vpc = list(ec2.vpcs.all())[0]
default_vpc.cidr_block.should.equal('172.31.0.0/16')
default_vpc.reload()
default_vpc.is_default.should.be.ok
subnet = ec2.create_subnet(VpcId=default_vpc.id, CidrBlock='172.31.0.0/20', AvailabilityZone='us-west-1a')
subnet.reload()
subnet.map_public_ip_on_launch.should.be.ok
subnet.map_public_ip_on_launch.shouldnt.be.ok
@mock_ec2
def test_non_default_subnet():
ec2 = boto3.resource('ec2', region_name='us-west-1')
vpc_cli = boto.vpc.connect_to_region('us-west-1')
# Create the default VPC
ec2.create_vpc(CidrBlock='172.31.0.0/16')
# Create the non default VPC
vpc = vpc_cli.create_vpc("10.0.0.0/16")
vpc.is_default.shouldnt.be.ok
subnet = vpc_cli.create_subnet(vpc.id, "10.0.0.0/24")
subnet = vpc_cli.get_all_subnets(subnet_ids=[subnet.id])[0]
subnet.mapPublicIpOnLaunch.should.equal('false')
@mock_ec2
def test_boto3_non_default_subnet():
ec2 = boto3.resource('ec2', region_name='us-west-1')
# Create the non default VPC
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
@ -108,10 +119,9 @@ def test_modify_subnet_attribute():
ec2 = boto3.resource('ec2', region_name='us-west-1')
client = boto3.client('ec2', region_name='us-west-1')
# Create the default VPC
ec2.create_vpc(CidrBlock='172.31.0.0/16')
# Get the default VPC
vpc = list(ec2.vpcs.all())[0]
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-1a')
# 'map_public_ip_on_launch' is set when calling 'DescribeSubnets' action
@ -120,11 +130,15 @@ def test_modify_subnet_attribute():
# For non default subnet, attribute value should be 'False'
subnet.map_public_ip_on_launch.shouldnt.be.ok
client.modify_subnet_attribute(SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': False})
subnet.reload()
subnet.map_public_ip_on_launch.shouldnt.be.ok
client.modify_subnet_attribute(SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': True})
subnet.reload()
subnet.map_public_ip_on_launch.should.be.ok
@mock_ec2
def test_modify_subnet_attribute_validation():
ec2 = boto3.resource('ec2', region_name='us-west-1')
@ -135,8 +149,10 @@ def test_modify_subnet_attribute_validation():
with assert_raises(ParamValidationError):
client.modify_subnet_attribute(SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': 'invalid'})
@mock_ec2
def test_get_subnets_filtering():
ec2 = boto.ec2.connect_to_region('us-west-1')
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(vpcA.id, "10.0.0.0/24", availability_zone='us-west-1a')
@ -145,7 +161,7 @@ def test_get_subnets_filtering():
subnetB2 = conn.create_subnet(vpcB.id, "10.0.1.0/24", availability_zone='us-west-1b')
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(3)
all_subnets.should.have.length_of(3 + len(ec2.get_all_zones()))
# Filter by VPC ID
subnets_by_vpc = conn.get_all_subnets(filters={'vpc-id': vpcB.id})
@ -181,9 +197,9 @@ def test_get_subnets_filtering():
set([subnet.id for subnet in subnets_by_az]).should.equal(set([subnetB1.id]))
# Filter by defaultForAz
subnets_by_az = conn.get_all_subnets(filters={'defaultForAz': "true"})
subnets_by_az.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_az]).should.equal(set([subnetA.id]))
subnets_by_az.should.have.length_of(len(conn.get_all_zones()))
# Unsupported filter
conn.get_all_subnets.when.called_with(filters={'not-implemented-filter': 'foobar'}).should.throw(NotImplementedError)

View File

@ -21,12 +21,12 @@ def test_vpcs():
vpc.cidr_block.should.equal('10.0.0.0/16')
all_vpcs = conn.get_all_vpcs()
all_vpcs.should.have.length_of(1)
all_vpcs.should.have.length_of(2)
vpc.delete()
all_vpcs = conn.get_all_vpcs()
all_vpcs.should.have.length_of(0)
all_vpcs.should.have.length_of(1)
with assert_raises(EC2ResponseError) as cm:
conn.delete_vpc("vpc-1234abcd")
@ -40,14 +40,14 @@ def test_vpc_defaults():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
conn.get_all_vpcs().should.have.length_of(1)
conn.get_all_route_tables().should.have.length_of(1)
conn.get_all_vpcs().should.have.length_of(2)
conn.get_all_route_tables().should.have.length_of(2)
conn.get_all_security_groups(filters={'vpc-id': [vpc.id]}).should.have.length_of(1)
vpc.delete()
conn.get_all_vpcs().should.have.length_of(0)
conn.get_all_route_tables().should.have.length_of(0)
conn.get_all_vpcs().should.have.length_of(1)
conn.get_all_route_tables().should.have.length_of(1)
conn.get_all_security_groups(filters={'vpc-id': [vpc.id]}).should.have.length_of(0)
@mock_ec2
@ -56,7 +56,7 @@ def test_vpc_isdefault_filter():
vpc = conn.create_vpc("10.0.0.0/16")
conn.get_all_vpcs(filters={'isDefault': 'true'}).should.have.length_of(1)
vpc.delete()
conn.get_all_vpcs(filters={'isDefault': 'true'}).should.have.length_of(0)
conn.get_all_vpcs(filters={'isDefault': 'true'}).should.have.length_of(1)
@mock_ec2
@ -65,10 +65,10 @@ def test_multiple_vpcs_default_filter():
conn.create_vpc("10.8.0.0/16")
conn.create_vpc("10.0.0.0/16")
conn.create_vpc("192.168.0.0/16")
conn.get_all_vpcs().should.have.length_of(3)
conn.get_all_vpcs().should.have.length_of(4)
vpc = conn.get_all_vpcs(filters={'isDefault': 'true'})
vpc.should.have.length_of(1)
vpc[0].cidr_block.should.equal('10.8.0.0/16')
vpc[0].cidr_block.should.equal('172.31.0.0/16')
@mock_ec2
@ -76,9 +76,9 @@ def test_vpc_state_available_filter():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
conn.create_vpc("10.1.0.0/16")
conn.get_all_vpcs(filters={'state': 'available'}).should.have.length_of(2)
conn.get_all_vpcs(filters={'state': 'available'}).should.have.length_of(3)
vpc.delete()
conn.get_all_vpcs(filters={'state': 'available'}).should.have.length_of(1)
conn.get_all_vpcs(filters={'state': 'available'}).should.have.length_of(2)
@mock_ec2
def test_vpc_tagging():
@ -86,13 +86,12 @@ def test_vpc_tagging():
vpc = conn.create_vpc("10.0.0.0/16")
vpc.add_tag("a key", "some value")
tag = conn.get_all_tags()[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
# Refresh the vpc
vpc = conn.get_all_vpcs()[0]
vpc = conn.get_all_vpcs(vpc_ids=[vpc.id])[0]
vpc.tags.should.have.length_of(1)
vpc.tags["a key"].should.equal("some value")
@ -245,7 +244,8 @@ def test_default_vpc():
ec2 = boto3.resource('ec2', region_name='us-west-1')
# Create the default VPC
default_vpc = ec2.create_vpc(CidrBlock='172.31.0.0/16')
default_vpc = list(ec2.vpcs.all())[0]
default_vpc.cidr_block.should.equal('172.31.0.0/16')
default_vpc.reload()
default_vpc.is_default.should.be.ok