Some EC2 fixes for alternative regions.

This commit is contained in:
Steve Pulec 2014-10-30 22:46:24 -04:00
parent 0cae959f05
commit 2d65b0a020
16 changed files with 153 additions and 154 deletions

View File

@ -3,6 +3,7 @@ import copy
import itertools import itertools
from collections import defaultdict from collections import defaultdict
from datetime import datetime from datetime import datetime
import re
import six import six
import boto import boto
@ -105,7 +106,7 @@ class StateReason(object):
class TaggedEC2Resource(object): class TaggedEC2Resource(object):
def get_tags(self, *args, **kwargs): def get_tags(self, *args, **kwargs):
tags = ec2_backend.describe_tags(filters={'resource-id': [self.id]}) tags = self.ec2_backend.describe_tags(filters={'resource-id': [self.id]})
return tags return tags
def get_filter_value(self, filter_name): def get_filter_value(self, filter_name):
@ -127,7 +128,9 @@ class TaggedEC2Resource(object):
class NetworkInterface(object): class NetworkInterface(object):
def __init__(self, subnet, private_ip_address, device_index=0, public_ip_auto_assign=True, group_ids=None): def __init__(self, ec2_backend, subnet, private_ip_address, device_index=0,
public_ip_auto_assign=True, group_ids=None):
self.ec2_backend = ec2_backend
self.id = random_eni_id() self.id = random_eni_id()
self.device_index = device_index self.device_index = device_index
self.private_ip_address = private_ip_address self.private_ip_address = private_ip_address
@ -148,11 +151,11 @@ class NetworkInterface(object):
group = None group = None
if group_ids: if group_ids:
for group_id in group_ids: for group_id in group_ids:
group = ec2_backend.get_security_group_from_id(group_id) group = self.ec2_backend.get_security_group_from_id(group_id)
if not group: if not group:
# Create with specific group ID. # Create with specific group ID.
group = SecurityGroup(group_id, group_id, group_id, vpc_id=subnet.vpc_id) group = SecurityGroup(group_id, group_id, group_id, vpc_id=subnet.vpc_id)
ec2_backend.groups[subnet.vpc_id][group_id] = group self.ec2_backend.groups[subnet.vpc_id][group_id] = group
if group: if group:
self._group_set.append(group) self._group_set.append(group)
@ -207,7 +210,7 @@ class NetworkInterfaceBackend(object):
super(NetworkInterfaceBackend, self).__init__() super(NetworkInterfaceBackend, self).__init__()
def create_network_interface(self, subnet, private_ip_address, group_ids=None, **kwargs): def create_network_interface(self, subnet, private_ip_address, group_ids=None, **kwargs):
eni = NetworkInterface(subnet, private_ip_address, group_ids=group_ids) eni = NetworkInterface(self, subnet, private_ip_address, group_ids=group_ids)
self.enis[eni.id] = eni self.enis[eni.id] = eni
return eni return eni
@ -241,7 +244,7 @@ class NetworkInterfaceBackend(object):
enis.append(eni) enis.append(eni)
break break
else: else:
ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeNetworkInterfaces".format(_filter)) self.raise_not_implemented_error("The filter '{0}' for DescribeNetworkInterfaces".format(_filter))
return enis return enis
def attach_network_interface(self, eni_id, instance_id, device_index): def attach_network_interface(self, eni_id, instance_id, device_index):
@ -760,7 +763,9 @@ class TagBackend(object):
class Ami(TaggedEC2Resource): class Ami(TaggedEC2Resource):
def __init__(self, ami_id, instance=None, source_ami=None, name=None, description=None): def __init__(self, ec2_backend, ami_id, instance=None, source_ami=None,
name=None, description=None):
self.ec2_backend = ec2_backend
self.id = ami_id self.id = ami_id
self.state = "available" self.state = "available"
@ -790,8 +795,8 @@ class Ami(TaggedEC2Resource):
self.launch_permission_groups = set() self.launch_permission_groups = set()
# AWS auto-creates these, we should reflect the same. # AWS auto-creates these, we should reflect the same.
volume = ec2_backend.create_volume(15, "us-east-1a") volume = self.ec2_backend.create_volume(15, "us-east-1a")
self.ebs_snapshot = ec2_backend.create_snapshot(volume.id, "Auto-created snapshot for AMI %s" % self.id) self.ebs_snapshot = self.ec2_backend.create_snapshot(volume.id, "Auto-created snapshot for AMI %s" % self.id)
def get_filter_value(self, filter_name): def get_filter_value(self, filter_name):
if filter_name == 'virtualization-type': if filter_name == 'virtualization-type':
@ -810,7 +815,7 @@ class Ami(TaggedEC2Resource):
filter_value = super(Ami, self).get_filter_value(filter_name) filter_value = super(Ami, self).get_filter_value(filter_name)
if filter_value is None: if filter_value is None:
ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeImages".format(filter_name)) self.ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeImages".format(filter_name))
return filter_value return filter_value
@ -824,14 +829,14 @@ class AmiBackend(object):
# TODO: check that instance exists and pull info from it. # TODO: check that instance exists and pull info from it.
ami_id = random_ami_id() ami_id = random_ami_id()
instance = self.get_instance(instance_id) instance = self.get_instance(instance_id)
ami = Ami(ami_id, instance=instance, source_ami=None, name=name, description=description) ami = Ami(self, ami_id, instance=instance, source_ami=None, name=name, description=description)
self.amis[ami_id] = ami self.amis[ami_id] = ami
return ami return ami
def copy_image(self, source_image_id, source_region, name=None, description=None): def copy_image(self, source_image_id, source_region, name=None, description=None):
source_ami = ec2_backends[source_region].describe_images(ami_ids=[source_image_id])[0] source_ami = ec2_backends[source_region].describe_images(ami_ids=[source_image_id])[0]
ami_id = random_ami_id() ami_id = random_ami_id()
ami = Ami(ami_id, instance=None, source_ami=source_ami, name=name, description=description) ami = Ami(self, ami_id, instance=None, source_ami=source_ami, name=name, description=description)
self.amis[ami_id] = ami self.amis[ami_id] = ami
return ami return ami
@ -863,7 +868,7 @@ class AmiBackend(object):
def add_launch_permission(self, ami_id, user_id=None, group=None): def add_launch_permission(self, ami_id, user_id=None, group=None):
if user_id: if user_id:
ec2_backend.raise_not_implemented_error("The UserId parameter for ModifyImageAttribute") self.raise_not_implemented_error("The UserId parameter for ModifyImageAttribute")
if group != 'all': if group != 'all':
raise InvalidAMIAttributeItemValueError("UserGroup", group) raise InvalidAMIAttributeItemValueError("UserGroup", group)
@ -873,7 +878,7 @@ class AmiBackend(object):
def remove_launch_permission(self, ami_id, user_id=None, group=None): def remove_launch_permission(self, ami_id, user_id=None, group=None):
if user_id: if user_id:
ec2_backend.raise_not_implemented_error("The UserId parameter for ModifyImageAttribute") self.raise_not_implemented_error("The UserId parameter for ModifyImageAttribute")
if group != 'all': if group != 'all':
raise InvalidAMIAttributeItemValueError("UserGroup", group) raise InvalidAMIAttributeItemValueError("UserGroup", group)
@ -1104,7 +1109,7 @@ class SecurityGroupBackend(object):
if name == 'default': if name == 'default':
# If the request is for the default group and it does not exist, create it # If the request is for the default group and it does not exist, create it
default_group = ec2_backend.create_security_group("default", "The default security group", vpc_id=vpc_id, force=True) default_group = self.create_security_group("default", "The default security group", vpc_id=vpc_id, force=True)
return default_group return default_group
def authorize_security_group_ingress(self, def authorize_security_group_ingress(self,
@ -1318,7 +1323,7 @@ class EBSBackend(object):
def add_create_volume_permission(self, snapshot_id, user_id=None, group=None): def add_create_volume_permission(self, snapshot_id, user_id=None, group=None):
if user_id: if user_id:
ec2_backend.raise_not_implemented_error("The UserId parameter for ModifySnapshotAttribute") self.raise_not_implemented_error("The UserId parameter for ModifySnapshotAttribute")
if group != 'all': if group != 'all':
raise InvalidAMIAttributeItemValueError("UserGroup", group) raise InvalidAMIAttributeItemValueError("UserGroup", group)
@ -1328,7 +1333,7 @@ class EBSBackend(object):
def remove_create_volume_permission(self, snapshot_id, user_id=None, group=None): def remove_create_volume_permission(self, snapshot_id, user_id=None, group=None):
if user_id: if user_id:
ec2_backend.raise_not_implemented_error("The UserId parameter for ModifySnapshotAttribute") self.raise_not_implemented_error("The UserId parameter for ModifySnapshotAttribute")
if group != 'all': if group != 'all':
raise InvalidAMIAttributeItemValueError("UserGroup", group) raise InvalidAMIAttributeItemValueError("UserGroup", group)
@ -1338,7 +1343,8 @@ class EBSBackend(object):
class VPC(TaggedEC2Resource): class VPC(TaggedEC2Resource):
def __init__(self, vpc_id, cidr_block): def __init__(self, ec2_backend, vpc_id, cidr_block):
self.ec2_backend = ec2_backend
self.id = vpc_id self.id = vpc_id
self.cidr_block = cidr_block self.cidr_block = cidr_block
self.dhcp_options = None self.dhcp_options = None
@ -1369,7 +1375,7 @@ class VPC(TaggedEC2Resource):
filter_value = super(VPC, self).get_filter_value(filter_name) filter_value = super(VPC, self).get_filter_value(filter_name)
if filter_value is None: if filter_value is None:
ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeVPCs".format(filter_name)) self.ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeVPCs".format(filter_name))
return filter_value return filter_value
@ -1381,15 +1387,15 @@ class VPCBackend(object):
def create_vpc(self, cidr_block): def create_vpc(self, cidr_block):
vpc_id = random_vpc_id() vpc_id = random_vpc_id()
vpc = VPC(vpc_id, cidr_block) vpc = VPC(self, vpc_id, cidr_block)
self.vpcs[vpc_id] = vpc self.vpcs[vpc_id] = vpc
# AWS creates a default main route table and security group. # AWS creates a default main route table and security group.
main_route_table = self.create_route_table(vpc_id, main=True) main_route_table = self.create_route_table(vpc_id, main=True)
default = ec2_backend.get_security_group_from_name('default', vpc_id=vpc_id) default = self.get_security_group_from_name('default', vpc_id=vpc_id)
if not default: if not default:
ec2_backend.create_security_group('default', 'default VPC security group', vpc_id=vpc_id) self.create_security_group('default', 'default VPC security group', vpc_id=vpc_id)
return vpc return vpc
@ -1408,19 +1414,19 @@ class VPCBackend(object):
def delete_vpc(self, vpc_id): def delete_vpc(self, vpc_id):
# Delete route table if only main route table remains. # Delete route table if only main route table remains.
route_tables = ec2_backend.get_all_route_tables(filters={'vpc-id':vpc_id}) route_tables = self.get_all_route_tables(filters={'vpc-id':vpc_id})
if len(route_tables) > 1: if len(route_tables) > 1:
raise DependencyViolationError( raise DependencyViolationError(
"The vpc {0} has dependencies and cannot be deleted." "The vpc {0} has dependencies and cannot be deleted."
.format(vpc_id) .format(vpc_id)
) )
for route_table in route_tables: for route_table in route_tables:
ec2_backend.delete_route_table(route_table.id) self.delete_route_table(route_table.id)
# Delete default security group if exists. # Delete default security group if exists.
default = ec2_backend.get_security_group_from_name('default', vpc_id=vpc_id) default = self.get_security_group_from_name('default', vpc_id=vpc_id)
if default: if default:
ec2_backend.delete_security_group(group_id=default.id) self.delete_security_group(group_id=default.id)
# Now delete VPC. # Now delete VPC.
vpc = self.vpcs.pop(vpc_id, None) vpc = self.vpcs.pop(vpc_id, None)
@ -1467,8 +1473,8 @@ class VPCPeeringConnection(TaggedEC2Resource):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json): def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
properties = cloudformation_json['Properties'] properties = cloudformation_json['Properties']
vpc = self.get_vpc(properties['VpcId']) vpc = get_vpc(properties['VpcId'])
peer_vpc = self.get_vpc(properties['PeerVpcId']) peer_vpc = get_vpc(properties['PeerVpcId'])
vpc_pcx = ec2_backend.create_vpc_peering_connection(vpc, peer_vpc) vpc_pcx = ec2_backend.create_vpc_peering_connection(vpc, peer_vpc)
@ -1521,7 +1527,8 @@ class VPCPeeringConnectionBackend(object):
class Subnet(TaggedEC2Resource): class Subnet(TaggedEC2Resource):
def __init__(self, subnet_id, vpc_id, cidr_block): def __init__(self, ec2_backend, subnet_id, vpc_id, cidr_block):
self.ec2_backend = ec2_backend
self.id = subnet_id self.id = subnet_id
self.vpc_id = vpc_id self.vpc_id = vpc_id
self.cidr_block = cidr_block self.cidr_block = cidr_block
@ -1552,7 +1559,7 @@ class Subnet(TaggedEC2Resource):
filter_value = super(Subnet, self).get_filter_value(filter_name) filter_value = super(Subnet, self).get_filter_value(filter_name)
if filter_value is None: if filter_value is None:
ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeSubnets".format(filter_name)) self.ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeSubnets".format(filter_name))
return filter_value return filter_value
@ -1576,7 +1583,7 @@ class SubnetBackend(object):
def create_subnet(self, vpc_id, cidr_block): def create_subnet(self, vpc_id, cidr_block):
subnet_id = random_subnet_id() subnet_id = random_subnet_id()
subnet = Subnet(subnet_id, vpc_id, cidr_block) subnet = Subnet(self, subnet_id, vpc_id, cidr_block)
vpc = self.get_vpc(vpc_id) # Validate VPC exists vpc = self.get_vpc(vpc_id) # Validate VPC exists
self.subnets[subnet_id] = subnet self.subnets[subnet_id] = subnet
return subnet return subnet
@ -1624,7 +1631,8 @@ class SubnetRouteTableAssociationBackend(object):
class RouteTable(TaggedEC2Resource): class RouteTable(TaggedEC2Resource):
def __init__(self, route_table_id, vpc_id, main=False): def __init__(self, ec2_backend, route_table_id, vpc_id, main=False):
self.ec2_backend = ec2_backend
self.id = route_table_id self.id = route_table_id
self.vpc_id = vpc_id self.vpc_id = vpc_id
self.main = main self.main = main
@ -1665,7 +1673,7 @@ class RouteTable(TaggedEC2Resource):
filter_value = super(RouteTable, self).get_filter_value(filter_name) filter_value = super(RouteTable, self).get_filter_value(filter_name)
if filter_value is None: if filter_value is None:
ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeRouteTables".format(filter_name)) self.ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeRouteTables".format(filter_name))
return filter_value return filter_value
@ -1678,7 +1686,7 @@ class RouteTableBackend(object):
def create_route_table(self, vpc_id, main=False): def create_route_table(self, vpc_id, main=False):
route_table_id = random_route_table_id() route_table_id = random_route_table_id()
vpc = self.get_vpc(vpc_id) # Validate VPC exists vpc = self.get_vpc(vpc_id) # Validate VPC exists
route_table = RouteTable(route_table_id, vpc_id, main=main) route_table = RouteTable(self, route_table_id, vpc_id, main=main)
self.route_tables[route_table_id] = route_table self.route_tables[route_table_id] = route_table
# AWS creates a default local route. # AWS creates a default local route.
@ -1715,7 +1723,7 @@ class RouteTableBackend(object):
def associate_route_table(self, route_table_id, subnet_id): def associate_route_table(self, route_table_id, subnet_id):
# Idempotent if association already exists. # Idempotent if association already exists.
route_tables_by_subnet = ec2_backend.get_all_route_tables(filters={'association.subnet-id':[subnet_id]}) route_tables_by_subnet = self.get_all_route_tables(filters={'association.subnet-id':[subnet_id]})
if route_tables_by_subnet: if route_tables_by_subnet:
for association_id,check_subnet_id in route_tables_by_subnet[0].associations.items(): for association_id,check_subnet_id in route_tables_by_subnet[0].associations.items():
if subnet_id == check_subnet_id: if subnet_id == check_subnet_id:
@ -1736,12 +1744,12 @@ class RouteTableBackend(object):
def replace_route_table_association(self, association_id, route_table_id): def replace_route_table_association(self, association_id, route_table_id):
# Idempotent if association already exists. # Idempotent if association already exists.
new_route_table = ec2_backend.get_route_table(route_table_id) new_route_table = self.get_route_table(route_table_id)
if association_id in new_route_table.associations: if association_id in new_route_table.associations:
return association_id return association_id
# Find route table which currently has the association, error if none. # Find route table which currently has the association, error if none.
route_tables_by_association_id = ec2_backend.get_all_route_tables(filters={'association.route-table-association-id':[association_id]}) route_tables_by_association_id = self.get_all_route_tables(filters={'association.route-table-association-id':[association_id]})
if not route_tables_by_association_id: if not route_tables_by_association_id:
raise InvalidAssociationIdError(association_id) raise InvalidAssociationIdError(association_id)
@ -1794,7 +1802,7 @@ class RouteBackend(object):
route_table = self.get_route_table(route_table_id) route_table = self.get_route_table(route_table_id)
if interface_id: if interface_id:
ec2_backend.raise_not_implemented_error("CreateRoute to NetworkInterfaceId") self.raise_not_implemented_error("CreateRoute to NetworkInterfaceId")
route = Route(route_table, destination_cidr_block, local=local, route = Route(route_table, destination_cidr_block, local=local,
internet_gateway=self.get_internet_gateway(gateway_id) if gateway_id else None, internet_gateway=self.get_internet_gateway(gateway_id) if gateway_id else None,
@ -1812,7 +1820,7 @@ class RouteBackend(object):
route = route_table.routes[route_id] route = route_table.routes[route_id]
if interface_id: if interface_id:
ec2_backend.raise_not_implemented_error("ReplaceRoute to NetworkInterfaceId") self.raise_not_implemented_error("ReplaceRoute to NetworkInterfaceId")
route.internet_gateway = self.get_internet_gateway(gateway_id) if gateway_id else None route.internet_gateway = self.get_internet_gateway(gateway_id) if gateway_id else None
route.instance = self.get_instance(instance_id) if instance_id else None route.instance = self.get_instance(instance_id) if instance_id else None
@ -1837,7 +1845,8 @@ class RouteBackend(object):
class InternetGateway(TaggedEC2Resource): class InternetGateway(TaggedEC2Resource):
def __init__(self): def __init__(self, ec2_backend):
self.ec2_backend = ec2_backend
self.id = random_internet_gateway_id() self.id = random_internet_gateway_id()
self.vpc = None self.vpc = None
@ -1856,7 +1865,7 @@ class InternetGatewayBackend(object):
super(InternetGatewayBackend, self).__init__() super(InternetGatewayBackend, self).__init__()
def create_internet_gateway(self): def create_internet_gateway(self):
igw = InternetGateway() igw = InternetGateway(self)
self.internet_gateways[igw.id] = igw self.internet_gateways[igw.id] = igw
return igw return igw
@ -1930,12 +1939,14 @@ class VPCGatewayAttachmentBackend(object):
class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource): class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
def __init__(self, spot_request_id, price, image_id, type, valid_from, def __init__(self, ec2_backend, spot_request_id, price, image_id, type,
valid_until, launch_group, availability_zone_group, key_name, valid_from, valid_until, launch_group, availability_zone_group,
security_groups, user_data, instance_type, placement, kernel_id, key_name, security_groups, user_data, instance_type, placement,
ramdisk_id, monitoring_enabled, subnet_id, **kwargs): kernel_id, ramdisk_id, monitoring_enabled, subnet_id,
**kwargs):
super(SpotInstanceRequest, self).__init__(**kwargs) super(SpotInstanceRequest, self).__init__(**kwargs)
ls = LaunchSpecification() ls = LaunchSpecification()
self.ec2_backend = ec2_backend
self.launch_specification = ls self.launch_specification = ls
self.id = spot_request_id self.id = spot_request_id
self.state = "open" self.state = "open"
@ -1957,12 +1968,12 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
if security_groups: if security_groups:
for group_name in security_groups: for group_name in security_groups:
group = ec2_backend.get_security_group_from_name(group_name) group = self.ec2_backend.get_security_group_from_name(group_name)
if group: if group:
ls.groups.append(group) ls.groups.append(group)
else: else:
# If not security groups, add the default # If not security groups, add the default
default_group = ec2_backend.get_security_group_from_name("default") default_group = self.ec2_backend.get_security_group_from_name("default")
ls.groups.append(default_group) ls.groups.append(default_group)
def get_filter_value(self, filter_name): def get_filter_value(self, filter_name):
@ -1973,7 +1984,7 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
filter_value = super(SpotInstanceRequest, self).get_filter_value(filter_name) filter_value = super(SpotInstanceRequest, self).get_filter_value(filter_name)
if filter_value is None: if filter_value is None:
ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeSpotInstanceRequests".format(filter_name)) self.ec2_backend.raise_not_implemented_error("The filter '{0}' for DescribeSpotInstanceRequests".format(filter_name))
return filter_value return filter_value
@ -1992,7 +2003,7 @@ class SpotRequestBackend(object):
requests = [] requests = []
for _ in range(count): for _ in range(count):
spot_request_id = random_spot_request_id() spot_request_id = random_spot_request_id()
request = SpotInstanceRequest( request = SpotInstanceRequest(self,
spot_request_id, price, image_id, type, valid_from, valid_until, spot_request_id, price, image_id, type, valid_from, valid_until,
launch_group, availability_zone_group, key_name, security_groups, launch_group, availability_zone_group, key_name, security_groups,
user_data, instance_type, placement, kernel_id, ramdisk_id, user_data, instance_type, placement, kernel_id, ramdisk_id,
@ -2157,9 +2168,10 @@ class ElasticAddressBackend(object):
class DHCPOptionsSet(TaggedEC2Resource): class DHCPOptionsSet(TaggedEC2Resource):
def __init__(self, domain_name_servers=None, domain_name=None, def __init__(self, ec2_backend, domain_name_servers=None, domain_name=None,
ntp_servers=None, netbios_name_servers=None, ntp_servers=None, netbios_name_servers=None,
netbios_node_type=None): netbios_node_type=None):
self.ec2_backend = ec2_backend
self._options = { self._options = {
"domain-name-servers": domain_name_servers, "domain-name-servers": domain_name_servers,
"domain-name": domain_name, "domain-name": domain_name,
@ -2199,7 +2211,7 @@ class DHCPOptionsSetBackend(object):
raise InvalidParameterValueError(netbios_node_type) raise InvalidParameterValueError(netbios_node_type)
options = DHCPOptionsSet( options = DHCPOptionsSet(
domain_name_servers, domain_name, ntp_servers, self, domain_name_servers, domain_name, ntp_servers,
netbios_name_servers, netbios_node_type netbios_name_servers, netbios_node_type
) )
self.dhcp_options_sets[options.id] = options self.dhcp_options_sets[options.id] = options

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import instance_ids_from_querystring, image_ids_from_querystring, filters_from_querystring from moto.ec2.utils import instance_ids_from_querystring, image_ids_from_querystring, filters_from_querystring
@ -15,7 +14,7 @@ class AmisResponse(BaseResponse):
description = "" description = ""
instance_ids = instance_ids_from_querystring(self.querystring) instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = instance_ids[0] instance_id = instance_ids[0]
image = ec2_backend.create_image(instance_id, name, description) image = self.ec2_backend.create_image(instance_id, name, description)
template = Template(CREATE_IMAGE_RESPONSE) template = Template(CREATE_IMAGE_RESPONSE)
return template.render(image=image) return template.render(image=image)
@ -24,26 +23,26 @@ class AmisResponse(BaseResponse):
source_region = self.querystring.get('SourceRegion')[0] source_region = self.querystring.get('SourceRegion')[0]
name = self.querystring.get('Name')[0] if self.querystring.get('Name') else None name = self.querystring.get('Name')[0] if self.querystring.get('Name') else None
description = self.querystring.get('Description')[0] if self.querystring.get('Description') else None description = self.querystring.get('Description')[0] if self.querystring.get('Description') else None
image = ec2_backend.copy_image(source_image_id, source_region, name, description) image = self.ec2_backend.copy_image(source_image_id, source_region, name, description)
template = Template(COPY_IMAGE_RESPONSE) template = Template(COPY_IMAGE_RESPONSE)
return template.render(image=image) return template.render(image=image)
def deregister_image(self): def deregister_image(self):
ami_id = self.querystring.get('ImageId')[0] ami_id = self.querystring.get('ImageId')[0]
success = ec2_backend.deregister_image(ami_id) success = self.ec2_backend.deregister_image(ami_id)
template = Template(DEREGISTER_IMAGE_RESPONSE) template = Template(DEREGISTER_IMAGE_RESPONSE)
return template.render(success=str(success).lower()) return template.render(success=str(success).lower())
def describe_images(self): def describe_images(self):
ami_ids = image_ids_from_querystring(self.querystring) ami_ids = image_ids_from_querystring(self.querystring)
filters = filters_from_querystring(self.querystring) filters = filters_from_querystring(self.querystring)
images = ec2_backend.describe_images(ami_ids=ami_ids, filters=filters) images = self.ec2_backend.describe_images(ami_ids=ami_ids, filters=filters)
template = Template(DESCRIBE_IMAGES_RESPONSE) template = Template(DESCRIBE_IMAGES_RESPONSE)
return template.render(images=images) return template.render(images=images)
def describe_image_attribute(self): def describe_image_attribute(self):
ami_id = self.querystring.get('ImageId')[0] ami_id = self.querystring.get('ImageId')[0]
groups = ec2_backend.get_launch_permission_groups(ami_id) groups = self.ec2_backend.get_launch_permission_groups(ami_id)
template = Template(DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE) template = Template(DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE)
return template.render(ami_id=ami_id, groups=groups) return template.render(ami_id=ami_id, groups=groups)
@ -53,9 +52,9 @@ class AmisResponse(BaseResponse):
group = self.querystring.get('UserGroup.1', [None])[0] group = self.querystring.get('UserGroup.1', [None])[0]
user_id = self.querystring.get('UserId.1', [None])[0] user_id = self.querystring.get('UserId.1', [None])[0]
if (operation_type == 'add'): if (operation_type == 'add'):
ec2_backend.add_launch_permission(ami_id, user_id=user_id, group=group) self.ec2_backend.add_launch_permission(ami_id, user_id=user_id, group=group)
elif (operation_type == 'remove'): elif (operation_type == 'remove'):
ec2_backend.remove_launch_permission(ami_id, user_id=user_id, group=group) self.ec2_backend.remove_launch_permission(ami_id, user_id=user_id, group=group)
return MODIFY_IMAGE_ATTRIBUTE_RESPONSE return MODIFY_IMAGE_ATTRIBUTE_RESPONSE
def register_image(self): def register_image(self):

View File

@ -2,17 +2,16 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
class AvailabilityZonesAndRegions(BaseResponse): class AvailabilityZonesAndRegions(BaseResponse):
def describe_availability_zones(self): def describe_availability_zones(self):
zones = ec2_backend.describe_availability_zones() zones = self.ec2_backend.describe_availability_zones()
template = Template(DESCRIBE_ZONES_RESPONSE) template = Template(DESCRIBE_ZONES_RESPONSE)
return template.render(zones=zones) return template.render(zones=zones)
def describe_regions(self): def describe_regions(self):
regions = ec2_backend.describe_regions() regions = self.ec2_backend.describe_regions()
template = Template(DESCRIBE_REGIONS_RESPONSE) template = Template(DESCRIBE_REGIONS_RESPONSE)
return template.render(regions=regions) return template.render(regions=regions)

View File

@ -4,7 +4,6 @@ from moto.core.responses import BaseResponse
from moto.ec2.utils import ( from moto.ec2.utils import (
dhcp_configuration_from_querystring, dhcp_configuration_from_querystring,
sequence_from_querystring) sequence_from_querystring)
from moto.ec2.models import ec2_backend
class DHCPOptions(BaseResponse): class DHCPOptions(BaseResponse):
@ -12,10 +11,10 @@ class DHCPOptions(BaseResponse):
dhcp_opt_id = self.querystring.get("DhcpOptionsId", [None])[0] dhcp_opt_id = self.querystring.get("DhcpOptionsId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0] vpc_id = self.querystring.get("VpcId", [None])[0]
dhcp_opt = ec2_backend.describe_dhcp_options([dhcp_opt_id])[0] dhcp_opt = self.ec2_backend.describe_dhcp_options([dhcp_opt_id])[0]
vpc = ec2_backend.get_vpc(vpc_id) vpc = self.ec2_backend.get_vpc(vpc_id)
ec2_backend.associate_dhcp_options(dhcp_opt, vpc) self.ec2_backend.associate_dhcp_options(dhcp_opt, vpc)
template = Template(ASSOCIATE_DHCP_OPTIONS_RESPONSE) template = Template(ASSOCIATE_DHCP_OPTIONS_RESPONSE)
return template.render() return template.render()
@ -31,7 +30,7 @@ class DHCPOptions(BaseResponse):
netbios_name_servers = dhcp_config.get("netbios-name-servers", None) netbios_name_servers = dhcp_config.get("netbios-name-servers", None)
netbios_node_type = dhcp_config.get("netbios-node-type", None) netbios_node_type = dhcp_config.get("netbios-node-type", None)
dhcp_options_set = ec2_backend.create_dhcp_options( dhcp_options_set = self.ec2_backend.create_dhcp_options(
domain_name_servers=domain_name_servers, domain_name_servers=domain_name_servers,
domain_name=domain_name, domain_name=domain_name,
ntp_servers=ntp_servers, ntp_servers=ntp_servers,
@ -44,7 +43,7 @@ class DHCPOptions(BaseResponse):
def delete_dhcp_options(self): def delete_dhcp_options(self):
dhcp_opt_id = self.querystring.get("DhcpOptionsId", [None])[0] dhcp_opt_id = self.querystring.get("DhcpOptionsId", [None])[0]
delete_status = ec2_backend.delete_dhcp_options_set(dhcp_opt_id) delete_status = self.ec2_backend.delete_dhcp_options_set(dhcp_opt_id)
template = Template(DELETE_DHCP_OPTIONS_RESPONSE) template = Template(DELETE_DHCP_OPTIONS_RESPONSE)
return template.render(delete_status=delete_status) return template.render(delete_status=delete_status)
@ -53,9 +52,9 @@ class DHCPOptions(BaseResponse):
raise NotImplementedError("Filtering not supported in describe_dhcp_options.") raise NotImplementedError("Filtering not supported in describe_dhcp_options.")
elif "DhcpOptionsId.1" in self.querystring: elif "DhcpOptionsId.1" in self.querystring:
dhcp_opt_ids = sequence_from_querystring("DhcpOptionsId", self.querystring) dhcp_opt_ids = sequence_from_querystring("DhcpOptionsId", self.querystring)
dhcp_opt = ec2_backend.describe_dhcp_options(dhcp_opt_ids) dhcp_opt = self.ec2_backend.describe_dhcp_options(dhcp_opt_ids)
else: else:
dhcp_opt = ec2_backend.describe_dhcp_options() dhcp_opt = self.ec2_backend.describe_dhcp_options()
template = Template(DESCRIBE_DHCP_OPTIONS_RESPONSE) template = Template(DESCRIBE_DHCP_OPTIONS_RESPONSE)
return template.render(dhcp_options=dhcp_opt) return template.render(dhcp_options=dhcp_opt)

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
class ElasticBlockStore(BaseResponse): class ElasticBlockStore(BaseResponse):
@ -11,7 +10,7 @@ class ElasticBlockStore(BaseResponse):
instance_id = self.querystring.get('InstanceId')[0] instance_id = self.querystring.get('InstanceId')[0]
device_path = self.querystring.get('Device')[0] device_path = self.querystring.get('Device')[0]
attachment = ec2_backend.attach_volume(volume_id, instance_id, device_path) attachment = self.ec2_backend.attach_volume(volume_id, instance_id, device_path)
template = Template(ATTACHED_VOLUME_RESPONSE) template = Template(ATTACHED_VOLUME_RESPONSE)
return template.render(attachment=attachment) return template.render(attachment=attachment)
@ -23,34 +22,34 @@ class ElasticBlockStore(BaseResponse):
if 'Description' in self.querystring: if 'Description' in self.querystring:
description = self.querystring.get('Description')[0] description = self.querystring.get('Description')[0]
volume_id = self.querystring.get('VolumeId')[0] volume_id = self.querystring.get('VolumeId')[0]
snapshot = ec2_backend.create_snapshot(volume_id, description) snapshot = self.ec2_backend.create_snapshot(volume_id, description)
template = Template(CREATE_SNAPSHOT_RESPONSE) template = Template(CREATE_SNAPSHOT_RESPONSE)
return template.render(snapshot=snapshot) return template.render(snapshot=snapshot)
def create_volume(self): def create_volume(self):
size = self.querystring.get('Size')[0] size = self.querystring.get('Size')[0]
zone = self.querystring.get('AvailabilityZone')[0] zone = self.querystring.get('AvailabilityZone')[0]
volume = ec2_backend.create_volume(size, zone) volume = self.ec2_backend.create_volume(size, zone)
template = Template(CREATE_VOLUME_RESPONSE) template = Template(CREATE_VOLUME_RESPONSE)
return template.render(volume=volume) return template.render(volume=volume)
def delete_snapshot(self): def delete_snapshot(self):
snapshot_id = self.querystring.get('SnapshotId')[0] snapshot_id = self.querystring.get('SnapshotId')[0]
success = ec2_backend.delete_snapshot(snapshot_id) success = self.ec2_backend.delete_snapshot(snapshot_id)
return DELETE_SNAPSHOT_RESPONSE return DELETE_SNAPSHOT_RESPONSE
def delete_volume(self): def delete_volume(self):
volume_id = self.querystring.get('VolumeId')[0] volume_id = self.querystring.get('VolumeId')[0]
success = ec2_backend.delete_volume(volume_id) success = self.ec2_backend.delete_volume(volume_id)
return DELETE_VOLUME_RESPONSE return DELETE_VOLUME_RESPONSE
def describe_snapshots(self): def describe_snapshots(self):
snapshots = ec2_backend.describe_snapshots() snapshots = self.ec2_backend.describe_snapshots()
template = Template(DESCRIBE_SNAPSHOTS_RESPONSE) template = Template(DESCRIBE_SNAPSHOTS_RESPONSE)
return template.render(snapshots=snapshots) return template.render(snapshots=snapshots)
def describe_volumes(self): def describe_volumes(self):
volumes = ec2_backend.describe_volumes() volumes = self.ec2_backend.describe_volumes()
template = Template(DESCRIBE_VOLUMES_RESPONSE) template = Template(DESCRIBE_VOLUMES_RESPONSE)
return template.render(volumes=volumes) return template.render(volumes=volumes)
@ -65,7 +64,7 @@ class ElasticBlockStore(BaseResponse):
instance_id = self.querystring.get('InstanceId')[0] instance_id = self.querystring.get('InstanceId')[0]
device_path = self.querystring.get('Device')[0] device_path = self.querystring.get('Device')[0]
attachment = ec2_backend.detach_volume(volume_id, instance_id, device_path) attachment = self.ec2_backend.detach_volume(volume_id, instance_id, device_path)
template = Template(DETATCH_VOLUME_RESPONSE) template = Template(DETATCH_VOLUME_RESPONSE)
return template.render(attachment=attachment) return template.render(attachment=attachment)
@ -77,7 +76,7 @@ class ElasticBlockStore(BaseResponse):
def describe_snapshot_attribute(self): def describe_snapshot_attribute(self):
snapshot_id = self.querystring.get('SnapshotId')[0] snapshot_id = self.querystring.get('SnapshotId')[0]
groups = ec2_backend.get_create_volume_permission_groups(snapshot_id) groups = self.ec2_backend.get_create_volume_permission_groups(snapshot_id)
template = Template(DESCRIBE_SNAPSHOT_ATTRIBUTES_RESPONSE) template = Template(DESCRIBE_SNAPSHOT_ATTRIBUTES_RESPONSE)
return template.render(snapshot_id=snapshot_id, groups=groups) return template.render(snapshot_id=snapshot_id, groups=groups)
@ -87,9 +86,9 @@ class ElasticBlockStore(BaseResponse):
group = self.querystring.get('UserGroup.1', [None])[0] group = self.querystring.get('UserGroup.1', [None])[0]
user_id = self.querystring.get('UserId.1', [None])[0] user_id = self.querystring.get('UserId.1', [None])[0]
if (operation_type == 'add'): if (operation_type == 'add'):
ec2_backend.add_create_volume_permission(snapshot_id, user_id=user_id, group=group) self.ec2_backend.add_create_volume_permission(snapshot_id, user_id=user_id, group=group)
elif (operation_type == 'remove'): elif (operation_type == 'remove'):
ec2_backend.remove_create_volume_permission(snapshot_id, user_id=user_id, group=group) self.ec2_backend.remove_create_volume_permission(snapshot_id, user_id=user_id, group=group)
return MODIFY_SNAPSHOT_ATTRIBUTE_RESPONSE return MODIFY_SNAPSHOT_ATTRIBUTE_RESPONSE
def modify_volume_attribute(self): def modify_volume_attribute(self):

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import sequence_from_querystring from moto.ec2.utils import sequence_from_querystring
@ -12,7 +11,7 @@ class ElasticIPAddresses(BaseResponse):
domain = self.querystring.get('Domain')[0] domain = self.querystring.get('Domain')[0]
else: else:
domain = "standard" domain = "standard"
address = ec2_backend.allocate_address(domain) address = self.ec2_backend.allocate_address(domain)
template = Template(ALLOCATE_ADDRESS_RESPONSE) template = Template(ALLOCATE_ADDRESS_RESPONSE)
return template.render(address=address) return template.render(address=address)
@ -20,11 +19,11 @@ class ElasticIPAddresses(BaseResponse):
instance = eni = None instance = eni = None
if "InstanceId" in self.querystring: if "InstanceId" in self.querystring:
instance = ec2_backend.get_instance(self.querystring['InstanceId'][0]) instance = self.ec2_backend.get_instance(self.querystring['InstanceId'][0])
elif "NetworkInterfaceId" in self.querystring: elif "NetworkInterfaceId" in self.querystring:
eni = ec2_backend.get_network_interface(self.querystring['NetworkInterfaceId'][0]) eni = self.ec2_backend.get_network_interface(self.querystring['NetworkInterfaceId'][0])
else: else:
ec2_backend.raise_error("MissingParameter", "Invalid request, expect InstanceId/NetworkId parameter.") self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect InstanceId/NetworkId parameter.")
reassociate = False reassociate = False
if "AllowReassociation" in self.querystring: if "AllowReassociation" in self.querystring:
@ -32,13 +31,13 @@ class ElasticIPAddresses(BaseResponse):
if instance or eni: if instance or eni:
if "PublicIp" in self.querystring: if "PublicIp" in self.querystring:
eip = ec2_backend.associate_address(instance=instance, eni=eni, address=self.querystring['PublicIp'][0], reassociate=reassociate) eip = self.ec2_backend.associate_address(instance=instance, eni=eni, address=self.querystring['PublicIp'][0], reassociate=reassociate)
elif "AllocationId" in self.querystring: elif "AllocationId" in self.querystring:
eip = ec2_backend.associate_address(instance=instance, eni=eni, allocation_id=self.querystring['AllocationId'][0], reassociate=reassociate) eip = self.ec2_backend.associate_address(instance=instance, eni=eni, allocation_id=self.querystring['AllocationId'][0], reassociate=reassociate)
else: else:
ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.") self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
else: else:
ec2_backend.raise_error("MissingParameter", "Invalid request, expect either instance or ENI.") self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect either instance or ENI.")
template = Template(ASSOCIATE_ADDRESS_RESPONSE) template = Template(ASSOCIATE_ADDRESS_RESPONSE)
return template.render(address=eip) return template.render(address=eip)
@ -50,31 +49,31 @@ class ElasticIPAddresses(BaseResponse):
raise NotImplementedError("Filtering not supported in describe_address.") raise NotImplementedError("Filtering not supported in describe_address.")
elif "PublicIp.1" in self.querystring: elif "PublicIp.1" in self.querystring:
public_ips = sequence_from_querystring("PublicIp", self.querystring) public_ips = sequence_from_querystring("PublicIp", self.querystring)
addresses = ec2_backend.address_by_ip(public_ips) addresses = self.ec2_backend.address_by_ip(public_ips)
elif "AllocationId.1" in self.querystring: elif "AllocationId.1" in self.querystring:
allocation_ids = sequence_from_querystring("AllocationId", self.querystring) allocation_ids = sequence_from_querystring("AllocationId", self.querystring)
addresses = ec2_backend.address_by_allocation(allocation_ids) addresses = self.ec2_backend.address_by_allocation(allocation_ids)
else: else:
addresses = ec2_backend.describe_addresses() addresses = self.ec2_backend.describe_addresses()
return template.render(addresses=addresses) return template.render(addresses=addresses)
def disassociate_address(self): def disassociate_address(self):
if "PublicIp" in self.querystring: if "PublicIp" in self.querystring:
disassociated = ec2_backend.disassociate_address(address=self.querystring['PublicIp'][0]) disassociated = self.ec2_backend.disassociate_address(address=self.querystring['PublicIp'][0])
elif "AssociationId" in self.querystring: elif "AssociationId" in self.querystring:
disassociated = ec2_backend.disassociate_address(association_id=self.querystring['AssociationId'][0]) disassociated = self.ec2_backend.disassociate_address(association_id=self.querystring['AssociationId'][0])
else: else:
ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AssociationId parameter.") self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AssociationId parameter.")
return Template(DISASSOCIATE_ADDRESS_RESPONSE).render() return Template(DISASSOCIATE_ADDRESS_RESPONSE).render()
def release_address(self): def release_address(self):
if "PublicIp" in self.querystring: if "PublicIp" in self.querystring:
released = ec2_backend.release_address(address=self.querystring['PublicIp'][0]) released = self.ec2_backend.release_address(address=self.querystring['PublicIp'][0])
elif "AllocationId" in self.querystring: elif "AllocationId" in self.querystring:
released = ec2_backend.release_address(allocation_id=self.querystring['AllocationId'][0]) released = self.ec2_backend.release_address(allocation_id=self.querystring['AllocationId'][0])
else: else:
ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.") self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
return Template(RELEASE_ADDRESS_RESPONSE).render() return Template(RELEASE_ADDRESS_RESPONSE).render()

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import sequence_from_querystring, filters_from_querystring from moto.ec2.utils import sequence_from_querystring, filters_from_querystring
@ -11,14 +10,14 @@ class ElasticNetworkInterfaces(BaseResponse):
subnet_id = self.querystring.get('SubnetId')[0] subnet_id = self.querystring.get('SubnetId')[0]
private_ip_address = self.querystring.get('PrivateIpAddress', [None])[0] private_ip_address = self.querystring.get('PrivateIpAddress', [None])[0]
groups = sequence_from_querystring('SecurityGroupId', self.querystring) groups = sequence_from_querystring('SecurityGroupId', self.querystring)
subnet = ec2_backend.get_subnet(subnet_id) subnet = self.ec2_backend.get_subnet(subnet_id)
eni = ec2_backend.create_network_interface(subnet, private_ip_address, groups) eni = self.ec2_backend.create_network_interface(subnet, private_ip_address, groups)
template = Template(CREATE_NETWORK_INTERFACE_RESPONSE) template = Template(CREATE_NETWORK_INTERFACE_RESPONSE)
return template.render(eni=eni) return template.render(eni=eni)
def delete_network_interface(self): def delete_network_interface(self):
eni_id = self.querystring.get('NetworkInterfaceId')[0] eni_id = self.querystring.get('NetworkInterfaceId')[0]
ec2_backend.delete_network_interface(eni_id) self.ec2_backend.delete_network_interface(eni_id)
template = Template(DELETE_NETWORK_INTERFACE_RESPONSE) template = Template(DELETE_NETWORK_INTERFACE_RESPONSE)
return template.render() return template.render()
@ -28,7 +27,7 @@ class ElasticNetworkInterfaces(BaseResponse):
def describe_network_interfaces(self): def describe_network_interfaces(self):
#Partially implemented. Supports only network-interface-id and group-id filters #Partially implemented. Supports only network-interface-id and group-id filters
filters = filters_from_querystring(self.querystring) filters = filters_from_querystring(self.querystring)
enis = ec2_backend.describe_network_interfaces(filters) enis = self.ec2_backend.describe_network_interfaces(filters)
template = Template(DESCRIBE_NETWORK_INTERFACES_RESPONSE) template = Template(DESCRIBE_NETWORK_INTERFACES_RESPONSE)
return template.render(enis=enis) return template.render(enis=enis)
@ -36,13 +35,13 @@ class ElasticNetworkInterfaces(BaseResponse):
eni_id = self.querystring.get('NetworkInterfaceId')[0] eni_id = self.querystring.get('NetworkInterfaceId')[0]
instance_id = self.querystring.get('InstanceId')[0] instance_id = self.querystring.get('InstanceId')[0]
device_index = self.querystring.get('DeviceIndex')[0] device_index = self.querystring.get('DeviceIndex')[0]
attachment_id = ec2_backend.attach_network_interface(eni_id, instance_id, device_index) attachment_id = self.ec2_backend.attach_network_interface(eni_id, instance_id, device_index)
template = Template(ATTACH_NETWORK_INTERFACE_RESPONSE) template = Template(ATTACH_NETWORK_INTERFACE_RESPONSE)
return template.render(attachment_id=attachment_id) return template.render(attachment_id=attachment_id)
def detach_network_interface(self): def detach_network_interface(self):
attachment_id = self.querystring.get('AttachmentId')[0] attachment_id = self.querystring.get('AttachmentId')[0]
ec2_backend.detach_network_interface(attachment_id) self.ec2_backend.detach_network_interface(attachment_id)
template = Template(DETACH_NETWORK_INTERFACE_RESPONSE) template = Template(DETACH_NETWORK_INTERFACE_RESPONSE)
return template.render() return template.render()
@ -50,7 +49,7 @@ class ElasticNetworkInterfaces(BaseResponse):
#Currently supports modifying one and only one security group #Currently supports modifying one and only one security group
eni_id = self.querystring.get('NetworkInterfaceId')[0] eni_id = self.querystring.get('NetworkInterfaceId')[0]
group_id = self.querystring.get('SecurityGroupId.1')[0] group_id = self.querystring.get('SecurityGroupId.1')[0]
ec2_backend.modify_network_interface_attribute(eni_id, group_id) self.ec2_backend.modify_network_interface_attribute(eni_id, group_id)
return MODIFY_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE return MODIFY_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE
def reset_network_interface_attribute(self): def reset_network_interface_attribute(self):

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import instance_ids_from_querystring from moto.ec2.utils import instance_ids_from_querystring
@ -10,7 +9,7 @@ class General(BaseResponse):
def get_console_output(self): def get_console_output(self):
self.instance_ids = instance_ids_from_querystring(self.querystring) self.instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = self.instance_ids[0] instance_id = self.instance_ids[0]
instance = ec2_backend.get_instance(instance_id) instance = self.ec2_backend.get_instance(instance_id)
template = Template(GET_CONSOLE_OUTPUT_RESULT) template = Template(GET_CONSOLE_OUTPUT_RESULT)
return template.render(instance=instance) return template.render(instance=instance)

View File

@ -1,6 +1,5 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import sequence_from_querystring from moto.ec2.utils import sequence_from_querystring
from jinja2 import Template from jinja2 import Template
@ -8,18 +7,18 @@ class InternetGateways(BaseResponse):
def attach_internet_gateway(self): def attach_internet_gateway(self):
igw_id = self.querystring.get("InternetGatewayId", [None])[0] igw_id = self.querystring.get("InternetGatewayId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0] vpc_id = self.querystring.get("VpcId", [None])[0]
ec2_backend.attach_internet_gateway(igw_id, vpc_id) self.ec2_backend.attach_internet_gateway(igw_id, vpc_id)
template = Template(ATTACH_INTERNET_GATEWAY_RESPONSE) template = Template(ATTACH_INTERNET_GATEWAY_RESPONSE)
return template.render() return template.render()
def create_internet_gateway(self): def create_internet_gateway(self):
igw = ec2_backend.create_internet_gateway() igw = self.ec2_backend.create_internet_gateway()
template = Template(CREATE_INTERNET_GATEWAY_RESPONSE) template = Template(CREATE_INTERNET_GATEWAY_RESPONSE)
return template.render(internet_gateway=igw) return template.render(internet_gateway=igw)
def delete_internet_gateway(self): def delete_internet_gateway(self):
igw_id = self.querystring.get("InternetGatewayId", [None])[0] igw_id = self.querystring.get("InternetGatewayId", [None])[0]
ec2_backend.delete_internet_gateway(igw_id) self.ec2_backend.delete_internet_gateway(igw_id)
template = Template(DELETE_INTERNET_GATEWAY_RESPONSE) template = Template(DELETE_INTERNET_GATEWAY_RESPONSE)
return template.render() return template.render()
@ -30,9 +29,9 @@ class InternetGateways(BaseResponse):
elif "InternetGatewayId.1" in self.querystring: elif "InternetGatewayId.1" in self.querystring:
igw_ids = sequence_from_querystring( igw_ids = sequence_from_querystring(
"InternetGatewayId", self.querystring) "InternetGatewayId", self.querystring)
igws = ec2_backend.describe_internet_gateways(igw_ids) igws = self.ec2_backend.describe_internet_gateways(igw_ids)
else: else:
igws = ec2_backend.describe_internet_gateways() igws = self.ec2_backend.describe_internet_gateways()
template = Template(DESCRIBE_INTERNET_GATEWAYS_RESPONSE) template = Template(DESCRIBE_INTERNET_GATEWAYS_RESPONSE)
return template.render(internet_gateways=igws) return template.render(internet_gateways=igws)
@ -41,7 +40,7 @@ class InternetGateways(BaseResponse):
# raise else DependencyViolationError() # raise else DependencyViolationError()
igw_id = self.querystring.get("InternetGatewayId", [None])[0] igw_id = self.querystring.get("InternetGatewayId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0] vpc_id = self.querystring.get("VpcId", [None])[0]
ec2_backend.detach_internet_gateway(igw_id, vpc_id) self.ec2_backend.detach_internet_gateway(igw_id, vpc_id)
template = Template(DETACH_INTERNET_GATEWAY_RESPONSE) template = Template(DETACH_INTERNET_GATEWAY_RESPONSE)
return template.render() return template.render()

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
import six import six
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import keypair_names_from_querystring, filters_from_querystring from moto.ec2.utils import keypair_names_from_querystring, filters_from_querystring
@ -10,13 +9,13 @@ class KeyPairs(BaseResponse):
def create_key_pair(self): def create_key_pair(self):
name = self.querystring.get('KeyName')[0] name = self.querystring.get('KeyName')[0]
keypair = ec2_backend.create_key_pair(name) keypair = self.ec2_backend.create_key_pair(name)
template = Template(CREATE_KEY_PAIR_RESPONSE) template = Template(CREATE_KEY_PAIR_RESPONSE)
return template.render(**keypair) return template.render(**keypair)
def delete_key_pair(self): def delete_key_pair(self):
name = self.querystring.get('KeyName')[0] name = self.querystring.get('KeyName')[0]
success = six.text_type(ec2_backend.delete_key_pair(name)).lower() success = six.text_type(self.ec2_backend.delete_key_pair(name)).lower()
return Template(DELETE_KEY_PAIR_RESPONSE).render(success=success) return Template(DELETE_KEY_PAIR_RESPONSE).render(success=success)
def describe_key_pairs(self): def describe_key_pairs(self):
@ -25,7 +24,7 @@ class KeyPairs(BaseResponse):
if len(filters) > 0: if len(filters) > 0:
raise NotImplementedError('Using filters in KeyPairs.describe_key_pairs is not yet implemented') raise NotImplementedError('Using filters in KeyPairs.describe_key_pairs is not yet implemented')
keypairs = ec2_backend.describe_key_pairs(names) keypairs = self.ec2_backend.describe_key_pairs(names)
template = Template(DESCRIBE_KEY_PAIRS_RESPONSE) template = Template(DESCRIBE_KEY_PAIRS_RESPONSE)
return template.render(keypairs=keypairs) return template.render(keypairs=keypairs)

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import route_table_ids_from_querystring, filters_from_querystring, optional_from_querystring from moto.ec2.utils import route_table_ids_from_querystring, filters_from_querystring, optional_from_querystring
@ -10,7 +9,7 @@ class RouteTables(BaseResponse):
def associate_route_table(self): def associate_route_table(self):
route_table_id = self.querystring.get('RouteTableId')[0] route_table_id = self.querystring.get('RouteTableId')[0]
subnet_id = self.querystring.get('SubnetId')[0] subnet_id = self.querystring.get('SubnetId')[0]
association_id = ec2_backend.associate_route_table(route_table_id, subnet_id) association_id = self.ec2_backend.associate_route_table(route_table_id, subnet_id)
template = Template(ASSOCIATE_ROUTE_TABLE_RESPONSE) template = Template(ASSOCIATE_ROUTE_TABLE_RESPONSE)
return template.render(association_id=association_id) return template.render(association_id=association_id)
@ -23,7 +22,7 @@ class RouteTables(BaseResponse):
interface_id = optional_from_querystring('NetworkInterfaceId', self.querystring) interface_id = optional_from_querystring('NetworkInterfaceId', self.querystring)
pcx_id = optional_from_querystring('VpcPeeringConnectionId', self.querystring) pcx_id = optional_from_querystring('VpcPeeringConnectionId', self.querystring)
route = ec2_backend.create_route(route_table_id, destination_cidr_block, route = self.ec2_backend.create_route(route_table_id, destination_cidr_block,
gateway_id=internet_gateway_id, gateway_id=internet_gateway_id,
instance_id=instance_id, instance_id=instance_id,
interface_id=interface_id, interface_id=interface_id,
@ -34,33 +33,33 @@ class RouteTables(BaseResponse):
def create_route_table(self): def create_route_table(self):
vpc_id = self.querystring.get('VpcId')[0] vpc_id = self.querystring.get('VpcId')[0]
route_table = ec2_backend.create_route_table(vpc_id) route_table = self.ec2_backend.create_route_table(vpc_id)
template = Template(CREATE_ROUTE_TABLE_RESPONSE) template = Template(CREATE_ROUTE_TABLE_RESPONSE)
return template.render(route_table=route_table) return template.render(route_table=route_table)
def delete_route(self): def delete_route(self):
route_table_id = self.querystring.get('RouteTableId')[0] route_table_id = self.querystring.get('RouteTableId')[0]
destination_cidr_block = self.querystring.get('DestinationCidrBlock')[0] destination_cidr_block = self.querystring.get('DestinationCidrBlock')[0]
ec2_backend.delete_route(route_table_id, destination_cidr_block) self.ec2_backend.delete_route(route_table_id, destination_cidr_block)
template = Template(DELETE_ROUTE_RESPONSE) template = Template(DELETE_ROUTE_RESPONSE)
return template.render() return template.render()
def delete_route_table(self): def delete_route_table(self):
route_table_id = self.querystring.get('RouteTableId')[0] route_table_id = self.querystring.get('RouteTableId')[0]
ec2_backend.delete_route_table(route_table_id) self.ec2_backend.delete_route_table(route_table_id)
template = Template(DELETE_ROUTE_TABLE_RESPONSE) template = Template(DELETE_ROUTE_TABLE_RESPONSE)
return template.render() return template.render()
def describe_route_tables(self): def describe_route_tables(self):
route_table_ids = route_table_ids_from_querystring(self.querystring) route_table_ids = route_table_ids_from_querystring(self.querystring)
filters = filters_from_querystring(self.querystring) filters = filters_from_querystring(self.querystring)
route_tables = ec2_backend.get_all_route_tables(route_table_ids, filters) route_tables = self.ec2_backend.get_all_route_tables(route_table_ids, filters)
template = Template(DESCRIBE_ROUTE_TABLES_RESPONSE) template = Template(DESCRIBE_ROUTE_TABLES_RESPONSE)
return template.render(route_tables=route_tables) return template.render(route_tables=route_tables)
def disassociate_route_table(self): def disassociate_route_table(self):
association_id = self.querystring.get('AssociationId')[0] association_id = self.querystring.get('AssociationId')[0]
ec2_backend.disassociate_route_table(association_id) self.ec2_backend.disassociate_route_table(association_id)
template = Template(DISASSOCIATE_ROUTE_TABLE_RESPONSE) template = Template(DISASSOCIATE_ROUTE_TABLE_RESPONSE)
return template.render() return template.render()
@ -73,7 +72,7 @@ class RouteTables(BaseResponse):
interface_id = optional_from_querystring('NetworkInterfaceId', self.querystring) interface_id = optional_from_querystring('NetworkInterfaceId', self.querystring)
pcx_id = optional_from_querystring('VpcPeeringConnectionId', self.querystring) pcx_id = optional_from_querystring('VpcPeeringConnectionId', self.querystring)
route = ec2_backend.replace_route(route_table_id, destination_cidr_block, route = self.ec2_backend.replace_route(route_table_id, destination_cidr_block,
gateway_id=internet_gateway_id, gateway_id=internet_gateway_id,
instance_id=instance_id, instance_id=instance_id,
interface_id=interface_id, interface_id=interface_id,
@ -85,7 +84,7 @@ class RouteTables(BaseResponse):
def replace_route_table_association(self): def replace_route_table_association(self):
route_table_id = self.querystring.get('RouteTableId')[0] route_table_id = self.querystring.get('RouteTableId')[0]
association_id = self.querystring.get('AssociationId')[0] association_id = self.querystring.get('AssociationId')[0]
new_association_id = ec2_backend.replace_route_table_association(association_id, route_table_id) new_association_id = self.ec2_backend.replace_route_table_association(association_id, route_table_id)
template = Template(REPLACE_ROUTE_TABLE_ASSOCIATION_RESPONSE) template = Template(REPLACE_ROUTE_TABLE_ASSOCIATION_RESPONSE)
return template.render(association_id=new_association_id) return template.render(association_id=new_association_id)

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import filters_from_querystring from moto.ec2.utils import filters_from_querystring
@ -41,14 +40,14 @@ class SecurityGroups(BaseResponse):
raise NotImplementedError('SecurityGroups.authorize_security_group_egress is not yet implemented') raise NotImplementedError('SecurityGroups.authorize_security_group_egress is not yet implemented')
def authorize_security_group_ingress(self): def authorize_security_group_ingress(self):
ec2_backend.authorize_security_group_ingress(*process_rules_from_querystring(self.querystring)) self.ec2_backend.authorize_security_group_ingress(*process_rules_from_querystring(self.querystring))
return AUTHORIZE_SECURITY_GROUP_INGRESS_REPONSE return AUTHORIZE_SECURITY_GROUP_INGRESS_REPONSE
def create_security_group(self): def create_security_group(self):
name = self.querystring.get('GroupName')[0] name = self.querystring.get('GroupName')[0]
description = self.querystring.get('GroupDescription', [None])[0] description = self.querystring.get('GroupDescription', [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0] vpc_id = self.querystring.get("VpcId", [None])[0]
group = ec2_backend.create_security_group(name, description, vpc_id=vpc_id) group = self.ec2_backend.create_security_group(name, description, vpc_id=vpc_id)
template = Template(CREATE_SECURITY_GROUP_RESPONSE) template = Template(CREATE_SECURITY_GROUP_RESPONSE)
return template.render(group=group) return template.render(group=group)
@ -59,9 +58,9 @@ class SecurityGroups(BaseResponse):
sg_id = self.querystring.get('GroupId') sg_id = self.querystring.get('GroupId')
if name: if name:
ec2_backend.delete_security_group(name[0]) self.ec2_backend.delete_security_group(name[0])
elif sg_id: elif sg_id:
ec2_backend.delete_security_group(group_id=sg_id[0]) self.ec2_backend.delete_security_group(group_id=sg_id[0])
return DELETE_GROUP_RESPONSE return DELETE_GROUP_RESPONSE
@ -70,7 +69,7 @@ class SecurityGroups(BaseResponse):
group_ids = self._get_multi_param("GroupId") group_ids = self._get_multi_param("GroupId")
filters = filters_from_querystring(self.querystring) filters = filters_from_querystring(self.querystring)
groups = ec2_backend.describe_security_groups( groups = self.ec2_backend.describe_security_groups(
group_ids=group_ids, group_ids=group_ids,
groupnames=groupnames, groupnames=groupnames,
filters=filters filters=filters
@ -83,7 +82,7 @@ class SecurityGroups(BaseResponse):
raise NotImplementedError('SecurityGroups.revoke_security_group_egress is not yet implemented') raise NotImplementedError('SecurityGroups.revoke_security_group_egress is not yet implemented')
def revoke_security_group_ingress(self): def revoke_security_group_ingress(self):
ec2_backend.revoke_security_group_ingress(*process_rules_from_querystring(self.querystring)) self.ec2_backend.revoke_security_group_ingress(*process_rules_from_querystring(self.querystring))
return REVOKE_SECURITY_GROUP_INGRESS_REPONSE return REVOKE_SECURITY_GROUP_INGRESS_REPONSE

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import filters_from_querystring from moto.ec2.utils import filters_from_querystring
@ -17,7 +16,7 @@ class SpotInstances(BaseResponse):
def cancel_spot_instance_requests(self): def cancel_spot_instance_requests(self):
request_ids = self._get_multi_param('SpotInstanceRequestId') request_ids = self._get_multi_param('SpotInstanceRequestId')
requests = ec2_backend.cancel_spot_instance_requests(request_ids) requests = self.ec2_backend.cancel_spot_instance_requests(request_ids)
template = Template(CANCEL_SPOT_INSTANCES_TEMPLATE) template = Template(CANCEL_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests) return template.render(requests=requests)
@ -32,7 +31,7 @@ class SpotInstances(BaseResponse):
def describe_spot_instance_requests(self): def describe_spot_instance_requests(self):
filters = filters_from_querystring(self.querystring) filters = filters_from_querystring(self.querystring)
requests = ec2_backend.describe_spot_instance_requests(filters=filters) requests = self.ec2_backend.describe_spot_instance_requests(filters=filters)
template = Template(DESCRIBE_SPOT_INSTANCES_TEMPLATE) template = Template(DESCRIBE_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests) return template.render(requests=requests)
@ -58,7 +57,7 @@ class SpotInstances(BaseResponse):
monitoring_enabled = self._get_param('LaunchSpecification.Monitoring.Enabled') monitoring_enabled = self._get_param('LaunchSpecification.Monitoring.Enabled')
subnet_id = self._get_param('LaunchSpecification.SubnetId') subnet_id = self._get_param('LaunchSpecification.SubnetId')
requests = ec2_backend.request_spot_instances( requests = self.ec2_backend.request_spot_instances(
price=price, price=price,
image_id=image_id, image_id=image_id,
count=count, count=count,

View File

@ -2,7 +2,7 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend, validate_resource_ids from moto.ec2.models import validate_resource_ids
from moto.ec2.utils import sequence_from_querystring, tags_from_query_string, filters_from_querystring from moto.ec2.utils import sequence_from_querystring, tags_from_query_string, filters_from_querystring
@ -25,7 +25,7 @@ class TagResponse(BaseResponse):
def describe_tags(self): def describe_tags(self):
filters = filters_from_querystring(querystring_dict=self.querystring) filters = filters_from_querystring(querystring_dict=self.querystring)
tags = ec2_backend.describe_tags(filters=filters) tags = self.ec2_backend.describe_tags(filters=filters)
template = Template(DESCRIBE_RESPONSE) template = Template(DESCRIBE_RESPONSE)
return template.render(tags=tags) return template.render(tags=tags)

View File

@ -2,37 +2,36 @@ from __future__ import unicode_literals
from jinja2 import Template from jinja2 import Template
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
class VPCPeeringConnections(BaseResponse): class VPCPeeringConnections(BaseResponse):
def create_vpc_peering_connection(self): def create_vpc_peering_connection(self):
vpc = ec2_backend.get_vpc(self.querystring.get('VpcId')[0]) vpc = self.ec2_backend.get_vpc(self.querystring.get('VpcId')[0])
peer_vpc = ec2_backend.get_vpc(self.querystring.get('PeerVpcId')[0]) peer_vpc = self.ec2_backend.get_vpc(self.querystring.get('PeerVpcId')[0])
vpc_pcx = ec2_backend.create_vpc_peering_connection(vpc, peer_vpc) vpc_pcx = self.ec2_backend.create_vpc_peering_connection(vpc, peer_vpc)
template = Template(CREATE_VPC_PEERING_CONNECTION_RESPONSE) template = Template(CREATE_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx) return template.render(vpc_pcx=vpc_pcx)
def delete_vpc_peering_connection(self): def delete_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0] vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx = ec2_backend.delete_vpc_peering_connection(vpc_pcx_id) vpc_pcx = self.ec2_backend.delete_vpc_peering_connection(vpc_pcx_id)
template = Template(DELETE_VPC_PEERING_CONNECTION_RESPONSE) template = Template(DELETE_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx) return template.render(vpc_pcx=vpc_pcx)
def describe_vpc_peering_connections(self): def describe_vpc_peering_connections(self):
vpc_pcxs = ec2_backend.get_all_vpc_peering_connections() vpc_pcxs = self.ec2_backend.get_all_vpc_peering_connections()
template = Template(DESCRIBE_VPC_PEERING_CONNECTIONS_RESPONSE) template = Template(DESCRIBE_VPC_PEERING_CONNECTIONS_RESPONSE)
return template.render(vpc_pcxs=vpc_pcxs) return template.render(vpc_pcxs=vpc_pcxs)
def accept_vpc_peering_connection(self): def accept_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0] vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx = ec2_backend.accept_vpc_peering_connection(vpc_pcx_id) vpc_pcx = self.ec2_backend.accept_vpc_peering_connection(vpc_pcx_id)
template = Template(ACCEPT_VPC_PEERING_CONNECTION_RESPONSE) template = Template(ACCEPT_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx) return template.render(vpc_pcx=vpc_pcx)
def reject_vpc_peering_connection(self): def reject_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0] vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx = ec2_backend.reject_vpc_peering_connection(vpc_pcx_id) vpc_pcx = self.ec2_backend.reject_vpc_peering_connection(vpc_pcx_id)
template = Template(REJECT_VPC_PEERING_CONNECTION_RESPONSE) template = Template(REJECT_VPC_PEERING_CONNECTION_RESPONSE)
return template.render() return template.render()

View File

@ -3,12 +3,12 @@ from __future__ import unicode_literals
import tests.backport_assert_raises import tests.backport_assert_raises
from nose.tools import assert_raises from nose.tools import assert_raises
from moto.ec2 import ec2_backends
import boto import boto
from boto.exception import EC2ResponseError from boto.exception import EC2ResponseError
import sure # noqa import sure # noqa
from moto import mock_ec2 from moto import mock_ec2
from moto.ec2.models import ec2_backend
@mock_ec2 @mock_ec2
@ -198,6 +198,6 @@ def test_modify_attribute_blockDeviceMapping():
instance.modify_attribute('blockDeviceMapping', {'/dev/sda1': True}) instance.modify_attribute('blockDeviceMapping', {'/dev/sda1': True})
instance = ec2_backend.get_instance(instance.id) instance = ec2_backends[conn.region.name].get_instance(instance.id)
instance.block_device_mapping.should.have.key('/dev/sda1') instance.block_device_mapping.should.have.key('/dev/sda1')
instance.block_device_mapping['/dev/sda1'].delete_on_termination.should.be(True) instance.block_device_mapping['/dev/sda1'].delete_on_termination.should.be(True)