Merge pull request #167 from DreadPirateShawn/ErrorHandling

Error handling: Model-level validations, proper error responses.
This commit is contained in:
Steve Pulec 2014-08-25 21:11:05 -04:00
commit e45f32c6aa
26 changed files with 702 additions and 292 deletions

View File

@ -2,12 +2,6 @@ from werkzeug.exceptions import BadRequest
from jinja2 import Template
class InvalidIdError(RuntimeError):
def __init__(self, id_value):
super(InvalidIdError, self).__init__()
self.id = id_value
class EC2ClientError(BadRequest):
def __init__(self, code, message):
super(EC2ClientError, self).__init__()
@ -21,6 +15,14 @@ class DependencyViolationError(EC2ClientError):
"DependencyViolation", message)
class MissingParameterError(EC2ClientError):
def __init__(self, parameter):
super(MissingParameterError, self).__init__(
"MissingParameter",
"The request must contain the parameter {0}"
.format(parameter))
class InvalidDHCPOptionsIdError(EC2ClientError):
def __init__(self, dhcp_options_id):
super(InvalidDHCPOptionsIdError, self).__init__(
@ -29,6 +31,30 @@ class InvalidDHCPOptionsIdError(EC2ClientError):
.format(dhcp_options_id))
class MalformedDHCPOptionsIdError(EC2ClientError):
def __init__(self, dhcp_options_id):
super(MalformedDHCPOptionsIdError, self).__init__(
"InvalidDhcpOptionsId.Malformed",
"Invalid id: \"{0}\" (expecting \"dopt-...\")"
.format(dhcp_options_id))
class InvalidKeyPairNameError(EC2ClientError):
def __init__(self, key):
super(InvalidKeyPairNameError, self).__init__(
"InvalidKeyPair.NotFound",
"The keypair '{0}' does not exist."
.format(key))
class InvalidKeyPairDuplicateError(EC2ClientError):
def __init__(self, key):
super(InvalidKeyPairDuplicateError, self).__init__(
"InvalidKeyPair.Duplicate",
"The keypair '{0}' already exists."
.format(key))
class InvalidVPCIdError(EC2ClientError):
def __init__(self, vpc_id):
super(InvalidVPCIdError, self).__init__(
@ -37,6 +63,108 @@ class InvalidVPCIdError(EC2ClientError):
.format(vpc_id))
class InvalidSubnetIdError(EC2ClientError):
def __init__(self, subnet_id):
super(InvalidSubnetIdError, self).__init__(
"InvalidSubnetID.NotFound",
"The subnet ID '{0}' does not exist"
.format(subnet_id))
class InvalidSecurityGroupDuplicateError(EC2ClientError):
def __init__(self, name):
super(InvalidSecurityGroupDuplicateError, self).__init__(
"InvalidGroup.Duplicate",
"The security group '{0}' already exists"
.format(name))
class InvalidSecurityGroupNotFoundError(EC2ClientError):
def __init__(self, name):
super(InvalidSecurityGroupNotFoundError, self).__init__(
"InvalidGroup.NotFound",
"The security group '{0}' does not exist"
.format(name))
class InvalidPermissionNotFoundError(EC2ClientError):
def __init__(self):
super(InvalidPermissionNotFoundError, self).__init__(
"InvalidPermission.NotFound",
"Could not find a matching ingress rule")
class InvalidInstanceIdError(EC2ClientError):
def __init__(self, instance_id):
super(InvalidInstanceIdError, self).__init__(
"InvalidInstanceID.NotFound",
"The instance ID '{0}' does not exist"
.format(instance_id))
class InvalidAMIIdError(EC2ClientError):
def __init__(self, ami_id):
super(InvalidAMIIdError, self).__init__(
"InvalidAMIID.NotFound",
"The image id '[{0}]' does not exist"
.format(ami_id))
class InvalidSnapshotIdError(EC2ClientError):
def __init__(self, snapshot_id):
super(InvalidSnapshotIdError, self).__init__(
"InvalidSnapshot.NotFound",
"") # Note: AWS returns empty message for this, as of 2014.08.22.
class InvalidVolumeIdError(EC2ClientError):
def __init__(self, volume_id):
super(InvalidVolumeIdError, self).__init__(
"InvalidVolume.NotFound",
"The volume '{0}' does not exist."
.format(volume_id))
class InvalidVolumeAttachmentError(EC2ClientError):
def __init__(self, volume_id, instance_id):
super(InvalidVolumeAttachmentError, self).__init__(
"InvalidAttachment.NotFound",
"Volume {0} can not be detached from {1} because it is not attached"
.format(volume_id, instance_id))
class InvalidDomainError(EC2ClientError):
def __init__(self, domain):
super(InvalidDomainError, self).__init__(
"InvalidParameterValue",
"Invalid value '{0}' for domain."
.format(domain))
class InvalidAddressError(EC2ClientError):
def __init__(self, ip):
super(InvalidAddressError, self).__init__(
"InvalidAddress.NotFound",
"Address '{0}' not found."
.format(ip))
class InvalidAllocationIdError(EC2ClientError):
def __init__(self, allocation_id):
super(InvalidAllocationIdError, self).__init__(
"InvalidAllocationID.NotFound",
"Allocation ID '{0}' not found."
.format(allocation_id))
class InvalidAssociationIdError(EC2ClientError):
def __init__(self, association_id):
super(InvalidAssociationIdError, self).__init__(
"InvalidAssociationID.NotFound",
"Association ID '{0}' not found."
.format(association_id))
class InvalidVPCPeeringConnectionIdError(EC2ClientError):
def __init__(self, vpc_peering_connection_id):
super(InvalidVPCPeeringConnectionIdError, self).__init__(
@ -61,9 +189,9 @@ class InvalidParameterValueError(EC2ClientError):
.format(parameter_value))
class InvalidInternetGatewayIDError(EC2ClientError):
class InvalidInternetGatewayIdError(EC2ClientError):
def __init__(self, internet_gateway_id):
super(InvalidInternetGatewayIDError, self).__init__(
super(InvalidInternetGatewayIdError, self).__init__(
"InvalidInternetGatewayID.NotFound",
"InternetGatewayID {0} does not exist."
.format(internet_gateway_id))
@ -78,11 +206,11 @@ class GatewayNotAttachedError(EC2ClientError):
class ResourceAlreadyAssociatedError(EC2ClientError):
def __init__(self, resource):
def __init__(self, resource_id):
super(ResourceAlreadyAssociatedError, self).__init__(
"Resource.AlreadyAssociated",
"Resource {0} is already associated."
.format(str(resource)))
.format(resource_id))
ERROR_RESPONSE = u"""<?xml version="1.0" encoding="UTF-8"?>

View File

@ -10,13 +10,31 @@ from boto.ec2.launchspecification import LaunchSpecification
from moto.core import BaseBackend
from moto.core.models import Model
from .exceptions import (
InvalidIdError,
EC2ClientError,
DependencyViolationError,
MissingParameterError,
InvalidParameterValueError,
InvalidDHCPOptionsIdError,
InvalidInternetGatewayIDError,
MalformedDHCPOptionsIdError,
InvalidKeyPairNameError,
InvalidKeyPairDuplicateError,
InvalidInternetGatewayIdError,
GatewayNotAttachedError,
ResourceAlreadyAssociatedError,
InvalidVPCIdError,
InvalidSubnetIdError,
InvalidSecurityGroupDuplicateError,
InvalidSecurityGroupNotFoundError,
InvalidPermissionNotFoundError,
InvalidInstanceIdError,
InvalidAMIIdError,
InvalidSnapshotIdError,
InvalidVolumeIdError,
InvalidVolumeAttachmentError,
InvalidDomainError,
InvalidAddressError,
InvalidAllocationIdError,
InvalidAssociationIdError,
InvalidVPCPeeringConnectionIdError,
InvalidVPCPeeringConnectionStateTransitionError
)
@ -118,6 +136,7 @@ class InstanceBackend(object):
for instance in self.all_instances():
if instance.id == instance_id:
return instance
raise InvalidInstanceIdError(instance_id)
def add_instances(self, image_id, count, user_data, security_group_names,
**kwargs):
@ -200,6 +219,10 @@ class InstanceBackend(object):
if instance.id in instance_ids:
result.append(instance)
# TODO: Trim error message down to specific invalid id.
if instance_ids and len(instance_ids) > len(result):
raise InvalidInstanceIdError(instance_ids)
return result
def get_instance_by_id(self, instance_id):
@ -224,7 +247,7 @@ class InstanceBackend(object):
found_instance_ids = [instance.id for reservation in reservations for instance in reservation.instances]
if len(found_instance_ids) != len(instance_ids):
invalid_id = list(set(instance_ids).difference(set(found_instance_ids)))[0]
raise InvalidIdError(invalid_id)
raise InvalidInstanceIdError(invalid_id)
return reservations
def all_reservations(self, make_copy=False):
@ -244,7 +267,7 @@ class KeyPairBackend(object):
def create_key_pair(self, name):
if name in self.keypairs:
raise InvalidIdError(name)
raise InvalidKeyPairDuplicateError(name)
self.keypairs[name] = keypair = random_key_pair()
keypair['name'] = name
return keypair
@ -260,6 +283,11 @@ class KeyPairBackend(object):
if not filter_names or name in filter_names:
keypair['name'] = name
results.append(keypair)
# TODO: Trim error message down to specific invalid name.
if filter_names and len(filter_names) > len(results):
raise InvalidKeyPairNameError(filter_names)
return results
@ -315,8 +343,6 @@ class AmiBackend(object):
# TODO: check that instance exists and pull info from it.
ami_id = random_ami_id()
instance = self.get_instance(instance_id)
if not instance:
return None
ami = Ami(ami_id, instance, name, description)
self.amis[ami_id] = ami
return ami
@ -327,14 +353,14 @@ class AmiBackend(object):
if ami_id in self.amis:
images.append(self.amis[ami_id])
else:
raise InvalidIdError(ami_id)
raise InvalidAMIIdError(ami_id)
return images or self.amis.values()
def deregister_image(self, ami_id):
if ami_id in self.amis:
self.amis.pop(ami_id)
return True
return False
raise InvalidAMIIdError(ami_id)
class Region(object):
@ -453,11 +479,14 @@ class SecurityGroupBackend(object):
super(SecurityGroupBackend, self).__init__()
def create_security_group(self, name, description, vpc_id=None, force=False):
if not description:
raise MissingParameterError('GroupDescription')
group_id = random_security_group_id()
if not force:
existing_group = self.get_security_group_from_name(name, vpc_id)
if existing_group:
return None
raise InvalidSecurityGroupDuplicateError(name)
group = SecurityGroup(group_id, name, description, vpc_id=vpc_id)
self.groups[vpc_id][group_id] = group
@ -472,11 +501,13 @@ class SecurityGroupBackend(object):
for vpc in self.groups.values():
if group_id in vpc:
return vpc.pop(group_id)
raise InvalidSecurityGroupNotFoundError(group_id)
elif name:
# Group Name. Has to be in standard EC2, VPC needs to be identified by group_id
group = self.get_security_group_from_name(name)
if group:
return self.groups[None].pop(group.id)
raise InvalidSecurityGroupNotFoundError(name)
def get_security_group_from_id(self, group_id):
# 2 levels of chaining necessary since it's a complex structure
@ -565,7 +596,8 @@ class SecurityGroupBackend(object):
if security_rule in group.ingress_rules:
group.ingress_rules.remove(security_rule)
return security_rule
return False
raise InvalidPermissionNotFoundError()
class VolumeAttachment(object):
@ -642,13 +674,19 @@ class EBSBackend(object):
def describe_volumes(self):
return self.volumes.values()
def get_volume(self, volume_id):
volume = self.volumes.get(volume_id, None)
if not volume:
raise InvalidVolumeIdError(volume_id)
return volume
def delete_volume(self, volume_id):
if volume_id in self.volumes:
return self.volumes.pop(volume_id)
return False
raise InvalidVolumeIdError(volume_id)
def attach_volume(self, volume_id, instance_id, device_path):
volume = self.volumes.get(volume_id)
volume = self.get_volume(volume_id)
instance = self.get_instance(instance_id)
if not volume or not instance:
@ -658,19 +696,19 @@ class EBSBackend(object):
return volume.attachment
def detach_volume(self, volume_id, instance_id, device_path):
volume = self.volumes.get(volume_id)
volume = self.get_volume(volume_id)
instance = self.get_instance(instance_id)
if not volume or not instance:
return False
old_attachment = volume.attachment
if not old_attachment:
raise InvalidVolumeAttachmentError(volume_id, instance_id)
volume.attachment = None
return old_attachment
def create_snapshot(self, volume_id, description):
snapshot_id = random_snapshot_id()
volume = self.volumes.get(volume_id)
volume = self.get_volume(volume_id)
snapshot = Snapshot(snapshot_id, volume, description)
self.snapshots[snapshot_id] = snapshot
return snapshot
@ -681,7 +719,7 @@ class EBSBackend(object):
def delete_snapshot(self, snapshot_id):
if snapshot_id in self.snapshots:
return self.snapshots.pop(snapshot_id)
return False
raise InvalidSnapshotIdError(snapshot_id)
class VPC(TaggedEC2Instance):
@ -725,7 +763,9 @@ class VPCBackend(object):
def delete_vpc(self, vpc_id):
vpc = self.vpcs.pop(vpc_id, None)
if vpc and vpc.dhcp_options:
if not vpc:
raise InvalidVPCIdError(vpc_id)
if vpc.dhcp_options:
vpc.dhcp_options.vpc = None
self.delete_dhcp_options_set(vpc.dhcp_options.id)
vpc.dhcp_options = None
@ -798,7 +838,10 @@ class VPCPeeringConnectionBackend(object):
return self.vpc_pcxs.get(vpc_pcx_id)
def delete_vpc_peering_connection(self, vpc_pcx_id):
return self.vpc_pcxs.pop(vpc_pcx_id, None)
deleted = self.vpc_pcxs.pop(vpc_pcx_id, None)
if not deleted:
raise InvalidVPCPeeringConnectionIdError(vpc_pcx_id)
return deleted
def accept_vpc_peering_connection(self, vpc_pcx_id):
vpc_pcx = self.get_vpc_peering_connection(vpc_pcx_id)
@ -852,7 +895,10 @@ class SubnetBackend(object):
return self.subnets.values()
def delete_subnet(self, subnet_id):
return self.subnets.pop(subnet_id, None)
deleted = self.subnets.pop(subnet_id, None)
if not deleted:
raise InvalidSubnetIdError(subnet_id)
return deleted
class SubnetRouteTableAssociation(object):
@ -978,7 +1024,7 @@ class InternetGatewayBackend(object):
if igw_id in self.internet_gateways:
igws.append(self.internet_gateways[igw_id])
else:
raise InvalidInternetGatewayIDError(igw_id)
raise InvalidInternetGatewayIdError(igw_id)
return igws or self.internet_gateways.values()
def delete_internet_gateway(self, internet_gateway_id):
@ -987,7 +1033,7 @@ class InternetGatewayBackend(object):
if igw.vpc:
raise DependencyViolationError(
"{0} is being utilized by {1}"
.format(internet_gateway_id, igw.vpc)
.format(internet_gateway_id, igw.vpc.id)
)
self.internet_gateways.pop(internet_gateway_id)
return True
@ -1004,7 +1050,7 @@ class InternetGatewayBackend(object):
igw_ids = [internet_gateway_id]
igw = self.describe_internet_gateways(internet_gateway_ids=igw_ids)[0]
if igw.vpc:
raise ResourceAlreadyAssociatedError(igw)
raise ResourceAlreadyAssociatedError(internet_gateway_id)
vpc = self.get_vpc(vpc_id)
igw.vpc = vpc
return True
@ -1148,37 +1194,58 @@ class ElasticAddressBackend(object):
super(ElasticAddressBackend, self).__init__()
def allocate_address(self, domain):
if domain not in ['standard', 'vpc']:
raise InvalidDomainError(domain)
address = ElasticAddress(domain)
self.addresses.append(address)
return address
def address_by_ip(self, ips):
return [address for address in self.addresses
eips = [address for address in self.addresses
if address.public_ip in ips]
# TODO: Trim error message down to specific invalid address.
if not eips or len(ips) > len(eips):
raise InvalidAddressError(ips)
return eips
def address_by_allocation(self, allocation_ids):
return [address for address in self.addresses
eips = [address for address in self.addresses
if address.allocation_id in allocation_ids]
# TODO: Trim error message down to specific invalid id.
if not eips or len(allocation_ids) > len(eips):
raise InvalidAllocationIdError(allocation_ids)
return eips
def address_by_association(self, association_ids):
return [address for address in self.addresses
eips = [address for address in self.addresses
if address.association_id in association_ids]
# TODO: Trim error message down to specific invalid id.
if not eips or len(association_ids) > len(eips):
raise InvalidAssociationIdError(association_ids)
return eips
def associate_address(self, instance, address=None, allocation_id=None, reassociate=False):
eips = []
if address:
eips = self.address_by_ip([address])
elif allocation_id:
eips = self.address_by_allocation([allocation_id])
eip = eips[0] if len(eips) > 0 else None
eip = eips[0]
if eip and eip.instance is None or reassociate:
eip.instance = instance
if eip.domain == "vpc":
eip.association_id = random_eip_association_id()
return eip
else:
return None
if eip.instance and not reassociate:
raise ResourceAlreadyAssociatedError(eip.public_ip)
eip.instance = instance
if eip.domain == "vpc":
eip.association_id = random_eip_association_id()
return eip
def describe_addresses(self):
return self.addresses
@ -1189,14 +1256,11 @@ class ElasticAddressBackend(object):
eips = self.address_by_ip([address])
elif association_id:
eips = self.address_by_association([association_id])
eip = eips[0]
if eips:
eip = eips[0]
eip.instance = None
eip.association_id = None
return True
else:
return False
eip.instance = None
eip.association_id = None
return True
def release_address(self, address=None, allocation_id=None):
eips = []
@ -1204,15 +1268,12 @@ class ElasticAddressBackend(object):
eips = self.address_by_ip([address])
elif allocation_id:
eips = self.address_by_allocation([allocation_id])
eip = eips[0]
if eips:
eip = eips[0]
self.disassociate_address(address=eip.public_ip)
eip.allocation_id = None
self.addresses.remove(eip)
return True
else:
return False
self.disassociate_address(address=eip.public_ip)
eip.allocation_id = None
self.addresses.remove(eip)
return True
class DHCPOptionsSet(TaggedEC2Instance):
@ -1247,6 +1308,16 @@ class DHCPOptionsSetBackend(object):
self, domain_name_servers=None, domain_name=None,
ntp_servers=None, netbios_name_servers=None,
netbios_node_type=None):
NETBIOS_NODE_TYPES = [1, 2, 4, 8]
for field_value in domain_name_servers, ntp_servers, netbios_name_servers:
if field_value and len(field_value) > 4:
raise InvalidParameterValueError(",".join(field_value))
if netbios_node_type and netbios_node_type[0] not in NETBIOS_NODE_TYPES:
raise InvalidParameterValueError(netbios_node_type)
options = DHCPOptionsSet(
domain_name_servers, domain_name, ntp_servers,
netbios_name_servers, netbios_node_type
@ -1264,6 +1335,9 @@ class DHCPOptionsSetBackend(object):
return options_sets or self.dhcp_options_sets.values()
def delete_dhcp_options_set(self, options_id):
if not (options_id and options_id.startswith('dopt-')):
raise MalformedDHCPOptionsIdError(options_id)
if options_id in self.dhcp_options_sets:
if self.dhcp_options_sets[options_id].vpc:
raise DependencyViolationError("Cannot delete assigned DHCP options.")
@ -1280,7 +1354,10 @@ class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
RouteTableBackend, RouteBackend, InternetGatewayBackend,
VPCGatewayAttachmentBackend, SpotRequestBackend,
ElasticAddressBackend, KeyPairBackend, DHCPOptionsSetBackend):
pass
# Use this to generate a proper error template response when in a response handler.
def raise_error(self, code, message):
raise EC2ClientError(code, message)
ec2_backend = EC2Backend()

View File

@ -2,7 +2,6 @@ from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.exceptions import InvalidIdError
from moto.ec2.utils import instance_ids_from_querystring, image_ids_from_querystring
@ -16,8 +15,6 @@ class AmisResponse(BaseResponse):
instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = instance_ids[0]
image = ec2_backend.create_image(instance_id, name, description)
if not image:
return "There is not instance with id {0}".format(instance_id), dict(status=404)
template = Template(CREATE_IMAGE_RESPONSE)
return template.render(image=image)
@ -25,25 +22,16 @@ class AmisResponse(BaseResponse):
ami_id = self.querystring.get('ImageId')[0]
success = ec2_backend.deregister_image(ami_id)
template = Template(DEREGISTER_IMAGE_RESPONSE)
rendered = template.render(success=str(success).lower())
if success:
return rendered
else:
return rendered, dict(status=404)
return template.render(success=str(success).lower())
def describe_image_attribute(self):
raise NotImplementedError('AMIs.describe_image_attribute is not yet implemented')
def describe_images(self):
ami_ids = image_ids_from_querystring(self.querystring)
try:
images = ec2_backend.describe_images(ami_ids=ami_ids)
except InvalidIdError as exc:
template = Template(DESCRIBE_IMAGES_INVALID_IMAGE_ID_RESPONSE)
return template.render(image_id=exc.id), dict(status=400)
else:
template = Template(DESCRIBE_IMAGES_RESPONSE)
return template.render(images=images)
images = ec2_backend.describe_images(ami_ids=ami_ids)
template = Template(DESCRIBE_IMAGES_RESPONSE)
return template.render(images=images)
def modify_image_attribute(self):
raise NotImplementedError('AMIs.modify_image_attribute is not yet implemented')
@ -116,11 +104,6 @@ DESCRIBE_IMAGE_RESPONSE = """<DescribeImageAttributeResponse xmlns="http://ec2.a
</{{key }}>
</DescribeImageAttributeResponse>"""
DESCRIBE_IMAGES_INVALID_IMAGE_ID_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<Response><Errors><Error><Code>InvalidAMIID.NotFound</Code><Message>The image id '[{{ image_id }}]' does not exist</Message></Error></Errors><RequestID>59dbff89-35bd-4eac-99ed-be587EXAMPLE</RequestID></Response>
"""
DEREGISTER_IMAGE_RESPONSE = """<DeregisterImageResponse xmlns="http://ec2.amazonaws.com/doc/2012-12-01/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>{{ success }}</return>

View File

@ -4,12 +4,6 @@ from moto.ec2.utils import (
dhcp_configuration_from_querystring,
sequence_from_querystring)
from moto.ec2.models import ec2_backend
from moto.ec2.exceptions import(
InvalidVPCIdError,
InvalidParameterValueError,
)
NETBIOS_NODE_TYPES = [1, 2, 4, 8]
class DHCPOptions(BaseResponse):
@ -18,7 +12,6 @@ class DHCPOptions(BaseResponse):
vpc_id = self.querystring.get("VpcId", [None])[0]
dhcp_opt = ec2_backend.describe_dhcp_options([dhcp_opt_id])[0]
vpc = ec2_backend.get_vpc(vpc_id)
ec2_backend.associate_dhcp_options(dhcp_opt, vpc)
@ -37,13 +30,6 @@ class DHCPOptions(BaseResponse):
netbios_name_servers = dhcp_config.get("netbios-name-servers", None)
netbios_node_type = dhcp_config.get("netbios-node-type", None)
for field_value in domain_name_servers, ntp_servers, netbios_name_servers:
if field_value and len(field_value) > 4:
raise InvalidParameterValueError(",".join(field_value))
if netbios_node_type and netbios_node_type[0] not in NETBIOS_NODE_TYPES:
raise InvalidParameterValueError(netbios_node_type)
dhcp_options_set = ec2_backend.create_dhcp_options(
domain_name_servers=domain_name_servers,
domain_name=domain_name,
@ -56,20 +42,12 @@ class DHCPOptions(BaseResponse):
return template.render(dhcp_options_set=dhcp_options_set)
def delete_dhcp_options(self):
# TODO InvalidDhcpOptionsId.Malformed
delete_status = False
if "DhcpOptionsId" in self.querystring:
dhcp_opt_id = self.querystring["DhcpOptionsId"][0]
delete_status = ec2_backend.delete_dhcp_options_set(dhcp_opt_id)
dhcp_opt_id = self.querystring.get("DhcpOptionsId", [None])[0]
delete_status = ec2_backend.delete_dhcp_options_set(dhcp_opt_id)
template = Template(DELETE_DHCP_OPTIONS_RESPONSE)
return template.render(delete_status=delete_status)
def describe_dhcp_options(self):
if "Filter.1.Name" in self.querystring:
raise NotImplementedError("Filtering not supported in describe_dhcp_options.")
elif "DhcpOptionsId.1" in self.querystring:

View File

@ -11,8 +11,6 @@ class ElasticBlockStore(BaseResponse):
device_path = self.querystring.get('Device')[0]
attachment = ec2_backend.attach_volume(volume_id, instance_id, device_path)
if not attachment:
return "", dict(status=404)
template = Template(ATTACHED_VOLUME_RESPONSE)
return template.render(attachment=attachment)
@ -38,17 +36,11 @@ class ElasticBlockStore(BaseResponse):
def delete_snapshot(self):
snapshot_id = self.querystring.get('SnapshotId')[0]
success = ec2_backend.delete_snapshot(snapshot_id)
if not success:
# Snapshot doesn't exist
return "Snapshot with id {0} does not exist".format(snapshot_id), dict(status=404)
return DELETE_SNAPSHOT_RESPONSE
def delete_volume(self):
volume_id = self.querystring.get('VolumeId')[0]
success = ec2_backend.delete_volume(volume_id)
if not success:
# Volume doesn't exist
return "Volume with id {0} does not exist".format(volume_id), dict(status=404)
return DELETE_VOLUME_RESPONSE
def describe_snapshot_attribute(self):
@ -76,9 +68,6 @@ class ElasticBlockStore(BaseResponse):
device_path = self.querystring.get('Device')[0]
attachment = ec2_backend.detach_volume(volume_id, instance_id, device_path)
if not attachment:
# Volume wasn't attached
return "Volume {0} can not be detached from {1} because it is not attached".format(volume_id, instance_id), dict(status=404)
template = Template(DETATCH_VOLUME_RESPONSE)
return template.render(attachment=attachment)

View File

@ -9,8 +9,6 @@ class ElasticIPAddresses(BaseResponse):
def allocate_address(self):
if "Domain" in self.querystring:
domain = self.querystring.get('Domain')[0]
if domain != "vpc":
return "Invalid domain:{0}.".format(domain), dict(status=400)
else:
domain = "standard"
address = ec2_backend.allocate_address(domain)
@ -23,7 +21,7 @@ class ElasticIPAddresses(BaseResponse):
elif "NetworkInterfaceId" in self.querystring:
raise NotImplementedError("Lookup by allocation id not implemented")
else:
return "Invalid request, expect InstanceId/NetworkId parameter.", dict(status=400)
ec2_backend.raise_error("MissingParameter", "Invalid request, expect InstanceId/NetworkId parameter.")
reassociate = False
if "AllowReassociation" in self.querystring:
@ -34,13 +32,10 @@ class ElasticIPAddresses(BaseResponse):
elif "AllocationId" in self.querystring:
eip = ec2_backend.associate_address(instance, allocation_id=self.querystring['AllocationId'][0], reassociate=reassociate)
else:
return "Invalid request, expect PublicIp/AllocationId parameter.", dict(status=400)
ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
if eip:
template = Template(ASSOCIATE_ADDRESS_RESPONSE)
return template.render(address=eip)
else:
return "Failed to associate address.", dict(status=400)
template = Template(ASSOCIATE_ADDRESS_RESPONSE)
return template.render(address=eip)
def describe_addresses(self):
template = Template(DESCRIBE_ADDRESS_RESPONSE)
@ -63,12 +58,9 @@ class ElasticIPAddresses(BaseResponse):
elif "AssociationId" in self.querystring:
disassociated = ec2_backend.disassociate_address(association_id=self.querystring['AssociationId'][0])
else:
return "Invalid request, expect PublicIp/AssociationId parameter.", dict(status=400)
ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AssociationId parameter.")
if disassociated:
return Template(DISASSOCIATE_ADDRESS_RESPONSE).render()
else:
return "Address conresponding to PublicIp/AssociationIP not found.", dict(status=400)
return Template(DISASSOCIATE_ADDRESS_RESPONSE).render()
def release_address(self):
if "PublicIp" in self.querystring:
@ -76,12 +68,9 @@ class ElasticIPAddresses(BaseResponse):
elif "AllocationId" in self.querystring:
released = ec2_backend.release_address(allocation_id=self.querystring['AllocationId'][0])
else:
return "Invalid request, expect PublicIp/AllocationId parameter.", dict(status=400)
ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
if released:
return Template(RELEASE_ADDRESS_RESPONSE).render()
else:
return "Address conresponding to PublicIp/AssociationIP not found.", dict(status=400)
return Template(RELEASE_ADDRESS_RESPONSE).render()
ALLOCATE_ADDRESS_RESPONSE = """<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2013-07-15/">

