Add dry_run to a number of EC2 services

This commit is contained in:
rocky4570fft 2016-10-15 23:08:44 +10:00
parent 79fe9df6cc
commit 015c92ac44
26 changed files with 710 additions and 235 deletions

View File

@ -3,6 +3,8 @@ import datetime
import json
import re
from boto.exception import JSONResponseError
from jinja2 import Environment, DictLoader, TemplateNotFound
import six
@ -261,6 +263,10 @@ class BaseResponse(_TemplateEnvironmentMixin):
def request_json(self):
return 'JSON' in self.querystring.get('ContentType', [])
def is_not_dryrun(self, action):
if 'true' in self.querystring.get('DryRun', ['false']):
raise JSONResponseError(400, 'DryRunOperation', body={'message': 'An error occurred (DryRunOperation) when calling the %s operation: Request would have succeeded, but DryRun flag is set' % action})
return True
def metadata_response(request, full_url, headers):
"""

View File

@ -13,24 +13,27 @@ class AmisResponse(BaseResponse):
description = ""
instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = instance_ids[0]
image = self.ec2_backend.create_image(instance_id, name, description)
template = self.response_template(CREATE_IMAGE_RESPONSE)
return template.render(image=image)
if self.is_not_dryrun('CreateImage'):
image = self.ec2_backend.create_image(instance_id, name, description)
template = self.response_template(CREATE_IMAGE_RESPONSE)
return template.render(image=image)
def copy_image(self):
source_image_id = self.querystring.get('SourceImageId')[0]
source_region = self.querystring.get('SourceRegion')[0]
name = self.querystring.get('Name')[0] if self.querystring.get('Name') else None
description = self.querystring.get('Description')[0] if self.querystring.get('Description') else None
image = self.ec2_backend.copy_image(source_image_id, source_region, name, description)
template = self.response_template(COPY_IMAGE_RESPONSE)
return template.render(image=image)
if self.is_not_dryrun('CopyImage'):
image = self.ec2_backend.copy_image(source_image_id, source_region, name, description)
template = self.response_template(COPY_IMAGE_RESPONSE)
return template.render(image=image)
def deregister_image(self):
ami_id = self.querystring.get('ImageId')[0]
success = self.ec2_backend.deregister_image(ami_id)
template = self.response_template(DEREGISTER_IMAGE_RESPONSE)
return template.render(success=str(success).lower())
if self.is_not_dryrun('DeregisterImage'):
success = self.ec2_backend.deregister_image(ami_id)
template = self.response_template(DEREGISTER_IMAGE_RESPONSE)
return template.render(success=str(success).lower())
def describe_images(self):
ami_ids = image_ids_from_querystring(self.querystring)
@ -51,17 +54,20 @@ class AmisResponse(BaseResponse):
operation_type = self.querystring.get('OperationType')[0]
group = self.querystring.get('UserGroup.1', [None])[0]
user_ids = sequence_from_querystring('UserId', self.querystring)
if (operation_type == 'add'):
self.ec2_backend.add_launch_permission(ami_id, user_ids=user_ids, group=group)
elif (operation_type == 'remove'):
self.ec2_backend.remove_launch_permission(ami_id, user_ids=user_ids, group=group)
return MODIFY_IMAGE_ATTRIBUTE_RESPONSE
if self.is_not_dryrun('ModifyImageAttribute'):
if (operation_type == 'add'):
self.ec2_backend.add_launch_permission(ami_id, user_ids=user_ids, group=group)
elif (operation_type == 'remove'):
self.ec2_backend.remove_launch_permission(ami_id, user_ids=user_ids, group=group)
return MODIFY_IMAGE_ATTRIBUTE_RESPONSE
def register_image(self):
raise NotImplementedError('AMIs.register_image is not yet implemented')
if self.is_not_dryrun('RegisterImage'):
raise NotImplementedError('AMIs.register_image is not yet implemented')
def reset_image_attribute(self):
raise NotImplementedError('AMIs.reset_image_attribute is not yet implemented')
if self.is_not_dryrun('ResetImageAttribute'):
raise NotImplementedError('AMIs.reset_image_attribute is not yet implemented')
CREATE_IMAGE_RESPONSE = """<CreateImageResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -4,45 +4,49 @@ from moto.ec2.utils import filters_from_querystring
class ElasticBlockStore(BaseResponse):
def attach_volume(self):
volume_id = self.querystring.get('VolumeId')[0]
instance_id = self.querystring.get('InstanceId')[0]
device_path = self.querystring.get('Device')[0]
attachment = self.ec2_backend.attach_volume(volume_id, instance_id, device_path)
template = self.response_template(ATTACHED_VOLUME_RESPONSE)
return template.render(attachment=attachment)
if self.is_not_dryrun('AttachVolume'):
attachment = self.ec2_backend.attach_volume(volume_id, instance_id, device_path)
template = self.response_template(ATTACHED_VOLUME_RESPONSE)
return template.render(attachment=attachment)
def copy_snapshot(self):
raise NotImplementedError('ElasticBlockStore.copy_snapshot is not yet implemented')
if self.is_not_dryrun('CopySnapshot'):
raise NotImplementedError('ElasticBlockStore.copy_snapshot is not yet implemented')
def create_snapshot(self):
description = None
if 'Description' in self.querystring:
description = self.querystring.get('Description')[0]
description = self.querystring.get('Description', [None])[0]
volume_id = self.querystring.get('VolumeId')[0]
snapshot = self.ec2_backend.create_snapshot(volume_id, description)
template = self.response_template(CREATE_SNAPSHOT_RESPONSE)
return template.render(snapshot=snapshot)
if self.is_not_dryrun('CreateSnapshot'):
snapshot = self.ec2_backend.create_snapshot(volume_id, description)
template = self.response_template(CREATE_SNAPSHOT_RESPONSE)
return template.render(snapshot=snapshot)
def create_volume(self):
size = self._get_param('Size')
zone = self._get_param('AvailabilityZone')
snapshot_id = self._get_param('SnapshotId')
encrypted = self._get_param('Encrypted') or 'false'
volume = self.ec2_backend.create_volume(size, zone, snapshot_id, encrypted)
template = self.response_template(CREATE_VOLUME_RESPONSE)
return template.render(volume=volume)
size = self.querystring.get('Size', [None])[0]
zone = self.querystring.get('AvailabilityZone', [None])[0]
snapshot_id = self.querystring.get('SnapshotId', [None])[0]
encrypted = self.querystring.get('Encrypted', ['false'])[0]
if self.is_not_dryrun('CreateVolume'):
volume = self.ec2_backend.create_volume(size, zone, snapshot_id, encrypted)
template = self.response_template(CREATE_VOLUME_RESPONSE)
return template.render(volume=volume)
def delete_snapshot(self):
snapshot_id = self.querystring.get('SnapshotId')[0]
self.ec2_backend.delete_snapshot(snapshot_id)
return DELETE_SNAPSHOT_RESPONSE
if self.is_not_dryrun('DeleteSnapshot'):
self.ec2_backend.delete_snapshot(snapshot_id)
return DELETE_SNAPSHOT_RESPONSE
def delete_volume(self):
volume_id = self.querystring.get('VolumeId')[0]
self.ec2_backend.delete_volume(volume_id)
return DELETE_VOLUME_RESPONSE
if self.is_not_dryrun('DeleteVolume'):
self.ec2_backend.delete_volume(volume_id)
return DELETE_VOLUME_RESPONSE
def describe_snapshots(self):
filters = filters_from_querystring(self.querystring)
@ -74,16 +78,18 @@ class ElasticBlockStore(BaseResponse):
volume_id = self.querystring.get('VolumeId')[0]
instance_id = self.querystring.get('InstanceId')[0]
device_path = self.querystring.get('Device')[0]
attachment = self.ec2_backend.detach_volume(volume_id, instance_id, device_path)
template = self.response_template(DETATCH_VOLUME_RESPONSE)
return template.render(attachment=attachment)
if self.is_not_dryrun('DetachVolume'):
attachment = self.ec2_backend.detach_volume(volume_id, instance_id, device_path)
template = self.response_template(DETATCH_VOLUME_RESPONSE)
return template.render(attachment=attachment)
def enable_volume_io(self):
raise NotImplementedError('ElasticBlockStore.enable_volume_io is not yet implemented')
if self.is_not_dryrun('EnableVolumeIO'):
raise NotImplementedError('ElasticBlockStore.enable_volume_io is not yet implemented')
def import_volume(self):
raise NotImplementedError('ElasticBlockStore.import_volume is not yet implemented')
if self.is_not_dryrun('ImportVolume'):
raise NotImplementedError('ElasticBlockStore.import_volume is not yet implemented')
def describe_snapshot_attribute(self):
snapshot_id = self.querystring.get('SnapshotId')[0]
@ -96,17 +102,20 @@ class ElasticBlockStore(BaseResponse):
operation_type = self.querystring.get('OperationType')[0]
group = self.querystring.get('UserGroup.1', [None])[0]
user_id = self.querystring.get('UserId.1', [None])[0]
if (operation_type == 'add'):
self.ec2_backend.add_create_volume_permission(snapshot_id, user_id=user_id, group=group)
elif (operation_type == 'remove'):
self.ec2_backend.remove_create_volume_permission(snapshot_id, user_id=user_id, group=group)
return MODIFY_SNAPSHOT_ATTRIBUTE_RESPONSE
if self.is_not_dryrun('ModifySnapshotAttribute'):
if (operation_type == 'add'):
self.ec2_backend.add_create_volume_permission(snapshot_id, user_id=user_id, group=group)
elif (operation_type == 'remove'):
self.ec2_backend.remove_create_volume_permission(snapshot_id, user_id=user_id, group=group)
return MODIFY_SNAPSHOT_ATTRIBUTE_RESPONSE
def modify_volume_attribute(self):
raise NotImplementedError('ElasticBlockStore.modify_volume_attribute is not yet implemented')
if self.is_not_dryrun('ModifyVolumeAttribute'):
raise NotImplementedError('ElasticBlockStore.modify_volume_attribute is not yet implemented')
def reset_snapshot_attribute(self):
raise NotImplementedError('ElasticBlockStore.reset_snapshot_attribute is not yet implemented')
if self.is_not_dryrun('ResetSnapshotAttribute'):
raise NotImplementedError('ElasticBlockStore.reset_snapshot_attribute is not yet implemented')
CREATE_VOLUME_RESPONSE = """<CreateVolumeResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
@ -211,12 +220,12 @@ DESCRIBE_SNAPSHOTS_RESPONSE = """<DescribeSnapshotsResponse xmlns="http://ec2.am
{% for snapshot in snapshots %}
<item>
<snapshotId>{{ snapshot.id }}</snapshotId>
<volumeId>{{ snapshot.volume.id }}</volumeId>
<volumeId>{{ snapshot.volume.id }}</volumeId>
<status>{{ snapshot.status }}</status>
<startTime>{{ snapshot.start_time}}</startTime>
<progress>100%</progress>
<ownerId>111122223333</ownerId>
<volumeSize>{{ snapshot.volume.size }}</volumeSize>
<volumeSize>{{ snapshot.volume.size }}</volumeSize>
<description>{{ snapshot.description }}</description>
<encrypted>{{ snapshot.encrypted }}</encrypted>
<tagSet>
@ -263,4 +272,4 @@ MODIFY_SNAPSHOT_ATTRIBUTE_RESPONSE = """
<requestId>666d2944-9276-4d6a-be12-1f4ada972fd8</requestId>
<return>true</return>
</ModifySnapshotAttributeResponse>
"""
"""

View File

@ -9,9 +9,10 @@ class ElasticIPAddresses(BaseResponse):
domain = self.querystring.get('Domain')[0]
else:
domain = "standard"
address = self.ec2_backend.allocate_address(domain)
template = self.response_template(ALLOCATE_ADDRESS_RESPONSE)
return template.render(address=address)
if self.is_not_dryrun('AllocateAddress'):
address = self.ec2_backend.allocate_address(domain)
template = self.response_template(ALLOCATE_ADDRESS_RESPONSE)
return template.render(address=address)
def associate_address(self):
instance = eni = None
@ -27,18 +28,19 @@ class ElasticIPAddresses(BaseResponse):
if "AllowReassociation" in self.querystring:
reassociate = self.querystring['AllowReassociation'][0] == "true"
if instance or eni:
if "PublicIp" in self.querystring:
eip = self.ec2_backend.associate_address(instance=instance, eni=eni, address=self.querystring['PublicIp'][0], reassociate=reassociate)
elif "AllocationId" in self.querystring:
eip = self.ec2_backend.associate_address(instance=instance, eni=eni, allocation_id=self.querystring['AllocationId'][0], reassociate=reassociate)
if self.is_not_dryrun('AssociateAddress'):
if instance or eni:
if "PublicIp" in self.querystring:
eip = self.ec2_backend.associate_address(instance=instance, eni=eni, address=self.querystring['PublicIp'][0], reassociate=reassociate)
elif "AllocationId" in self.querystring:
eip = self.ec2_backend.associate_address(instance=instance, eni=eni, allocation_id=self.querystring['AllocationId'][0], reassociate=reassociate)
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect either instance or ENI.")
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect either instance or ENI.")
template = self.response_template(ASSOCIATE_ADDRESS_RESPONSE)
return template.render(address=eip)
template = self.response_template(ASSOCIATE_ADDRESS_RESPONSE)
return template.render(address=eip)
def describe_addresses(self):
template = self.response_template(DESCRIBE_ADDRESS_RESPONSE)
@ -61,24 +63,26 @@ class ElasticIPAddresses(BaseResponse):
return template.render(addresses=addresses)
def disassociate_address(self):
if "PublicIp" in self.querystring:
self.ec2_backend.disassociate_address(address=self.querystring['PublicIp'][0])
elif "AssociationId" in self.querystring:
self.ec2_backend.disassociate_address(association_id=self.querystring['AssociationId'][0])
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AssociationId parameter.")
if self.is_not_dryrun('DisAssociateAddress'):
if "PublicIp" in self.querystring:
self.ec2_backend.disassociate_address(address=self.querystring['PublicIp'][0])
elif "AssociationId" in self.querystring:
self.ec2_backend.disassociate_address(association_id=self.querystring['AssociationId'][0])
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AssociationId parameter.")
return self.response_template(DISASSOCIATE_ADDRESS_RESPONSE).render()
return self.response_template(DISASSOCIATE_ADDRESS_RESPONSE).render()
def release_address(self):
if "PublicIp" in self.querystring:
self.ec2_backend.release_address(address=self.querystring['PublicIp'][0])
elif "AllocationId" in self.querystring:
self.ec2_backend.release_address(allocation_id=self.querystring['AllocationId'][0])
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
if self.is_not_dryrun('ReleaseAddress'):
if "PublicIp" in self.querystring:
self.ec2_backend.release_address(address=self.querystring['PublicIp'][0])
elif "AllocationId" in self.querystring:
self.ec2_backend.release_address(allocation_id=self.querystring['AllocationId'][0])
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
return self.response_template(RELEASE_ADDRESS_RESPONSE).render()
return self.response_template(RELEASE_ADDRESS_RESPONSE).render()
ALLOCATE_ADDRESS_RESPONSE = """<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -9,15 +9,17 @@ class ElasticNetworkInterfaces(BaseResponse):
private_ip_address = self.querystring.get('PrivateIpAddress', [None])[0]
groups = sequence_from_querystring('SecurityGroupId', self.querystring)
subnet = self.ec2_backend.get_subnet(subnet_id)
eni = self.ec2_backend.create_network_interface(subnet, private_ip_address, groups)
template = self.response_template(CREATE_NETWORK_INTERFACE_RESPONSE)
return template.render(eni=eni)
if self.is_not_dryrun('CreateNetworkInterface'):
eni = self.ec2_backend.create_network_interface(subnet, private_ip_address, groups)
template = self.response_template(CREATE_NETWORK_INTERFACE_RESPONSE)
return template.render(eni=eni)
def delete_network_interface(self):
eni_id = self.querystring.get('NetworkInterfaceId')[0]
self.ec2_backend.delete_network_interface(eni_id)
template = self.response_template(DELETE_NETWORK_INTERFACE_RESPONSE)
return template.render()
if self.is_not_dryrun('DeleteNetworkInterface'):
self.ec2_backend.delete_network_interface(eni_id)
template = self.response_template(DELETE_NETWORK_INTERFACE_RESPONSE)
return template.render()
def describe_network_interface_attribute(self):
raise NotImplementedError('ElasticNetworkInterfaces(AmazonVPC).describe_network_interface_attribute is not yet implemented')
@ -33,25 +35,29 @@ class ElasticNetworkInterfaces(BaseResponse):
eni_id = self.querystring.get('NetworkInterfaceId')[0]
instance_id = self.querystring.get('InstanceId')[0]
device_index = self.querystring.get('DeviceIndex')[0]
attachment_id = self.ec2_backend.attach_network_interface(eni_id, instance_id, device_index)
template = self.response_template(ATTACH_NETWORK_INTERFACE_RESPONSE)
return template.render(attachment_id=attachment_id)
if self.is_not_dryrun('AttachNetworkInterface'):
attachment_id = self.ec2_backend.attach_network_interface(eni_id, instance_id, device_index)
template = self.response_template(ATTACH_NETWORK_INTERFACE_RESPONSE)
return template.render(attachment_id=attachment_id)
def detach_network_interface(self):
attachment_id = self.querystring.get('AttachmentId')[0]
self.ec2_backend.detach_network_interface(attachment_id)
template = self.response_template(DETACH_NETWORK_INTERFACE_RESPONSE)
return template.render()
if self.is_not_dryrun('DetachNetworkInterface'):
self.ec2_backend.detach_network_interface(attachment_id)
template = self.response_template(DETACH_NETWORK_INTERFACE_RESPONSE)
return template.render()
def modify_network_interface_attribute(self):
# Currently supports modifying one and only one security group
eni_id = self.querystring.get('NetworkInterfaceId')[0]
group_id = self.querystring.get('SecurityGroupId.1')[0]
self.ec2_backend.modify_network_interface_attribute(eni_id, group_id)
return MODIFY_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE
if self.is_not_dryrun('ModifyNetworkInterface'):
self.ec2_backend.modify_network_interface_attribute(eni_id, group_id)
return MODIFY_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE
def reset_network_interface_attribute(self):
raise NotImplementedError('ElasticNetworkInterfaces(AmazonVPC).reset_network_interface_attribute is not yet implemented')
if self.is_not_dryrun('ResetNetworkInterface'):
raise NotImplementedError('ElasticNetworkInterfaces(AmazonVPC).reset_network_interface_attribute is not yet implemented')
CREATE_NETWORK_INTERFACE_RESPONSE = """
<CreateNetworkInterfaceResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -1,11 +1,11 @@
from __future__ import unicode_literals
from boto.ec2.instancetype import InstanceType
from boto.exception import JSONResponseError
from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from moto.ec2.utils import instance_ids_from_querystring, filters_from_querystring, \
dict_from_querystring, optional_from_querystring
class InstanceResponse(BaseResponse):
def describe_instances(self):
filter_dict = filters_from_querystring(self.querystring)
@ -32,38 +32,43 @@ class InstanceResponse(BaseResponse):
associate_public_ip = self.querystring.get("AssociatePublicIpAddress", [None])[0]
key_name = self.querystring.get("KeyName", [None])[0]
new_reservation = self.ec2_backend.add_instances(
image_id, min_count, user_data, security_group_names,
instance_type=instance_type, placement=placement, subnet_id=subnet_id,
key_name=key_name, security_group_ids=security_group_ids,
nics=nics, private_ip=private_ip, associate_public_ip=associate_public_ip)
if self.is_not_dryrun('RunInstance'):
new_reservation = self.ec2_backend.add_instances(
image_id, min_count, user_data, security_group_names,
instance_type=instance_type, placement=placement, subnet_id=subnet_id,
key_name=key_name, security_group_ids=security_group_ids,
nics=nics, private_ip=private_ip, associate_public_ip=associate_public_ip)
template = self.response_template(EC2_RUN_INSTANCES)
return template.render(reservation=new_reservation)
template = self.response_template(EC2_RUN_INSTANCES)
return template.render(reservation=new_reservation)
def terminate_instances(self):
instance_ids = instance_ids_from_querystring(self.querystring)
instances = self.ec2_backend.terminate_instances(instance_ids)
template = self.response_template(EC2_TERMINATE_INSTANCES)
return template.render(instances=instances)
if self.is_not_dryrun('TerminateInstance'):
instances = self.ec2_backend.terminate_instances(instance_ids)
template = self.response_template(EC2_TERMINATE_INSTANCES)
return template.render(instances=instances)
def reboot_instances(self):
instance_ids = instance_ids_from_querystring(self.querystring)
instances = self.ec2_backend.reboot_instances(instance_ids)
template = self.response_template(EC2_REBOOT_INSTANCES)
return template.render(instances=instances)
if self.is_not_dryrun('RebootInstance'):
instances = self.ec2_backend.reboot_instances(instance_ids)
template = self.response_template(EC2_REBOOT_INSTANCES)
return template.render(instances=instances)
def stop_instances(self):
instance_ids = instance_ids_from_querystring(self.querystring)
instances = self.ec2_backend.stop_instances(instance_ids)
template = self.response_template(EC2_STOP_INSTANCES)
return template.render(instances=instances)
if self.is_not_dryrun('StopInstance'):
instances = self.ec2_backend.stop_instances(instance_ids)
template = self.response_template(EC2_STOP_INSTANCES)
return template.render(instances=instances)
def start_instances(self):
instance_ids = instance_ids_from_querystring(self.querystring)
instances = self.ec2_backend.start_instances(instance_ids)
template = self.response_template(EC2_START_INSTANCES)
return template.render(instances=instances)
if self.is_not_dryrun('StartInstance'):
instances = self.ec2_backend.start_instances(instance_ids)
template = self.response_template(EC2_START_INSTANCES)
return template.render(instances=instances)
def describe_instance_status(self):
instance_ids = instance_ids_from_querystring(self.querystring)
@ -133,7 +138,6 @@ class InstanceResponse(BaseResponse):
mapping_counter = 1
mapping_device_name_fmt = 'BlockDeviceMapping.%s.DeviceName'
mapping_del_on_term_fmt = 'BlockDeviceMapping.%s.Ebs.DeleteOnTermination'
while True:
mapping_device_name = mapping_device_name_fmt % mapping_counter
if mapping_device_name not in self.querystring.keys():
@ -148,14 +152,15 @@ class InstanceResponse(BaseResponse):
instance_id = instance_ids[0]
instance = self.ec2_backend.get_instance(instance_id)
block_device_type = instance.block_device_mapping[device_name_value]
block_device_type.delete_on_termination = del_on_term_value
if self.is_not_dryrun('ModifyInstanceAttribute'):
block_device_type = instance.block_device_mapping[device_name_value]
block_device_type.delete_on_termination = del_on_term_value
# +1 for the next device
mapping_counter += 1
# +1 for the next device
mapping_counter += 1
if mapping_counter > 1:
return EC2_MODIFY_INSTANCE_ATTRIBUTE
if mapping_counter > 1:
return EC2_MODIFY_INSTANCE_ATTRIBUTE
def _dot_value_instance_attribute_handler(self):
attribute_key = None
@ -167,23 +172,25 @@ class InstanceResponse(BaseResponse):
if not attribute_key:
return
value = self.querystring.get(attribute_key)[0]
normalized_attribute = camelcase_to_underscores(attribute_key.split(".")[0])
instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = instance_ids[0]
self.ec2_backend.modify_instance_attribute(instance_id, normalized_attribute, value)
return EC2_MODIFY_INSTANCE_ATTRIBUTE
if self.is_not_dryrun('Modify'+attribute_key.split(".")[0]):
value = self.querystring.get(attribute_key)[0]
normalized_attribute = camelcase_to_underscores(attribute_key.split(".")[0])
instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = instance_ids[0]
self.ec2_backend.modify_instance_attribute(instance_id, normalized_attribute, value)
return EC2_MODIFY_INSTANCE_ATTRIBUTE
def _security_grp_instance_attribute_handler(self):
new_security_grp_list = []
for key, value in self.querystring.items():
if 'GroupId.' in key:
if 'GroupId.' in key:
new_security_grp_list.append(self.querystring.get(key)[0])
instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = instance_ids[0]
self.ec2_backend.modify_instance_security_groups(instance_id, new_security_grp_list)
return EC2_MODIFY_INSTANCE_ATTRIBUTE
if self.is_not_dryrun('ModifyInstanceSecurityGroups'):
self.ec2_backend.modify_instance_security_groups(instance_id, new_security_grp_list)
return EC2_MODIFY_INSTANCE_ATTRIBUTE
EC2_RUN_INSTANCES = """<RunInstancesResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
@ -204,7 +211,7 @@ EC2_RUN_INSTANCES = """<RunInstancesResponse xmlns="http://ec2.amazonaws.com/doc
<instanceState>
<code>0</code>
<name>pending</name>
</instanceState>
</instanceState>
<privateDnsName>{{ instance.private_dns }}</privateDnsName>
<publicDnsName>{{ instance.public_dns }}</publicDnsName>
<reason/>
@ -315,7 +322,7 @@ EC2_DESCRIBE_INSTANCES = """<DescribeInstancesResponse xmlns="http://ec2.amazona
<groupSet>
{% for group in reservation.dynamic_group_list %}
<item>
{% if group.id %}
{% if group.id %}
<groupId>{{ group.id }}</groupId>
<groupName>{{ group.name }}</groupName>
{% else %}
@ -366,12 +373,12 @@ EC2_DESCRIBE_INSTANCES = """<DescribeInstancesResponse xmlns="http://ec2.amazona
<groupSet>
{% for group in instance.dynamic_group_list %}
<item>
{% if group.id %}
<groupId>{{ group.id }}</groupId>
<groupName>{{ group.name }}</groupName>
{% else %}
<groupId>{{ group }}</groupId>
{% endif %}
{% if group.id %}
<groupId>{{ group.id }}</groupId>
<groupName>{{ group.name }}</groupName>
{% else %}
<groupId>{{ group }}</groupId>
{% endif %}
</item>
{% endfor %}
</groupSet>
@ -395,7 +402,7 @@ EC2_DESCRIBE_INSTANCES = """<DescribeInstancesResponse xmlns="http://ec2.amazona
<size>{{deviceobject.size}}</size>
</ebs>
</item>
{% endfor %}
{% endfor %}
</blockDeviceMapping>
<virtualizationType>{{ instance.virtualization_type }}</virtualizationType>
<clientToken>ABCDE1234567890123</clientToken>
@ -427,12 +434,12 @@ EC2_DESCRIBE_INSTANCES = """<DescribeInstancesResponse xmlns="http://ec2.amazona
<groupSet>
{% for group in nic.group_set %}
<item>
{% if group.id %}
<groupId>{{ group.id }}</groupId>
<groupName>{{ group.name }}</groupName>
{% else %}
<groupId>{{ group }}</groupId>
{% endif %}
{% if group.id %}
<groupId>{{ group.id }}</groupId>
<groupName>{{ group.name }}</groupName>
{% else %}
<groupId>{{ group }}</groupId>
{% endif %}
</item>
{% endfor %}
</groupSet>
@ -622,4 +629,4 @@ EC2_DESCRIBE_INSTANCE_TYPES = """<?xml version="1.0" encoding="UTF-8"?>
</item>
{% endfor %}
</instanceTypeSet>
</DescribeInstanceTypesResponse>"""
</DescribeInstanceTypesResponse>"""