View File

@ -10,11 +10,8 @@ class General(BaseResponse):
self.instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = self.instance_ids[0]
instance = ec2_backend.get_instance(instance_id)
if instance:
template = Template(GET_CONSOLE_OUTPUT_RESULT)
return template.render(instance=instance)
else:
return "", dict(status=404)
template = Template(GET_CONSOLE_OUTPUT_RESULT)
return template.render(instance=instance)
GET_CONSOLE_OUTPUT_RESULT = '''

View File

@ -4,18 +4,13 @@ from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from moto.ec2.models import ec2_backend
from moto.ec2.utils import instance_ids_from_querystring, filters_from_querystring, filter_reservations
from moto.ec2.exceptions import InvalidIdError
class InstanceResponse(BaseResponse):
def describe_instances(self):
instance_ids = instance_ids_from_querystring(self.querystring)
if instance_ids:
try:
reservations = ec2_backend.get_reservations_by_instance_ids(instance_ids)
except InvalidIdError as exc:
template = Template(EC2_INVALID_INSTANCE_ID)
return template.render(instance_id=exc.id), dict(status=400)
reservations = ec2_backend.get_reservations_by_instance_ids(instance_ids)
else:
reservations = ec2_backend.all_reservations(make_copy=True)
@ -364,14 +359,6 @@ EC2_MODIFY_INSTANCE_ATTRIBUTE = """<ModifyInstanceAttributeResponse xmlns="http:
<return>true</return>
</ModifyInstanceAttributeResponse>"""
EC2_INVALID_INSTANCE_ID = """<?xml version="1.0" encoding="UTF-8"?>
<Response><Errors><Error><Code>InvalidInstanceID.NotFound</Code>
<Message>The instance ID '{{ instance_id }}' does not exist</Message></Error>
</Errors>
<RequestID>39070fe4-6f6d-4565-aecd-7850607e4555</RequestID></Response>"""
EC2_INSTANCE_STATUS = """<?xml version="1.0" encoding="UTF-8"?>
<DescribeInstanceStatusResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
@ -405,4 +392,4 @@ EC2_INSTANCE_STATUS = """<?xml version="1.0" encoding="UTF-8"?>
</item>
{% endfor %}
</instanceStatusSet>
</DescribeInstanceStatusResponse>"""
</DescribeInstanceStatusResponse>"""

View File

@ -1,22 +1,16 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.exceptions import InvalidIdError
from moto.ec2.utils import keypair_names_from_querystring, filters_from_querystring
class KeyPairs(BaseResponse):
def create_key_pair(self):
try:
name = self.querystring.get('KeyName')[0]
keypair = ec2_backend.create_key_pair(name)
except InvalidIdError as exc:
template = Template(CREATE_KEY_PAIR_INVALID_NAME)
return template.render(keypair_id=exc.id), dict(status=400)
else:
template = Template(CREATE_KEY_PAIR_RESPONSE)
return template.render(**keypair)
name = self.querystring.get('KeyName')[0]
keypair = ec2_backend.create_key_pair(name)
template = Template(CREATE_KEY_PAIR_RESPONSE)
return template.render(**keypair)
def delete_key_pair(self):
name = self.querystring.get('KeyName')[0]
@ -29,14 +23,9 @@ class KeyPairs(BaseResponse):
if len(filters) > 0:
raise NotImplementedError('Using filters in KeyPairs.describe_key_pairs is not yet implemented')
try:
keypairs = ec2_backend.describe_key_pairs(names)
except InvalidIdError as exc:
template = Template(CREATE_KEY_PAIR_NOT_FOUND)
return template.render(keypair_id=exc.id), dict(status=400)
else:
template = Template(DESCRIBE_KEY_PAIRS_RESPONSE)
return template.render(keypairs=keypairs)
keypairs = ec2_backend.describe_key_pairs(names)
template = Template(DESCRIBE_KEY_PAIRS_RESPONSE)
return template.render(keypairs=keypairs)
def import_key_pair(self):
raise NotImplementedError('KeyPairs.import_key_pair is not yet implemented')
@ -65,16 +54,6 @@ CREATE_KEY_PAIR_RESPONSE = """<CreateKeyPairResponse xmlns="http://ec2.amazonaws
</CreateKeyPairResponse>"""
CREATE_KEY_PAIR_INVALID_NAME = """<?xml version="1.0" encoding="UTF-8"?>
<Response><Errors><Error><Code>InvalidKeyPair.Duplicate</Code><Message>The keypair '{{ keypair_id }}' already exists.</Message></Error></Errors><RequestID>f4f76e81-8ca5-4e61-a6d5-a4a96EXAMPLE</RequestID></Response>
"""
CREATE_KEY_PAIR_NOT_FOUND = """<?xml version="1.0" encoding="UTF-8"?>
<Response><Errors><Error><Code>InvalidKeyPair.NotFound</Code><Message>The keypair '{{ keypair_id }}' does not exist.</Message></Error></Errors><RequestID>f4f76e81-8ca5-4e61-a6d5-a4a96EXAMPLE</RequestID></Response>
"""
DELETE_KEY_PAIR_RESPONSE = """<DeleteKeyPairResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>{{ success }}</return>

View File

@ -44,16 +44,9 @@ class SecurityGroups(BaseResponse):
def create_security_group(self):
name = self.querystring.get('GroupName')[0]
try:
description = self.querystring.get('GroupDescription')[0]
except TypeError:
# No description found, return error
return "The request must contain the parameter GroupDescription", dict(status=400)
description = self.querystring.get('GroupDescription', [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
group = ec2_backend.create_security_group(name, description, vpc_id=vpc_id)
if not group:
# There was an exisitng group
return "There was an existing security group with name {0}".format(name), dict(status=409)
template = Template(CREATE_SECURITY_GROUP_RESPONSE)
return template.render(group=group)
@ -68,10 +61,6 @@ class SecurityGroups(BaseResponse):
elif sg_id:
group = ec2_backend.delete_security_group(group_id=sg_id[0])
# needs name or group now
if not group:
# There was no such group
return "There was no security group with name {0}".format(name), dict(status=404)
return DELETE_GROUP_RESPONSE
def describe_security_groups(self):
@ -83,9 +72,7 @@ class SecurityGroups(BaseResponse):
raise NotImplementedError('SecurityGroups.revoke_security_group_egress is not yet implemented')
def revoke_security_group_ingress(self):
success = ec2_backend.revoke_security_group_ingress(*process_rules_from_querystring(self.querystring))
if not success:
return "Could not find a matching ingress rule", dict(status=404)
ec2_backend.revoke_security_group_ingress(*process_rules_from_querystring(self.querystring))
return REVOKE_SECURITY_GROUP_INGRESS_REPONSE

View File

@ -15,11 +15,8 @@ class Subnets(BaseResponse):
def delete_subnet(self):
subnet_id = self.querystring.get('SubnetId')[0]
subnet = ec2_backend.delete_subnet(subnet_id)
if subnet:
template = Template(DELETE_SUBNET_RESPONSE)
return template.render(subnet=subnet)
else:
return "", dict(status=404)
template = Template(DELETE_SUBNET_RESPONSE)
return template.render(subnet=subnet)
def describe_subnets(self):
subnets = ec2_backend.get_all_subnets()

View File

@ -15,11 +15,8 @@ class VPCPeeringConnections(BaseResponse):
def delete_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx = ec2_backend.delete_vpc_peering_connection(vpc_pcx_id)
if vpc_pcx:
template = Template(DELETE_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx)
else:
return "", dict(status=404)
template = Template(DELETE_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx)
def describe_vpc_peering_connections(self):
vpc_pcxs = ec2_backend.get_all_vpc_peering_connections()
@ -29,20 +26,14 @@ class VPCPeeringConnections(BaseResponse):
def accept_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx = ec2_backend.accept_vpc_peering_connection(vpc_pcx_id)
if vpc_pcx:
template = Template(ACCEPT_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx)
else:
return "", dict(status=404)
template = Template(ACCEPT_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx)
def reject_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx = ec2_backend.reject_vpc_peering_connection(vpc_pcx_id)
if vpc_pcx:
template = Template(REJECT_VPC_PEERING_CONNECTION_RESPONSE)
return template.render()
else:
return "", dict(status=404)
template = Template(REJECT_VPC_PEERING_CONNECTION_RESPONSE)
return template.render()
CREATE_VPC_PEERING_CONNECTION_RESPONSE = """

View File

@ -14,11 +14,8 @@ class VPCs(BaseResponse):
def delete_vpc(self):
vpc_id = self.querystring.get('VpcId')[0]
vpc = ec2_backend.delete_vpc(vpc_id)
if vpc:
template = Template(DELETE_VPC_RESPONSE)
return template.render(vpc=vpc)
else:
return "", dict(status=404)
template = Template(DELETE_VPC_RESPONSE)
return template.render(vpc=vpc)
def describe_vpcs(self):
vpcs = ec2_backend.get_all_vpcs()

View File

@ -0,0 +1,37 @@
"""
Patch courtesy of:
https://marmida.com/blog/index.php/2012/08/08/monkey-patching-assert_raises/
"""
# code for monkey-patching
import nose.tools
# let's fix nose.tools.assert_raises (which is really unittest.assertRaises)
# so that it always supports context management
# in order for these changes to be available to other modules, you'll need
# to guarantee this module is imported by your fixture before either nose or
# unittest are imported
try:
nose.tools.assert_raises(Exception)
except TypeError:
# this version of assert_raises doesn't support the 1-arg version
class AssertRaisesContext(object):
def __init__(self, expected):
self.expected = expected
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, tb):
self.exception = exc_val
nose.tools.assert_equal(exc_type, self.expected)
# if you get to this line, the last assertion must have passed
# suppress the propagation of this exception
return True
def assert_raises_context(exc_type):
return AssertRaisesContext(exc_type)
nose.tools.assert_raises = assert_raises_context

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
@ -19,7 +23,11 @@ def test_ami_create_and_delete():
success = conn.deregister_image(image)
success.should.be.true
success = conn.deregister_image.when.called_with(image).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.deregister_image(image)
cm.exception.code.should.equal('InvalidAMIID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -46,7 +54,12 @@ def test_ami_tagging():
def test_ami_create_from_missing_instance():
conn = boto.connect_ec2('the_key', 'the_secret')
args = ["i-abcdefg", "test-ami", "this is a test ami"]
conn.create_image.when.called_with(*args).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.create_image(*args)
cm.exception.code.should.equal('InvalidInstanceID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -64,4 +77,10 @@ def test_ami_pulls_attributes_from_instance():
@mock_ec2
def test_getting_missing_ami():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.get_image.when.called_with('ami-missing').should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.get_image('ami-missing')
cm.exception.code.should.equal('InvalidAMIID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
@ -26,7 +30,11 @@ def test_dhcp_options_associate_invalid_dhcp_id():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
conn.associate_dhcp_options.when.called_with("foo", vpc.id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.associate_dhcp_options("foo", vpc.id)
cm.exception.code.should.equal('InvalidDhcpOptionID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -35,7 +43,11 @@ def test_dhcp_options_associate_invalid_vpc_id():
conn = boto.connect_vpc('the_key', 'the_secret')
dhcp_options = conn.create_dhcp_options(SAMPLE_DOMAIN_NAME, SAMPLE_NAME_SERVERS)
conn.associate_dhcp_options.when.called_with(dhcp_options.id, "foo").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.associate_dhcp_options(dhcp_options.id, "foo")
cm.exception.code.should.equal('InvalidVpcID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -49,11 +61,19 @@ def test_dhcp_options_delete_with_vpc():
rval = conn.associate_dhcp_options(dhcp_options_id, vpc.id)
rval.should.be.equal(True)
#conn.delete_dhcp_options(dhcp_options_id)
conn.delete_dhcp_options.when.called_with(dhcp_options_id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.delete_dhcp_options(dhcp_options_id)
cm.exception.code.should.equal('DependencyViolation')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
vpc.delete()
conn.get_all_dhcp_options.when.called_with([dhcp_options_id]).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_dhcp_options([dhcp_options_id])
cm.exception.code.should.equal('InvalidDhcpOptionID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -72,8 +92,18 @@ def test_create_dhcp_options_invalid_options():
"""Create invalid dhcp options"""
conn = boto.connect_vpc('the_key', 'the_secret')
servers = ["f", "f", "f", "f", "f"]
conn.create_dhcp_options.when.called_with(ntp_servers=servers).should.throw(EC2ResponseError)
conn.create_dhcp_options.when.called_with(netbios_node_type="0").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.create_dhcp_options(ntp_servers=servers)
cm.exception.code.should.equal('InvalidParameterValue')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
with assert_raises(EC2ResponseError) as cm:
conn.create_dhcp_options(netbios_node_type="0")
cm.exception.code.should.equal('InvalidParameterValue')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -94,7 +124,11 @@ def test_describe_dhcp_options_invalid_id():
"""get error on invalid dhcp_option_id lookup"""
conn = boto.connect_vpc('the_key', 'the_secret')
conn.get_all_dhcp_options.when.called_with(["1"]).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_dhcp_options(["1"])
cm.exception.code.should.equal('InvalidDhcpOptionID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -107,7 +141,12 @@ def test_delete_dhcp_options():
dhcp_options.should.be.length_of(1)
conn.delete_dhcp_options(dhcp_option.id) # .should.be.equal(True)
conn.get_all_dhcp_options.when.called_with([dhcp_option.id]).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_dhcp_options([dhcp_option.id])
cm.exception.code.should.equal('InvalidDhcpOptionID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -115,7 +154,25 @@ def test_delete_dhcp_options_invalid_id():
conn = boto.connect_vpc('the_key', 'the_secret')
conn.create_dhcp_options()
conn.delete_dhcp_options.when.called_with("1").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.delete_dhcp_options("dopt-abcd1234")
cm.exception.code.should.equal('InvalidDhcpOptionID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_delete_dhcp_options_malformed_id():
conn = boto.connect_vpc('the_key', 'the_secret')
conn.create_dhcp_options()
with assert_raises(EC2ResponseError) as cm:
conn.delete_dhcp_options("foo-abcd1234")
cm.exception.code.should.equal('InvalidDhcpOptionsId.Malformed')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
import sure # noqa
@ -22,7 +26,11 @@ def test_create_and_delete_volume():
conn.get_all_volumes().should.have.length_of(0)
# Deleting something that was already deleted should throw an error
volume.delete.when.called_with().should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
volume.delete()
cm.exception.code.should.equal('InvalidVolume.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -47,14 +55,23 @@ def test_volume_attach_and_detach():
volume.update()
volume.volume_state().should.equal('available')
volume.attach.when.called_with(
'i-1234abcd', "/dev/sdh").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm1:
volume.attach('i-1234abcd', "/dev/sdh")
cm1.exception.code.should.equal('InvalidInstanceID.NotFound')
cm1.exception.status.should.equal(400)
cm1.exception.request_id.should_not.be.none
conn.detach_volume.when.called_with(
volume.id, instance.id, "/dev/sdh").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm2:
conn.detach_volume(volume.id, instance.id, "/dev/sdh")
cm2.exception.code.should.equal('InvalidAttachment.NotFound')
cm2.exception.status.should.equal(400)
cm2.exception.request_id.should_not.be.none
conn.detach_volume.when.called_with(
volume.id, 'i-1234abcd', "/dev/sdh").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm3:
conn.detach_volume(volume.id, 'i-1234abcd', "/dev/sdh")
cm3.exception.code.should.equal('InvalidInstanceID.NotFound')
cm3.exception.status.should.equal(400)
cm3.exception.request_id.should_not.be.none
@mock_ec2
@ -76,7 +93,11 @@ def test_create_snapshot():
conn.get_all_snapshots().should.have.length_of(1)
# Deleting something that was already deleted should throw an error
snapshot.delete.when.called_with().should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
snapshot.delete()
cm.exception.code.should.equal('InvalidSnapshot.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2

View File

@ -1,4 +1,7 @@
"""Test mocking of Elatic IP Address"""
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
@ -41,7 +44,11 @@ def test_eip_allocate_invalid_domain():
"""Allocate EIP invalid domain"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.allocate_address.when.called_with(domain="bogus").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.allocate_address(domain="bogus")
cm.exception.code.should.equal('InvalidParameterValue')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -54,7 +61,13 @@ def test_eip_associate_classic():
eip = conn.allocate_address()
eip.instance_id.should.be.none
conn.associate_address.when.called_with(public_ip=eip.public_ip).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.associate_address(public_ip=eip.public_ip)
cm.exception.code.should.equal('MissingParameter')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(instance.id)
@ -77,7 +90,13 @@ def test_eip_associate_vpc():
eip = conn.allocate_address(domain='vpc')
eip.instance_id.should.be.none
conn.associate_address.when.called_with(allocation_id=eip.allocation_id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.associate_address(allocation_id=eip.allocation_id)
cm.exception.code.should.equal('MissingParameter')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
conn.associate_address(instance_id=instance.id, allocation_id=eip.allocation_id)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(instance.id)
@ -100,8 +119,15 @@ def test_eip_reassociate():
eip = conn.allocate_address()
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip)
conn.associate_address.when.called_with(instance_id=instance.id, public_ip=eip.public_ip, allow_reassociation=False).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip, allow_reassociation=False)
cm.exception.code.should.equal('Resource.AlreadyAssociated')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
conn.associate_address.when.called_with(instance_id=instance.id, public_ip=eip.public_ip, allow_reassociation=True).should_not.throw(EC2ResponseError)
eip.release()
eip = None
@ -116,7 +142,12 @@ def test_eip_associate_invalid_args():
instance = reservation.instances[0]
eip = conn.allocate_address()
conn.associate_address.when.called_with(instance_id=instance.id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.associate_address(instance_id=instance.id)
cm.exception.code.should.equal('MissingParameter')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
instance.terminate()
@ -125,27 +156,47 @@ def test_eip_associate_invalid_args():
def test_eip_disassociate_bogus_association():
"""Disassociate bogus EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.disassociate_address.when.called_with(association_id="bogus").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.disassociate_address(association_id="bogus")
cm.exception.code.should.equal('InvalidAssociationID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_eip_release_bogus_eip():
"""Release bogus EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.release_address.when.called_with(allocation_id="bogus").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.release_address(allocation_id="bogus")
cm.exception.code.should.equal('InvalidAllocationID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_eip_disassociate_arg_error():
"""Invalid arguments disassociate address"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.disassociate_address.when.called_with().should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.disassociate_address()
cm.exception.code.should.equal('MissingParameter')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_eip_release_arg_error():
"""Invalid arguments release address"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.release_address.when.called_with().should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.release_address()
cm.exception.code.should.equal('MissingParameter')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -186,9 +237,12 @@ def test_eip_describe():
@mock_ec2
def test_eip_describe_none():
"""Find nothing when seach for bogus IP"""
"""Error when search for bogus IP"""
conn = boto.connect_ec2('the_key', 'the_secret')
lookup_addresses = conn.get_all_addresses(addresses=["256.256.256.256"])
len(lookup_addresses).should.be.equal(0)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_addresses(addresses=["256.256.256.256"])
cm.exception.code.should.equal('InvalidAddress.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
import sure # noqa
@ -18,4 +22,9 @@ def test_console_output():
@mock_ec2
def test_console_output_without_instance():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.get_console_output.when.called_with('i-1234abcd').should.throw(Exception)
with assert_raises(EC2ResponseError) as cm:
conn.get_console_output('i-1234abcd')
cm.exception.code.should.equal('InvalidInstanceID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import base64
import boto
@ -72,10 +76,11 @@ def test_get_instances_by_id():
instance_ids.should.equal([instance1.id, instance2.id])
# Call get_all_instances with a bad id should raise an error
conn.get_all_instances.when.called_with(instance_ids=[instance1.id, "i-1234abcd"]).should.throw(
EC2ResponseError,
"The instance ID 'i-1234abcd' does not exist"
)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_instances(instance_ids=[instance1.id, "i-1234abcd"])
cm.exception.code.should.equal('InvalidInstanceID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -274,4 +279,11 @@ def test_describe_instance_status_with_instance_filter():
all_status = conn.get_all_instance_status(instance_ids=[instance.id])
len(all_status).should.equal(1)
all_status[0].id.should.equal(instance.id)
all_status[0].id.should.equal(instance.id)
# Call get_all_instance_status with a bad id should raise an error
with assert_raises(EC2ResponseError) as cm:
conn.get_all_instance_status(instance_ids=[instance.id, "i-1234abcd"])
cm.exception.code.should.equal('InvalidInstanceID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import re
import boto
@ -41,7 +45,12 @@ def test_igw_attach_bad_vpc():
""" internet gateway fail to attach w/ bad vpc """
conn = boto.connect_vpc('the_key', 'the_secret')
igw = conn.create_internet_gateway()
conn.attach_internet_gateway.when.called_with(igw.id, BAD_VPC).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.attach_internet_gateway(igw.id, BAD_VPC)
cm.exception.code.should.equal('InvalidVpcID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_igw_attach_twice():
@ -51,7 +60,12 @@ def test_igw_attach_twice():
vpc1 = conn.create_vpc(VPC_CIDR)
vpc2 = conn.create_vpc(VPC_CIDR)
conn.attach_internet_gateway(igw.id, vpc1.id)
conn.attach_internet_gateway.when.called_with(igw.id, vpc2.id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.attach_internet_gateway(igw.id, vpc2.id)
cm.exception.code.should.equal('Resource.AlreadyAssociated')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_igw_detach():
@ -65,20 +79,46 @@ def test_igw_detach():
igw.attachments.should.have.length_of(0)
@mock_ec2
def test_igw_detach_bad_vpc():
""" internet gateway fail to detach w/ bad vpc """
def test_igw_detach_wrong_vpc():
""" internet gateway fail to detach w/ wrong vpc """
conn = boto.connect_vpc('the_key', 'the_secret')
igw = conn.create_internet_gateway()
vpc1 = conn.create_vpc(VPC_CIDR)
vpc2 = conn.create_vpc(VPC_CIDR)
conn.attach_internet_gateway(igw.id, vpc1.id)
with assert_raises(EC2ResponseError) as cm:
conn.detach_internet_gateway(igw.id, vpc2.id)
cm.exception.code.should.equal('Gateway.NotAttached')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_igw_detach_invalid_vpc():
""" internet gateway fail to detach w/ invalid vpc """
conn = boto.connect_vpc('the_key', 'the_secret')
igw = conn.create_internet_gateway()
vpc = conn.create_vpc(VPC_CIDR)
conn.attach_internet_gateway(igw.id, vpc.id)
conn.detach_internet_gateway.when.called_with(igw.id, BAD_VPC).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.detach_internet_gateway(igw.id, BAD_VPC)
cm.exception.code.should.equal('Gateway.NotAttached')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_igw_detach_unattached():
""" internet gateway fail to detach unattached """
conn = boto.connect_vpc('the_key', 'the_secret')
igw = conn.create_internet_gateway()
conn.detach_internet_gateway.when.called_with(igw.id, BAD_VPC).should.throw(EC2ResponseError)
vpc = conn.create_vpc(VPC_CIDR)
with assert_raises(EC2ResponseError) as cm:
conn.detach_internet_gateway(igw.id, vpc.id)
cm.exception.code.should.equal('Gateway.NotAttached')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_igw_delete():
@ -98,7 +138,12 @@ def test_igw_delete_attached():
igw = conn.create_internet_gateway()
vpc = conn.create_vpc(VPC_CIDR)
conn.attach_internet_gateway(igw.id, vpc.id)
conn.delete_internet_gateway.when.called_with(igw.id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.delete_internet_gateway(igw.id)
cm.exception.code.should.equal('DependencyViolation')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_igw_desribe():
@ -112,4 +157,8 @@ def test_igw_desribe():
def test_igw_desribe_bad_id():
""" internet gateway fail to fetch by bad id """
conn = boto.connect_vpc('the_key', 'the_secret')
conn.get_all_internet_gateways.when.called_with([BAD_IGW]).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_internet_gateways([BAD_IGW])
cm.exception.code.should.equal('InvalidInternetGatewayID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
import sure # noqa
@ -11,6 +15,17 @@ def test_key_pairs_empty():
assert len(conn.get_all_key_pairs()) == 0
@mock_ec2
def test_key_pairs_invalid_id():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.get_all_key_pairs('foo')
cm.exception.code.should.equal('InvalidKeyPair.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_key_pairs_create():
conn = boto.connect_ec2('the_key', 'the_secret')
@ -42,10 +57,12 @@ def test_key_pairs_create_exist():
kp = conn.create_key_pair('foo')
assert kp.material.startswith('---- BEGIN RSA PRIVATE KEY ----')
assert len(conn.get_all_key_pairs()) == 1
conn.create_key_pair.when.called_with('foo').should.throw(
EC2ResponseError,
"The keypair 'foo' already exists."
)
with assert_raises(EC2ResponseError) as cm:
conn.create_key_pair('foo')
cm.exception.code.should.equal('InvalidKeyPair.Duplicate')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
import sure # noqa
@ -14,7 +18,11 @@ def test_create_and_describe_security_group():
security_group.description.should.equal('this is a test security group')
# Trying to create another group with the same name should throw an error
conn.create_security_group.when.called_with('test security group', 'this is a test security group').should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.create_security_group('test security group', 'this is a test security group')
cm.exception.code.should.equal('InvalidGroup.Duplicate')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_groups = conn.get_all_security_groups()
all_groups.should.have.length_of(1)
@ -24,7 +32,12 @@ def test_create_and_describe_security_group():
@mock_ec2
def test_create_security_group_without_description_raises_error():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.create_security_group.when.called_with('test security group', '').should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.create_security_group('test security group', '')
cm.exception.code.should.equal('MissingParameter')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
@ -39,7 +52,11 @@ def test_create_and_describe_vpc_security_group():
security_group.description.should.equal('this is a test security group')
# Trying to create another group with the same name in the same VPC should throw an error
conn.create_security_group.when.called_with('test security group', 'this is a test security group', vpc_id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.create_security_group('test security group', 'this is a test security group', vpc_id)
cm.exception.code.should.equal('InvalidGroup.Duplicate')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_groups = conn.get_all_security_groups()
@ -74,7 +91,11 @@ def test_deleting_security_groups():
conn.get_all_security_groups().should.have.length_of(2)
# Deleting a group that doesn't exist should throw an error
conn.delete_security_group.when.called_with('foobar').should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.delete_security_group('foobar')
cm.exception.code.should.equal('InvalidGroup.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
# Delete by name
conn.delete_security_group('test2')
@ -108,7 +129,11 @@ def test_authorize_ip_range_and_revoke():
security_group.rules[0].grants[0].cidr_ip.should.equal("123.123.123.123/32")
# Wrong Cidr should throw error
security_group.revoke.when.called_with(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.122/32").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
security_group.revoke(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.122/32")
cm.exception.code.should.equal('InvalidPermission.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
# Actually revoke
security_group.revoke(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32")
@ -132,7 +157,11 @@ def test_authorize_other_group_and_revoke():
security_group.rules[0].grants[0].group_id.should.equal(other_security_group.id)
# Wrong source group should throw error
security_group.revoke.when.called_with(ip_protocol="tcp", from_port="22", to_port="2222", src_group=wrong_group).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
security_group.revoke(ip_protocol="tcp", from_port="22", to_port="2222", src_group=wrong_group)
cm.exception.code.should.equal('InvalidPermission.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
# Actually revoke
security_group.revoke(ip_protocol="tcp", from_port="22", to_port="2222", src_group=other_security_group)

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
import sure # noqa
@ -19,8 +23,11 @@ def test_subnets():
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(0)
conn.delete_subnet.when.called_with(
subnet.id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.delete_subnet(subnet.id)
cm.exception.code.should.equal('InvalidSubnetID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
import sure # noqa
@ -40,8 +44,11 @@ def test_vpc_peering_connections_accept():
vpc_pcx = conn.accept_vpc_peering_connection(vpc_pcx.id)
vpc_pcx._status.code.should.equal('active')
conn.reject_vpc_peering_connection.when.called_with(
vpc_pcx.id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.reject_vpc_peering_connection(vpc_pcx.id)
cm.exception.code.should.equal('InvalidStateTransition')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
@ -57,8 +64,11 @@ def test_vpc_peering_connections_reject():
verdict = conn.reject_vpc_peering_connection(vpc_pcx.id)
verdict.should.equal(True)
conn.accept_vpc_peering_connection.when.called_with(
vpc_pcx.id).should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.accept_vpc_peering_connection(vpc_pcx.id)
cm.exception.code.should.equal('InvalidStateTransition')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
@ -77,6 +87,9 @@ def test_vpc_peering_connections_delete():
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(0)
conn.delete_vpc_peering_connection.when.called_with(
"pcx-1234abcd").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.delete_vpc_peering_connection("pcx-1234abcd")
cm.exception.code.should.equal('InvalidVpcPeeringConnectionId.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none

View File

@ -1,3 +1,7 @@
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
import sure # noqa
@ -19,8 +23,11 @@ def test_vpcs():
all_vpcs = conn.get_all_vpcs()
all_vpcs.should.have.length_of(0)
conn.delete_vpc.when.called_with(
"vpc-1234abcd").should.throw(EC2ResponseError)
with assert_raises(EC2ResponseError) as cm:
conn.delete_vpc("vpc-1234abcd")
cm.exception.code.should.equal('InvalidVpcID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2