View File

@ -10,20 +10,23 @@ class InternetGateways(BaseResponse):
def attach_internet_gateway(self):
igw_id = self.querystring.get("InternetGatewayId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
self.ec2_backend.attach_internet_gateway(igw_id, vpc_id)
template = self.response_template(ATTACH_INTERNET_GATEWAY_RESPONSE)
return template.render()
if self.is_not_dryrun('AttachInternetGateway'):
self.ec2_backend.attach_internet_gateway(igw_id, vpc_id)
template = self.response_template(ATTACH_INTERNET_GATEWAY_RESPONSE)
return template.render()
def create_internet_gateway(self):
igw = self.ec2_backend.create_internet_gateway()
template = self.response_template(CREATE_INTERNET_GATEWAY_RESPONSE)
return template.render(internet_gateway=igw)
if self.is_not_dryrun('CreateInternetGateway'):
igw = self.ec2_backend.create_internet_gateway()
template = self.response_template(CREATE_INTERNET_GATEWAY_RESPONSE)
return template.render(internet_gateway=igw)
def delete_internet_gateway(self):
igw_id = self.querystring.get("InternetGatewayId", [None])[0]
self.ec2_backend.delete_internet_gateway(igw_id)
template = self.response_template(DELETE_INTERNET_GATEWAY_RESPONSE)
return template.render()
if self.is_not_dryrun('DeleteInternetGateway'):
self.ec2_backend.delete_internet_gateway(igw_id)
template = self.response_template(DELETE_INTERNET_GATEWAY_RESPONSE)
return template.render()
def describe_internet_gateways(self):
filter_dict = filters_from_querystring(self.querystring)
@ -42,9 +45,10 @@ class InternetGateways(BaseResponse):
# raise else DependencyViolationError()
igw_id = self.querystring.get("InternetGatewayId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
self.ec2_backend.detach_internet_gateway(igw_id, vpc_id)
template = self.response_template(DETACH_INTERNET_GATEWAY_RESPONSE)
return template.render()
if self.is_not_dryrun('DetachInternetGateway'):
self.ec2_backend.detach_internet_gateway(igw_id, vpc_id)
template = self.response_template(DETACH_INTERNET_GATEWAY_RESPONSE)
return template.render()
ATTACH_INTERNET_GATEWAY_RESPONSE = u"""<AttachInternetGatewayResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -1,10 +1,12 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.core.responses import BaseResponse, JSONResponseError
class IPAddresses(BaseResponse):
def assign_private_ip_addresses(self):
raise NotImplementedError('IPAddresses.assign_private_ip_addresses is not yet implemented')
if self.is_not_dryrun('AssignPrivateIPAddress'):
raise NotImplementedError('IPAddresses.assign_private_ip_addresses is not yet implemented')
def unassign_private_ip_addresses(self):
raise NotImplementedError('IPAddresses.unassign_private_ip_addresses is not yet implemented')
if self.is_not_dryrun('UnAssignPrivateIPAddress'):
raise NotImplementedError('IPAddresses.unassign_private_ip_addresses is not yet implemented')

View File

@ -8,14 +8,16 @@ class KeyPairs(BaseResponse):
def create_key_pair(self):
name = self.querystring.get('KeyName')[0]
keypair = self.ec2_backend.create_key_pair(name)
template = self.response_template(CREATE_KEY_PAIR_RESPONSE)
return template.render(**keypair)
if self.is_not_dryrun('CreateKeyPair'):
keypair = self.ec2_backend.create_key_pair(name)
template = self.response_template(CREATE_KEY_PAIR_RESPONSE)
return template.render(**keypair)
def delete_key_pair(self):
name = self.querystring.get('KeyName')[0]
success = six.text_type(self.ec2_backend.delete_key_pair(name)).lower()
return self.response_template(DELETE_KEY_PAIR_RESPONSE).render(success=success)
if self.is_not_dryrun('DeleteKeyPair'):
success = six.text_type(self.ec2_backend.delete_key_pair(name)).lower()
return self.response_template(DELETE_KEY_PAIR_RESPONSE).render(success=success)
def describe_key_pairs(self):
names = keypair_names_from_querystring(self.querystring)
@ -30,9 +32,10 @@ class KeyPairs(BaseResponse):
def import_key_pair(self):
name = self.querystring.get('KeyName')[0]
material = self.querystring.get('PublicKeyMaterial')[0]
keypair = self.ec2_backend.import_key_pair(name, material)
template = self.response_template(IMPORT_KEYPAIR_RESPONSE)
return template.render(**keypair)
if self.is_not_dryrun('ImportKeyPair'):
keypair = self.ec2_backend.import_key_pair(name, material)
template = self.response_template(IMPORT_KEYPAIR_RESPONSE)
return template.render(**keypair)
DESCRIBE_KEY_PAIRS_RESPONSE = """<DescribeKeyPairsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -4,7 +4,9 @@ from moto.core.responses import BaseResponse
class Monitoring(BaseResponse):
def monitor_instances(self):
raise NotImplementedError('Monitoring.monitor_instances is not yet implemented')
if self.is_not_dryrun('MonitorInstances'):
raise NotImplementedError('Monitoring.monitor_instances is not yet implemented')
def unmonitor_instances(self):
raise NotImplementedError('Monitoring.unmonitor_instances is not yet implemented')
if self.is_not_dryrun('UnMonitorInstances'):
raise NotImplementedError('Monitoring.unmonitor_instances is not yet implemented')

View File

@ -4,10 +4,12 @@ from moto.core.responses import BaseResponse
class PlacementGroups(BaseResponse):
def create_placement_group(self):
raise NotImplementedError('PlacementGroups.create_placement_group is not yet implemented')
if self.is_not_dryrun('CreatePlacementGroup'):
raise NotImplementedError('PlacementGroups.create_placement_group is not yet implemented')
def delete_placement_group(self):
raise NotImplementedError('PlacementGroups.delete_placement_group is not yet implemented')
if self.is_not_dryrun('DeletePlacementGroup'):
raise NotImplementedError('PlacementGroups.delete_placement_group is not yet implemented')
def describe_placement_groups(self):
raise NotImplementedError('PlacementGroups.describe_placement_groups is not yet implemented')

View File

@ -4,10 +4,12 @@ from moto.core.responses import BaseResponse
class ReservedInstances(BaseResponse):
def cancel_reserved_instances_listing(self):
raise NotImplementedError('ReservedInstances.cancel_reserved_instances_listing is not yet implemented')
if self.is_not_dryrun('CancelReservedInstances'):
raise NotImplementedError('ReservedInstances.cancel_reserved_instances_listing is not yet implemented')
def create_reserved_instances_listing(self):
raise NotImplementedError('ReservedInstances.create_reserved_instances_listing is not yet implemented')
if self.is_not_dryrun('CreateReservedInstances'):
raise NotImplementedError('ReservedInstances.create_reserved_instances_listing is not yet implemented')
def describe_reserved_instances(self):
raise NotImplementedError('ReservedInstances.describe_reserved_instances is not yet implemented')
@ -19,4 +21,5 @@ class ReservedInstances(BaseResponse):
raise NotImplementedError('ReservedInstances.describe_reserved_instances_offerings is not yet implemented')
def purchase_reserved_instances_offering(self):
raise NotImplementedError('ReservedInstances.purchase_reserved_instances_offering is not yet implemented')
if self.is_not_dryrun('PurchaseReservedInstances'):
raise NotImplementedError('ReservedInstances.purchase_reserved_instances_offering is not yet implemented')

View File

@ -31,20 +31,24 @@ def process_rules_from_querystring(querystring):
class SecurityGroups(BaseResponse):
def authorize_security_group_egress(self):
self.ec2_backend.authorize_security_group_egress(*process_rules_from_querystring(self.querystring))
return AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE
if self.is_not_dryrun('GrantSecurityGroupEgress'):
self.ec2_backend.authorize_security_group_egress(*process_rules_from_querystring(self.querystring))
return AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE
def authorize_security_group_ingress(self):
self.ec2_backend.authorize_security_group_ingress(*process_rules_from_querystring(self.querystring))
return AUTHORIZE_SECURITY_GROUP_INGRESS_REPONSE
if self.is_not_dryrun('GrantSecurityGroupIngress'):
self.ec2_backend.authorize_security_group_ingress(*process_rules_from_querystring(self.querystring))
return AUTHORIZE_SECURITY_GROUP_INGRESS_REPONSE
def create_security_group(self):
name = self.querystring.get('GroupName')[0]
description = self.querystring.get('GroupDescription', [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
group = self.ec2_backend.create_security_group(name, description, vpc_id=vpc_id)
template = self.response_template(CREATE_SECURITY_GROUP_RESPONSE)
return template.render(group=group)
if self.is_not_dryrun('CreateSecurityGroup'):
group = self.ec2_backend.create_security_group(name, description, vpc_id=vpc_id)
template = self.response_template(CREATE_SECURITY_GROUP_RESPONSE)
return template.render(group=group)
def delete_security_group(self):
# TODO this should raise an error if there are instances in the group. See http://docs.aws.amazon.com/AWSEC2/latest/APIReference/ApiReference-query-DeleteSecurityGroup.html
@ -52,12 +56,13 @@ class SecurityGroups(BaseResponse):
name = self.querystring.get('GroupName')
sg_id = self.querystring.get('GroupId')
if name:
self.ec2_backend.delete_security_group(name[0])
elif sg_id:
self.ec2_backend.delete_security_group(group_id=sg_id[0])
if self.is_not_dryrun('DeleteSecurityGroup'):
if name:
self.ec2_backend.delete_security_group(name[0])
elif sg_id:
self.ec2_backend.delete_security_group(group_id=sg_id[0])
return DELETE_GROUP_RESPONSE
return DELETE_GROUP_RESPONSE
def describe_security_groups(self):
groupnames = self._get_multi_param("GroupName")
@ -74,14 +79,16 @@ class SecurityGroups(BaseResponse):
return template.render(groups=groups)
def revoke_security_group_egress(self):
success = self.ec2_backend.revoke_security_group_egress(*process_rules_from_querystring(self.querystring))
if not success:
return "Could not find a matching egress rule", dict(status=404)
return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE
if self.is_not_dryrun('RevokeSecurityGroupEgress'):
success = self.ec2_backend.revoke_security_group_egress(*process_rules_from_querystring(self.querystring))
if not success:
return "Could not find a matching egress rule", dict(status=404)
return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE
def revoke_security_group_ingress(self):
self.ec2_backend.revoke_security_group_ingress(*process_rules_from_querystring(self.querystring))
return REVOKE_SECURITY_GROUP_INGRESS_REPONSE
if self.is_not_dryrun('RevokeSecurityGroupIngress'):
self.ec2_backend.revoke_security_group_ingress(*process_rules_from_querystring(self.querystring))
return REVOKE_SECURITY_GROUP_INGRESS_REPONSE
CREATE_SECURITY_GROUP_RESPONSE = """<CreateSecurityGroupResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -7,15 +7,18 @@ class SpotInstances(BaseResponse):
def cancel_spot_instance_requests(self):
request_ids = self._get_multi_param('SpotInstanceRequestId')
requests = self.ec2_backend.cancel_spot_instance_requests(request_ids)
template = self.response_template(CANCEL_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests)
if self.is_not_dryrun('CancelSpotInstance'):
requests = self.ec2_backend.cancel_spot_instance_requests(request_ids)
template = self.response_template(CANCEL_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests)
def create_spot_datafeed_subscription(self):
raise NotImplementedError('SpotInstances.create_spot_datafeed_subscription is not yet implemented')
if self.is_not_dryrun('CreateSpotDatafeedSubscription'):
raise NotImplementedError('SpotInstances.create_spot_datafeed_subscription is not yet implemented')
def delete_spot_datafeed_subscription(self):
raise NotImplementedError('SpotInstances.delete_spot_datafeed_subscription is not yet implemented')
if self.is_not_dryrun('DeleteSpotDatafeedSubscription'):
raise NotImplementedError('SpotInstances.delete_spot_datafeed_subscription is not yet implemented')
def describe_spot_datafeed_subscription(self):
raise NotImplementedError('SpotInstances.describe_spot_datafeed_subscription is not yet implemented')
@ -48,28 +51,29 @@ class SpotInstances(BaseResponse):
monitoring_enabled = self._get_param('LaunchSpecification.Monitoring.Enabled')
subnet_id = self._get_param('LaunchSpecification.SubnetId')
requests = self.ec2_backend.request_spot_instances(
price=price,
image_id=image_id,
count=count,
type=type,
valid_from=valid_from,
valid_until=valid_until,
launch_group=launch_group,
availability_zone_group=availability_zone_group,
key_name=key_name,
security_groups=security_groups,
user_data=user_data,
instance_type=instance_type,
placement=placement,
kernel_id=kernel_id,
ramdisk_id=ramdisk_id,
monitoring_enabled=monitoring_enabled,
subnet_id=subnet_id,
)
if self.is_not_dryrun('RequestSpotInstance'):
requests = self.ec2_backend.request_spot_instances(
price=price,
image_id=image_id,
count=count,
type=type,
valid_from=valid_from,
valid_until=valid_until,
launch_group=launch_group,
availability_zone_group=availability_zone_group,
key_name=key_name,
security_groups=security_groups,
user_data=user_data,
instance_type=instance_type,
placement=placement,
kernel_id=kernel_id,
ramdisk_id=ramdisk_id,
monitoring_enabled=monitoring_enabled,
subnet_id=subnet_id,
)
template = self.response_template(REQUEST_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests)
template = self.response_template(REQUEST_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests)
REQUEST_SPOT_INSTANCES_TEMPLATE = """<RequestSpotInstancesResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -12,15 +12,17 @@ class TagResponse(BaseResponse):
validate_resource_ids(resource_ids)
self.ec2_backend.do_resources_exist(resource_ids)
tags = tags_from_query_string(self.querystring)
self.ec2_backend.create_tags(resource_ids, tags)
return CREATE_RESPONSE
if self.is_not_dryrun('CreateTags'):
self.ec2_backend.create_tags(resource_ids, tags)
return CREATE_RESPONSE
def delete_tags(self):
resource_ids = sequence_from_querystring('ResourceId', self.querystring)
validate_resource_ids(resource_ids)
tags = tags_from_query_string(self.querystring)
self.ec2_backend.delete_tags(resource_ids, tags)
return DELETE_RESPONSE
if self.is_not_dryrun('DeleteTags'):
self.ec2_backend.delete_tags(resource_ids, tags)
return DELETE_RESPONSE
def describe_tags(self):
filters = filters_from_querystring(querystring_dict=self.querystring)

View File

@ -223,6 +223,7 @@ class ELBResponse(BaseResponse):
return template.render(instance_ids=instance_ids)
def add_tags(self):
for key, value in self.querystring.items():
if "LoadBalancerNames.member" in key:
number = key.split('.')[2]

View File

@ -5,7 +5,7 @@ from nose.tools import assert_raises
import boto
import boto.ec2
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
import sure # noqa
@ -18,6 +18,13 @@ def test_ami_create_and_delete():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(JSONResponseError) as ex:
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateImage operation: Request would have succeeded, but DryRun flag is set')
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
all_images = conn.get_all_images()
@ -45,6 +52,12 @@ def test_ami_create_and_delete():
snapshot.volume_id.should.equal(volume.id)
# Deregister
with assert_raises(JSONResponseError) as ex:
success = conn.deregister_image(image_id, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DeregisterImage operation: Request would have succeeded, but DryRun flag is set')
success = conn.deregister_image(image_id)
success.should.be.true
@ -67,6 +80,12 @@ def test_ami_copy():
source_image = conn.get_all_images(image_ids=[source_image_id])[0]
# Boto returns a 'CopyImage' object with an image_id attribute here. Use the image_id to fetch the full info.
with assert_raises(JSONResponseError) as ex:
copy_image_ref = conn.copy_image(source_image.region.name, source_image.id, "test-copy-ami", "this is a test copy ami", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CopyImage operation: Request would have succeeded, but DryRun flag is set')
copy_image_ref = conn.copy_image(source_image.region.name, source_image.id, "test-copy-ami", "this is a test copy ami")
copy_image_id = copy_image_ref.image_id
copy_image = conn.get_all_images(image_ids=[copy_image_id])[0]
@ -108,6 +127,12 @@ def test_ami_tagging():
conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_all_images()[0]
with assert_raises(JSONResponseError) as ex:
image.add_tag("a key", "some value", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
image.add_tag("a key", "some value")
tag = conn.get_all_tags()[0]
@ -264,6 +289,12 @@ def test_ami_attribute_group_permissions():
'groups': 'all'}
# Add 'all' group and confirm
with assert_raises(JSONResponseError) as ex:
conn.modify_image_attribute(**dict(ADD_GROUP_ARGS, **{'dry_run': True}))
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifyImageAttribute operation: Request would have succeeded, but DryRun flag is set')
conn.modify_image_attribute(**ADD_GROUP_ARGS)
attributes = conn.get_image_attribute(image.id, attribute='launchPermission')

View File

@ -5,7 +5,7 @@ from nose.tools import assert_raises
from moto.ec2 import ec2_backends
import boto
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
import sure # noqa
from moto import mock_ec2
@ -23,6 +23,13 @@ def test_create_and_delete_volume():
all_volumes[0].encrypted.should.be(False)
volume = all_volumes[0]
with assert_raises(JSONResponseError) as ex:
volume.delete(dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DeleteVolume operation: Request would have succeeded, but DryRun flag is set')
volume.delete()
conn.get_all_volumes().should.have.length_of(0)
@ -35,11 +42,28 @@ def test_create_and_delete_volume():
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_create_encrypted_volume_dryrun():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(JSONResponseError) as ex:
conn.create_volume(80, "us-east-1a", encrypted=True, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateVolume operation: Request would have succeeded, but DryRun flag is set')
@mock_ec2
def test_create_encrypted_volume():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.create_volume(80, "us-east-1a", encrypted=True)
with assert_raises(JSONResponseError) as ex:
conn.create_volume(80, "us-east-1a", encrypted=True, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateVolume operation: Request would have succeeded, but DryRun flag is set')
all_volumes = conn.get_all_volumes()
all_volumes[0].encrypted.should.be(True)
@ -141,6 +165,12 @@ def test_volume_attach_and_detach():
volume.update()
volume.volume_state().should.equal('available')
with assert_raises(JSONResponseError) as ex:
volume.attach(instance.id, "/dev/sdh", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the AttachVolume operation: Request would have succeeded, but DryRun flag is set')
volume.attach(instance.id, "/dev/sdh")
volume.update()
@ -149,6 +179,12 @@ def test_volume_attach_and_detach():
volume.attach_data.instance_id.should.equal(instance.id)
with assert_raises(JSONResponseError) as ex:
volume.detach(dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DetachVolume operation: Request would have succeeded, but DryRun flag is set')
volume.detach()
volume.update()
@ -178,6 +214,12 @@ def test_create_snapshot():
conn = boto.connect_ec2('the_key', 'the_secret')
volume = conn.create_volume(80, "us-east-1a")
with assert_raises(JSONResponseError) as ex:
snapshot = volume.create_snapshot('a dryrun snapshot', dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateSnapshot operation: Request would have succeeded, but DryRun flag is set')
snapshot = volume.create_snapshot('a test snapshot')
snapshot.update()
snapshot.status.should.equal('completed')
@ -282,6 +324,8 @@ def test_snapshot_filters():
@mock_ec2
def test_snapshot_attribute():
import copy
conn = boto.connect_ec2('the_key', 'the_secret')
volume = conn.create_volume(80, "us-east-1a")
snapshot = volume.create_snapshot()
@ -302,6 +346,13 @@ def test_snapshot_attribute():
'groups': 'all'}
# Add 'all' group and confirm
with assert_raises(JSONResponseError) as ex:
conn.modify_snapshot_attribute(**dict(ADD_GROUP_ARGS, **{'dry_run': True}))
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifySnapshotAttribute operation: Request would have succeeded, but DryRun flag is set')
conn.modify_snapshot_attribute(**ADD_GROUP_ARGS)
attributes = conn.get_snapshot_attribute(snapshot.id, attribute='createVolumePermission')
@ -312,6 +363,12 @@ def test_snapshot_attribute():
conn.modify_snapshot_attribute.when.called_with(**ADD_GROUP_ARGS).should_not.throw(EC2ResponseError)
# Remove 'all' group and confirm
with assert_raises(JSONResponseError) as ex:
conn.modify_snapshot_attribute(**dict(REMOVE_GROUP_ARGS, **{'dry_run': True}))
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifySnapshotAttribute operation: Request would have succeeded, but DryRun flag is set')
conn.modify_snapshot_attribute(**REMOVE_GROUP_ARGS)
attributes = conn.get_snapshot_attribute(snapshot.id, attribute='createVolumePermission')
@ -365,6 +422,13 @@ def test_snapshot_attribute():
def test_create_volume_from_snapshot():
conn = boto.connect_ec2('the_key', 'the_secret')
volume = conn.create_volume(80, "us-east-1a")
snapshot = volume.create_snapshot('a test snapshot')
with assert_raises(JSONResponseError) as ex:
snapshot = volume.create_snapshot('a test snapshot', dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateSnapshot operation: Request would have succeeded, but DryRun flag is set')
snapshot = volume.create_snapshot('a test snapshot')
snapshot.update()
@ -404,6 +468,12 @@ def test_modify_attribute_blockDeviceMapping():
instance = reservation.instances[0]
with assert_raises(JSONResponseError) as ex:
instance.modify_attribute('blockDeviceMapping', {'/dev/sda1': True}, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifyInstanceAttribute operation: Request would have succeeded, but DryRun flag is set')
instance.modify_attribute('blockDeviceMapping', {'/dev/sda1': True})
instance = ec2_backends[conn.region.name].get_instance(instance.id)
@ -416,6 +486,14 @@ def test_volume_tag_escaping():
conn = boto.connect_ec2('the_key', 'the_secret')
vol = conn.create_volume(10, 'us-east-1a')
snapshot = conn.create_snapshot(vol.id, 'Desc')
with assert_raises(JSONResponseError) as ex:
snapshot.add_tags({'key': '</closed>'}, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
dict(conn.get_all_snapshots()[0].tags).should_not.be.equal({'key': '</closed>'})
snapshot.add_tags({'key': '</closed>'})
dict(conn.get_all_snapshots()[0].tags).should.equal({'key': '</closed>'})

View File

@ -4,7 +4,7 @@ import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
import six
import sure # noqa
@ -19,11 +19,24 @@ def test_eip_allocate_classic():
"""Allocate/release Classic EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(JSONResponseError) as ex:
standard = conn.allocate_address(dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the AllocateAddress operation: Request would have succeeded, but DryRun flag is set')
standard = conn.allocate_address()
standard.should.be.a(boto.ec2.address.Address)
standard.public_ip.should.be.a(six.text_type)
standard.instance_id.should.be.none
standard.domain.should.be.equal("standard")
with assert_raises(JSONResponseError) as ex:
standard.release(dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ReleaseAddress operation: Request would have succeeded, but DryRun flag is set')
standard.release()
standard.should_not.be.within(conn.get_all_addresses())
@ -33,6 +46,12 @@ def test_eip_allocate_vpc():
"""Allocate/release VPC EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(JSONResponseError) as ex:
vpc = conn.allocate_address(domain="vpc", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the AllocateAddress operation: Request would have succeeded, but DryRun flag is set')
vpc = conn.allocate_address(domain="vpc")
vpc.should.be.a(boto.ec2.address.Address)
vpc.domain.should.be.equal("vpc")
@ -69,9 +88,22 @@ def test_eip_associate_classic():
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
with assert_raises(JSONResponseError) as ex:
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the AssociateAddress operation: Request would have succeeded, but DryRun flag is set')
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(instance.id)
with assert_raises(JSONResponseError) as ex:
conn.disassociate_address(public_ip=eip.public_ip, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DisAssociateAddress operation: Request would have succeeded, but DryRun flag is set')
conn.disassociate_address(public_ip=eip.public_ip)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(u'')
@ -105,6 +137,13 @@ def test_eip_associate_vpc():
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(u'')
eip.association_id.should.be.none
with assert_raises(JSONResponseError) as ex:
eip.release(dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ReleaseAddress operation: Request would have succeeded, but DryRun flag is set')
eip.release()
eip = None
@ -130,6 +169,7 @@ def test_eip_associate_network_interface():
conn.associate_address(network_interface_id=eni.id, allocation_id=eip.allocation_id)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.network_interface_id.should.be.equal(eni.id)
conn.disassociate_address(association_id=eip.association_id)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.network_interface_id.should.be.equal(u'')

View File

@ -7,7 +7,7 @@ import boto3
import boto
import boto.cloudformation
import boto.ec2
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
import sure # noqa
from moto import mock_ec2, mock_cloudformation
@ -21,6 +21,13 @@ def test_elastic_network_interfaces():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
with assert_raises(JSONResponseError) as ex:
eni = conn.create_network_interface(subnet.id, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
eni = conn.create_network_interface(subnet.id)
all_enis = conn.get_all_network_interfaces()
@ -29,6 +36,12 @@ def test_elastic_network_interfaces():
eni.groups.should.have.length_of(0)
eni.private_ip_addresses.should.have.length_of(0)
with assert_raises(JSONResponseError) as ex:
conn.delete_network_interface(eni.id, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DeleteNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
conn.delete_network_interface(eni.id)
all_enis = conn.get_all_network_interfaces()
@ -104,6 +117,12 @@ def test_elastic_network_interfaces_modify_attribute():
eni.groups.should.have.length_of(1)
eni.groups[0].id.should.equal(security_group1.id)
with assert_raises(JSONResponseError) as ex:
conn.modify_network_interface_attribute(eni.id, 'groupset', [security_group2.id], dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifyNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
conn.modify_network_interface_attribute(eni.id, 'groupset', [security_group2.id])
all_enis = conn.get_all_network_interfaces()
@ -163,6 +182,13 @@ def test_elastic_network_interfaces_get_by_tag_name():
subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
with assert_raises(JSONResponseError) as ex:
eni1.create_tags(Tags=[{'Key': 'Name', 'Value': 'eni1'}], DryRun=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
eni1.create_tags(Tags=[{'Key': 'Name', 'Value': 'eni1'}])
# The status of the new interface should be 'available'

View File

@ -8,7 +8,7 @@ import datetime
import boto
from boto.ec2.instance import Reservation, InstanceAttribute
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
from freezegun import freeze_time
import sure # noqa
@ -40,6 +40,13 @@ def test_add_servers():
@mock_ec2
def test_instance_launch_and_terminate():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(JSONResponseError) as ex:
reservation = conn.run_instances('ami-1234abcd', dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the RunInstance operation: Request would have succeeded, but DryRun flag is set')
reservation = conn.run_instances('ami-1234abcd')
reservation.should.be.a(Reservation)
reservation.instances.should.have.length_of(1)
@ -67,6 +74,12 @@ def test_instance_launch_and_terminate():
volume.attach_data.instance_id.should.equal(instance.id)
volume.status.should.equal('in-use')
with assert_raises(JSONResponseError) as ex:
conn.terminate_instances([instance.id], dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the TerminateInstance operation: Request would have succeeded, but DryRun flag is set')
conn.terminate_instances([instance.id])
reservations = conn.get_all_instances()
@ -405,11 +418,24 @@ def test_instance_start_and_stop():
instances.should.have.length_of(2)
instance_ids = [instance.id for instance in instances]
with assert_raises(JSONResponseError) as ex:
stopped_instances = conn.stop_instances(instance_ids, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the StopInstance operation: Request would have succeeded, but DryRun flag is set')
stopped_instances = conn.stop_instances(instance_ids)
for instance in stopped_instances:
instance.state.should.equal('stopping')
with assert_raises(JSONResponseError) as ex:
started_instances = conn.start_instances([instances[0].id], dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the StartInstance operation: Request would have succeeded, but DryRun flag is set')
started_instances = conn.start_instances([instances[0].id])
started_instances[0].state.should.equal('pending')
@ -419,6 +445,13 @@ def test_instance_reboot():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(JSONResponseError) as ex:
instance.reboot(dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the RebootInstance operation: Request would have succeeded, but DryRun flag is set')
instance.reboot()
instance.state.should.equal('pending')
@ -429,6 +462,12 @@ def test_instance_attribute_instance_type():
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(JSONResponseError) as ex:
instance.modify_attribute("instanceType", "m1.small", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifyInstanceType operation: Request would have succeeded, but DryRun flag is set')
instance.modify_attribute("instanceType", "m1.small")
instance_attribute = instance.get_attribute("instanceType")
@ -443,6 +482,13 @@ def test_modify_instance_attribute_security_groups():
sg_id = 'sg-1234abcd'
sg_id2 = 'sg-abcd4321'
with assert_raises(JSONResponseError) as ex:
instance.modify_attribute("groupSet", [sg_id, sg_id2], dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifyInstanceSecurityGroups operation: Request would have succeeded, but DryRun flag is set')
instance.modify_attribute("groupSet", [sg_id, sg_id2])
instance_attribute = instance.get_attribute("groupSet")
@ -458,6 +504,12 @@ def test_instance_attribute_user_data():
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(JSONResponseError) as ex:
instance.modify_attribute("userData", "this is my user data", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifyUserData operation: Request would have succeeded, but DryRun flag is set')
instance.modify_attribute("userData", "this is my user data")
instance_attribute = instance.get_attribute("userData")
@ -479,6 +531,13 @@ def test_instance_attribute_source_dest_check():
instance_attribute.get("sourceDestCheck").should.equal(True)
# Set to false (note: Boto converts bool to string, eg 'false')
with assert_raises(JSONResponseError) as ex:
instance.modify_attribute("sourceDestCheck", False, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifySourceDestCheck operation: Request would have succeeded, but DryRun flag is set')
instance.modify_attribute("sourceDestCheck", False)
instance.update()
@ -516,6 +575,13 @@ def test_user_data_with_run_instance():
@mock_ec2
def test_run_instance_with_security_group_name():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(JSONResponseError) as ex:
group = conn.create_security_group('group1', "some description", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateSecurityGroup operation: Request would have succeeded, but DryRun flag is set')
group = conn.create_security_group('group1', "some description")
reservation = conn.run_instances('ami-1234abcd',
@ -671,6 +737,12 @@ def test_instance_with_nic_attach_detach():
set([group.id for group in eni.groups]).should.equal(set([security_group2.id]))
# Attach
with assert_raises(JSONResponseError) as ex:
conn.attach_network_interface(eni.id, instance.id, device_index=1, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the AttachNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
conn.attach_network_interface(eni.id, instance.id, device_index=1)
# Check attached instance and ENI data
@ -686,6 +758,12 @@ def test_instance_with_nic_attach_detach():
set([group.id for group in eni.groups]).should.equal(set([security_group1.id,security_group2.id]))
# Detach
with assert_raises(JSONResponseError) as ex:
conn.detach_network_interface(instance_eni.attachment.id, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DetachNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
conn.detach_network_interface(instance_eni.attachment.id)
# Check detached instance and ENI data
@ -799,6 +877,13 @@ def test_get_instance_by_security_group():
instance = conn.get_only_instances()[0]
security_group = conn.create_security_group('test', 'test')
with assert_raises(JSONResponseError) as ex:
conn.modify_instance_attribute(instance.id, "groupSet", [security_group.id], dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ModifyInstanceSecurityGroups operation: Request would have succeeded, but DryRun flag is set')
conn.modify_instance_attribute(instance.id, "groupSet", [security_group.id])
security_group_instances = security_group.instances()

View File

@ -6,7 +6,7 @@ from nose.tools import assert_raises
import re
import boto
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
import sure # noqa
@ -23,6 +23,13 @@ def test_igw_create():
conn = boto.connect_vpc('the_key', 'the_secret')
conn.get_all_internet_gateways().should.have.length_of(0)
with assert_raises(JSONResponseError) as ex:
igw = conn.create_internet_gateway(dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateInternetGateway operation: Request would have succeeded, but DryRun flag is set')
igw = conn.create_internet_gateway()
conn.get_all_internet_gateways().should.have.length_of(1)
igw.id.should.match(r'igw-[0-9a-f]+')
@ -36,6 +43,13 @@ def test_igw_attach():
conn = boto.connect_vpc('the_key', 'the_secret')
igw = conn.create_internet_gateway()
vpc = conn.create_vpc(VPC_CIDR)
with assert_raises(JSONResponseError) as ex:
conn.attach_internet_gateway(igw.id, vpc.id, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the AttachInternetGateway operation: Request would have succeeded, but DryRun flag is set')
conn.attach_internet_gateway(igw.id, vpc.id)
igw = conn.get_all_internet_gateways()[0]
@ -75,6 +89,13 @@ def test_igw_detach():
igw = conn.create_internet_gateway()
vpc = conn.create_vpc(VPC_CIDR)
conn.attach_internet_gateway(igw.id, vpc.id)
with assert_raises(JSONResponseError) as ex:
conn.detach_internet_gateway(igw.id, vpc.id, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DetachInternetGateway operation: Request would have succeeded, but DryRun flag is set')
conn.detach_internet_gateway(igw.id, vpc.id)
igw = conn.get_all_internet_gateways()[0]
igw.attachments.should.have.length_of(0)
@ -129,6 +150,13 @@ def test_igw_delete():
conn.get_all_internet_gateways().should.have.length_of(0)
igw = conn.create_internet_gateway()
conn.get_all_internet_gateways().should.have.length_of(1)
with assert_raises(JSONResponseError) as ex:
conn.delete_internet_gateway(igw.id, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DeleteInternetGateway operation: Request would have succeeded, but DryRun flag is set')
conn.delete_internet_gateway(igw.id)
conn.get_all_internet_gateways().should.have.length_of(0)

View File

@ -7,7 +7,7 @@ import boto
import six
import sure # noqa
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
from moto import mock_ec2
@ -31,6 +31,13 @@ def test_key_pairs_invalid_id():
@mock_ec2
def test_key_pairs_create():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(JSONResponseError) as ex:
kp = conn.create_key_pair('foo', dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateKeyPair operation: Request would have succeeded, but DryRun flag is set')
kp = conn.create_key_pair('foo')
assert kp.material.startswith('---- BEGIN RSA PRIVATE KEY ----')
kps = conn.get_all_key_pairs()
@ -82,6 +89,13 @@ def test_key_pairs_delete_no_exist():
def test_key_pairs_delete_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.create_key_pair('foo')
with assert_raises(JSONResponseError) as ex:
r = conn.delete_key_pair('foo', dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DeleteKeyPair operation: Request would have succeeded, but DryRun flag is set')
r = conn.delete_key_pair('foo')
r.should.be.ok
assert len(conn.get_all_key_pairs()) == 0
@ -90,6 +104,13 @@ def test_key_pairs_delete_exist():
@mock_ec2
def test_key_pairs_import():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(JSONResponseError) as ex:
kp = conn.import_key_pair('foo', b'content', dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the ImportKeyPair operation: Request would have succeeded, but DryRun flag is set')
kp = conn.import_key_pair('foo', b'content')
assert kp.name == 'foo'
kps = conn.get_all_key_pairs()

View File

@ -5,7 +5,7 @@ from nose.tools import assert_raises
import boto3
import boto
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
import sure # noqa
from moto import mock_ec2
@ -14,6 +14,13 @@ from moto import mock_ec2
@mock_ec2
def test_create_and_describe_security_group():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(JSONResponseError) as ex:
security_group = conn.create_security_group('test security group', 'this is a test security group', dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateSecurityGroup operation: Request would have succeeded, but DryRun flag is set')
security_group = conn.create_security_group('test security group', 'this is a test security group')
security_group.name.should.equal('test security group')
@ -110,6 +117,12 @@ def test_deleting_security_groups():
cm.exception.request_id.should_not.be.none
# Delete by name
with assert_raises(JSONResponseError) as ex:
conn.delete_security_group('test2', dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DeleteSecurityGroup operation: Request would have succeeded, but DryRun flag is set')
conn.delete_security_group('test2')
conn.get_all_security_groups().should.have.length_of(2)
@ -133,6 +146,12 @@ def test_authorize_ip_range_and_revoke():
conn = boto.connect_ec2('the_key', 'the_secret')
security_group = conn.create_security_group('test', 'test')
with assert_raises(JSONResponseError) as ex:
success = security_group.authorize(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the GrantSecurityGroupIngress operation: Request would have succeeded, but DryRun flag is set')
success = security_group.authorize(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32")
assert success.should.be.true
@ -148,6 +167,12 @@ def test_authorize_ip_range_and_revoke():
cm.exception.request_id.should_not.be.none
# Actually revoke
with assert_raises(JSONResponseError) as ex:
security_group.revoke(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the RevokeSecurityGroupIngress operation: Request would have succeeded, but DryRun flag is set')
security_group.revoke(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32")
security_group = conn.get_all_security_groups()[0]
@ -155,6 +180,13 @@ def test_authorize_ip_range_and_revoke():
# Test for egress as well
egress_security_group = conn.create_security_group('testegress', 'testegress', vpc_id='vpc-3432589')
with assert_raises(JSONResponseError) as ex:
success = conn.authorize_security_group_egress(egress_security_group.id, "tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the GrantSecurityGroupEgress operation: Request would have succeeded, but DryRun flag is set')
success = conn.authorize_security_group_egress(egress_security_group.id, "tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32")
assert success.should.be.true
egress_security_group = conn.get_all_security_groups(groupnames='testegress')[0]
@ -167,6 +199,12 @@ def test_authorize_ip_range_and_revoke():
egress_security_group.revoke.when.called_with(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.122/32").should.throw(EC2ResponseError)
# Actually revoke
with assert_raises(JSONResponseError) as ex:
conn.revoke_security_group_egress(egress_security_group.id, "tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the RevokeSecurityGroupEgress operation: Request would have succeeded, but DryRun flag is set')
conn.revoke_security_group_egress(egress_security_group.id, "tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32")
egress_security_group = conn.get_all_security_groups()[0]
@ -293,7 +331,15 @@ def test_authorize_bad_cidr_throws_invalid_parameter_value():
def test_security_group_tagging():
conn = boto.connect_vpc()
vpc = conn.create_vpc("10.0.0.0/16")
sg = conn.create_security_group("test-sg", "Test SG", vpc.id)
with assert_raises(JSONResponseError) as ex:
sg.add_tag("Test", "Tag", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
sg.add_tag("Test", "Tag")
tag = conn.get_all_tags()[0]
@ -336,7 +382,15 @@ Boto3
@mock_ec2
def test_security_group_tagging_boto3():
conn = boto3.client('ec2', region_name='us-east-1')
sg = conn.create_security_group(GroupName="test-sg", Description="Test SG")
with assert_raises(JSONResponseError) as ex:
conn.create_tags(Resources=[sg['GroupId']], Tags=[{'Key': 'Test', 'Value': 'Tag'}], DryRun=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
conn.create_tags(Resources=[sg['GroupId']], Tags=[{'Key': 'Test', 'Value': 'Tag'}])
describe = conn.describe_security_groups(Filters=[{'Name': 'tag-value', 'Values': ['Tag']}])
tag = describe["SecurityGroups"][0]['Tags'][0]

View File

@ -1,8 +1,10 @@
from __future__ import unicode_literals
from nose.tools import assert_raises
import datetime
import boto
import sure # noqa
from boto.exception import JSONResponseError
from moto import mock_ec2
from moto.backends import get_model
@ -19,6 +21,20 @@ def test_request_spot_instances():
start = iso_8601_datetime_with_milliseconds(datetime.datetime(2013, 1, 1))
end = iso_8601_datetime_with_milliseconds(datetime.datetime(2013, 1, 2))
with assert_raises(JSONResponseError) as ex:
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234', count=1, type='one-time',
valid_from=start, valid_until=end, launch_group="the-group",
availability_zone_group='my-group', key_name="test",
security_groups=['group1', 'group2'], user_data=b"some test data",
instance_type='m1.small', placement='us-east-1c',
kernel_id="test-kernel", ramdisk_id="test-ramdisk",
monitoring_enabled=True, subnet_id="subnet123", dry_run=True
)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the RequestSpotInstance operation: Request would have succeeded, but DryRun flag is set')
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234', count=1, type='one-time',
valid_from=start, valid_until=end, launch_group="the-group",
@ -95,6 +111,13 @@ def test_cancel_spot_instance_request():
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
with assert_raises(JSONResponseError) as ex:
conn.cancel_spot_instance_requests([requests[0].id], dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CancelSpotInstance operation: Request would have succeeded, but DryRun flag is set')
conn.cancel_spot_instance_requests([requests[0].id])
requests = conn.get_all_spot_instance_requests()
@ -195,3 +218,4 @@ def test_request_spot_instances_setting_instance_id():
request = conn.get_all_spot_instance_requests()[0]
assert request.state == 'active'
assert request.instance_id == 'i-12345678'

View File

@ -1,8 +1,10 @@
from __future__ import unicode_literals
from nose.tools import assert_raises
import itertools
import boto
from boto.exception import EC2ResponseError
from boto.exception import EC2ResponseError, JSONResponseError
from boto.ec2.instance import Reservation
import sure # noqa
@ -16,6 +18,12 @@ def test_add_tag():
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(JSONResponseError) as ex:
instance.add_tag("a key", "some value", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
instance.add_tag("a key", "some value")
chain = itertools.chain.from_iterable
existing_instances = list(chain([res.instances for res in conn.get_all_instances()]))
@ -37,6 +45,12 @@ def test_remove_tag():
tag.name.should.equal("a key")
tag.value.should.equal("some value")
with assert_raises(JSONResponseError) as ex:
instance.remove_tag("a key", dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the DeleteTags operation: Request would have succeeded, but DryRun flag is set')
instance.remove_tag("a key")
conn.get_all_tags().should.have.length_of(0)
@ -82,6 +96,12 @@ def test_create_tags():
'another key': 'some other value',
'blank key': ''}
with assert_raises(JSONResponseError) as ex:
conn.create_tags(instance.id, tag_dict, dry_run=True)
ex.exception.reason.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal('An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
conn.create_tags(instance.id, tag_dict)
tags = conn.get_all_tags()
set([key for key in tag_dict]).should.equal(set([tag.name for tag in tags]))