Replaced direct querystring access with inherited _get_param

This commit is contained in:
Nuwan Goonasekera 2017-09-16 18:31:30 +05:30
parent 17d62d9266
commit 7ed1036ba8
18 changed files with 215 additions and 262 deletions

View File

@ -7,10 +7,7 @@ class AmisResponse(BaseResponse):
def create_image(self):
name = self.querystring.get('Name')[0]
if "Description" in self.querystring:
description = self.querystring.get('Description')[0]
else:
description = ""
description = self._get_param('Description', if_none='')
instance_id = self._get_param('InstanceId')
if self.is_not_dryrun('CreateImage'):
image = self.ec2_backend.create_image(
@ -19,12 +16,10 @@ class AmisResponse(BaseResponse):
return template.render(image=image)
def copy_image(self):
source_image_id = self.querystring.get('SourceImageId')[0]
source_region = self.querystring.get('SourceRegion')[0]
name = self.querystring.get(
'Name')[0] if self.querystring.get('Name') else None
description = self.querystring.get(
'Description')[0] if self.querystring.get('Description') else None
source_image_id = self._get_param('SourceImageId')
source_region = self._get_param('SourceRegion')
name = self._get_param('Name')
description = self._get_param('Description')
if self.is_not_dryrun('CopyImage'):
image = self.ec2_backend.copy_image(
source_image_id, source_region, name, description)
@ -32,7 +27,7 @@ class AmisResponse(BaseResponse):
return template.render(image=image)
def deregister_image(self):
ami_id = self.querystring.get('ImageId')[0]
ami_id = self._get_param('ImageId')
if self.is_not_dryrun('DeregisterImage'):
success = self.ec2_backend.deregister_image(ami_id)
template = self.response_template(DEREGISTER_IMAGE_RESPONSE)
@ -48,16 +43,16 @@ class AmisResponse(BaseResponse):
return template.render(images=images)
def describe_image_attribute(self):
ami_id = self.querystring.get('ImageId')[0]
ami_id = self._get_param('ImageId')
groups = self.ec2_backend.get_launch_permission_groups(ami_id)
users = self.ec2_backend.get_launch_permission_users(ami_id)
template = self.response_template(DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE)
return template.render(ami_id=ami_id, groups=groups, users=users)
def modify_image_attribute(self):
ami_id = self.querystring.get('ImageId')[0]
operation_type = self.querystring.get('OperationType')[0]
group = self.querystring.get('UserGroup.1', [None])[0]
ami_id = self._get_param('ImageId')
operation_type = self._get_param('OperationType')
group = self._get_param('UserGroup.1')
user_ids = self._get_multi_param('UserId')
if self.is_not_dryrun('ModifyImageAttribute'):
if (operation_type == 'add'):

View File

@ -7,16 +7,16 @@ class CustomerGateways(BaseResponse):
def create_customer_gateway(self):
# raise NotImplementedError('CustomerGateways(AmazonVPC).create_customer_gateway is not yet implemented')
type = self.querystring.get('Type', None)[0]
ip_address = self.querystring.get('IpAddress', None)[0]
bgp_asn = self.querystring.get('BgpAsn', None)[0]
type = self._get_param('Type')
ip_address = self._get_param('IpAddress')
bgp_asn = self._get_param('BgpAsn')
customer_gateway = self.ec2_backend.create_customer_gateway(
type, ip_address=ip_address, bgp_asn=bgp_asn)
template = self.response_template(CREATE_CUSTOMER_GATEWAY_RESPONSE)
return template.render(customer_gateway=customer_gateway)
def delete_customer_gateway(self):
customer_gateway_id = self.querystring.get('CustomerGatewayId')[0]
customer_gateway_id = self._get_param('CustomerGatewayId')
delete_status = self.ec2_backend.delete_customer_gateway(
customer_gateway_id)
template = self.response_template(DELETE_CUSTOMER_GATEWAY_RESPONSE)

View File

@ -8,8 +8,8 @@ from moto.ec2.utils import (
class DHCPOptions(BaseResponse):
def associate_dhcp_options(self):
dhcp_opt_id = self.querystring.get("DhcpOptionsId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
dhcp_opt_id = self._get_param('DhcpOptionsId')
vpc_id = self._get_param('VpcId')
dhcp_opt = self.ec2_backend.describe_dhcp_options([dhcp_opt_id])[0]
vpc = self.ec2_backend.get_vpc(vpc_id)
@ -42,7 +42,7 @@ class DHCPOptions(BaseResponse):
return template.render(dhcp_options_set=dhcp_options_set)
def delete_dhcp_options(self):
dhcp_opt_id = self.querystring.get("DhcpOptionsId", [None])[0]
dhcp_opt_id = self._get_param('DhcpOptionsId')
delete_status = self.ec2_backend.delete_dhcp_options_set(dhcp_opt_id)
template = self.response_template(DELETE_DHCP_OPTIONS_RESPONSE)
return template.render(delete_status=delete_status)

View File

@ -6,9 +6,9 @@ from moto.ec2.utils import filters_from_querystring
class ElasticBlockStore(BaseResponse):
def attach_volume(self):
volume_id = self.querystring.get('VolumeId')[0]
instance_id = self.querystring.get('InstanceId')[0]
device_path = self.querystring.get('Device')[0]
volume_id = self._get_param('VolumeId')
instance_id = self._get_param('InstanceId')
device_path = self._get_param('Device')
if self.is_not_dryrun('AttachVolume'):
attachment = self.ec2_backend.attach_volume(
volume_id, instance_id, device_path)
@ -21,18 +21,18 @@ class ElasticBlockStore(BaseResponse):
'ElasticBlockStore.copy_snapshot is not yet implemented')
def create_snapshot(self):
description = self.querystring.get('Description', [None])[0]
volume_id = self.querystring.get('VolumeId')[0]
volume_id = self._get_param('VolumeId')
description = self._get_param('Description')
if self.is_not_dryrun('CreateSnapshot'):
snapshot = self.ec2_backend.create_snapshot(volume_id, description)
template = self.response_template(CREATE_SNAPSHOT_RESPONSE)
return template.render(snapshot=snapshot)
def create_volume(self):
size = self.querystring.get('Size', [None])[0]
zone = self.querystring.get('AvailabilityZone', [None])[0]
snapshot_id = self.querystring.get('SnapshotId', [None])[0]
encrypted = self.querystring.get('Encrypted', ['false'])[0]
size = self._get_param('Size')
zone = self._get_param('AvailabilityZone')
snapshot_id = self._get_param('SnapshotId')
encrypted = self._get_param('Encrypted', if_none=False)
if self.is_not_dryrun('CreateVolume'):
volume = self.ec2_backend.create_volume(
size, zone, snapshot_id, encrypted)
@ -40,23 +40,20 @@ class ElasticBlockStore(BaseResponse):
return template.render(volume=volume)
def delete_snapshot(self):
snapshot_id = self.querystring.get('SnapshotId')[0]
snapshot_id = self._get_param('SnapshotId')
if self.is_not_dryrun('DeleteSnapshot'):
self.ec2_backend.delete_snapshot(snapshot_id)
return DELETE_SNAPSHOT_RESPONSE
def delete_volume(self):
volume_id = self.querystring.get('VolumeId')[0]
volume_id = self._get_param('VolumeId')
if self.is_not_dryrun('DeleteVolume'):
self.ec2_backend.delete_volume(volume_id)
return DELETE_VOLUME_RESPONSE
def describe_snapshots(self):
filters = filters_from_querystring(self.querystring)
# querystring for multiple snapshotids results in SnapshotId.1,
# SnapshotId.2 etc
snapshot_ids = ','.join(
[','.join(s[1]) for s in self.querystring.items() if 'SnapshotId' in s[0]])
snapshot_ids = self._get_multi_param('SnapshotId')
snapshots = self.ec2_backend.describe_snapshots(filters=filters)
# Describe snapshots to handle filter on snapshot_ids
snapshots = [
@ -66,10 +63,7 @@ class ElasticBlockStore(BaseResponse):
def describe_volumes(self):
filters = filters_from_querystring(self.querystring)
# querystring for multiple volumeids results in VolumeId.1, VolumeId.2
# etc
volume_ids = ','.join(
[','.join(v[1]) for v in self.querystring.items() if 'VolumeId' in v[0]])
volume_ids = self._get_multi_param('VolumeId')
volumes = self.ec2_backend.describe_volumes(filters=filters)
# Describe volumes to handle filter on volume_ids
volumes = [
@ -86,9 +80,9 @@ class ElasticBlockStore(BaseResponse):
'ElasticBlockStore.describe_volume_status is not yet implemented')
def detach_volume(self):
volume_id = self.querystring.get('VolumeId')[0]
instance_id = self.querystring.get('InstanceId')[0]
device_path = self.querystring.get('Device')[0]
volume_id = self._get_param('VolumeId')
instance_id = self._get_param('InstanceId')
device_path = self._get_param('Device')
if self.is_not_dryrun('DetachVolume'):
attachment = self.ec2_backend.detach_volume(
volume_id, instance_id, device_path)
@ -106,7 +100,7 @@ class ElasticBlockStore(BaseResponse):
'ElasticBlockStore.import_volume is not yet implemented')
def describe_snapshot_attribute(self):
snapshot_id = self.querystring.get('SnapshotId')[0]
snapshot_id = self._get_param('SnapshotId')
groups = self.ec2_backend.get_create_volume_permission_groups(
snapshot_id)
template = self.response_template(
@ -114,10 +108,10 @@ class ElasticBlockStore(BaseResponse):
return template.render(snapshot_id=snapshot_id, groups=groups)
def modify_snapshot_attribute(self):
snapshot_id = self.querystring.get('SnapshotId')[0]
operation_type = self.querystring.get('OperationType')[0]
group = self.querystring.get('UserGroup.1', [None])[0]
user_id = self.querystring.get('UserId.1', [None])[0]
snapshot_id = self._get_param('SnapshotId')
operation_type = self._get_param('OperationType')
group = self._get_param('UserGroup.1')
user_id = self._get_param('UserId.1')
if self.is_not_dryrun('ModifySnapshotAttribute'):
if (operation_type == 'add'):
self.ec2_backend.add_create_volume_permission(

View File

@ -6,10 +6,7 @@ from moto.ec2.utils import filters_from_querystring
class ElasticIPAddresses(BaseResponse):
def allocate_address(self):
if "Domain" in self.querystring:
domain = self.querystring.get('Domain')[0]
else:
domain = "standard"
domain = self._get_param('Domain', if_none='standard')
if self.is_not_dryrun('AllocateAddress'):
address = self.ec2_backend.allocate_address(domain)
template = self.response_template(ALLOCATE_ADDRESS_RESPONSE)
@ -20,26 +17,28 @@ class ElasticIPAddresses(BaseResponse):
if "InstanceId" in self.querystring:
instance = self.ec2_backend.get_instance(
self.querystring['InstanceId'][0])
self._get_param('InstanceId'))
elif "NetworkInterfaceId" in self.querystring:
eni = self.ec2_backend.get_network_interface(
self.querystring['NetworkInterfaceId'][0])
self._get_param('NetworkInterfaceId'))
else:
self.ec2_backend.raise_error(
"MissingParameter", "Invalid request, expect InstanceId/NetworkId parameter.")
reassociate = False
if "AllowReassociation" in self.querystring:
reassociate = self.querystring['AllowReassociation'][0] == "true"
reassociate = self._get_param('AllowReassociation') == "true"
if self.is_not_dryrun('AssociateAddress'):
if instance or eni:
if "PublicIp" in self.querystring:
eip = self.ec2_backend.associate_address(instance=instance, eni=eni, address=self.querystring[
'PublicIp'][0], reassociate=reassociate)
eip = self.ec2_backend.associate_address(
instance=instance, eni=eni,
address=self._get_param('PublicIp'), reassociate=reassociate)
elif "AllocationId" in self.querystring:
eip = self.ec2_backend.associate_address(instance=instance, eni=eni, allocation_id=self.querystring[
'AllocationId'][0], reassociate=reassociate)
eip = self.ec2_backend.associate_address(
instance=instance, eni=eni,
allocation_id=self._get_param('AllocationId'), reassociate=reassociate)
else:
self.ec2_backend.raise_error(
"MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")
@ -63,10 +62,10 @@ class ElasticIPAddresses(BaseResponse):
if self.is_not_dryrun('DisAssociateAddress'):
if "PublicIp" in self.querystring:
self.ec2_backend.disassociate_address(
address=self.querystring['PublicIp'][0])
address=self._get_param('PublicIp'))
elif "AssociationId" in self.querystring:
self.ec2_backend.disassociate_address(
association_id=self.querystring['AssociationId'][0])
association_id=self._get_param('AssociationId'))
else:
self.ec2_backend.raise_error(
"MissingParameter", "Invalid request, expect PublicIp/AssociationId parameter.")
@ -77,10 +76,10 @@ class ElasticIPAddresses(BaseResponse):
if self.is_not_dryrun('ReleaseAddress'):
if "PublicIp" in self.querystring:
self.ec2_backend.release_address(
address=self.querystring['PublicIp'][0])
address=self._get_param('PublicIp'))
elif "AllocationId" in self.querystring:
self.ec2_backend.release_address(
allocation_id=self.querystring['AllocationId'][0])
allocation_id=self._get_param('AllocationId'))
else:
self.ec2_backend.raise_error(
"MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")

View File

@ -6,9 +6,8 @@ from moto.ec2.utils import filters_from_querystring
class ElasticNetworkInterfaces(BaseResponse):
def create_network_interface(self):
subnet_id = self.querystring.get('SubnetId')[0]
private_ip_address = self.querystring.get(
'PrivateIpAddress', [None])[0]
subnet_id = self._get_param('SubnetId')
private_ip_address = self._get_param('PrivateIpAddress')
groups = self._get_multi_param('SecurityGroupId')
subnet = self.ec2_backend.get_subnet(subnet_id)
if self.is_not_dryrun('CreateNetworkInterface'):
@ -19,7 +18,7 @@ class ElasticNetworkInterfaces(BaseResponse):
return template.render(eni=eni)
def delete_network_interface(self):
eni_id = self.querystring.get('NetworkInterfaceId')[0]
eni_id = self._get_param('NetworkInterfaceId')
if self.is_not_dryrun('DeleteNetworkInterface'):
self.ec2_backend.delete_network_interface(eni_id)
template = self.response_template(
@ -38,9 +37,9 @@ class ElasticNetworkInterfaces(BaseResponse):
return template.render(enis=enis)
def attach_network_interface(self):
eni_id = self.querystring.get('NetworkInterfaceId')[0]
instance_id = self.querystring.get('InstanceId')[0]
device_index = self.querystring.get('DeviceIndex')[0]
eni_id = self._get_param('NetworkInterfaceId')
instance_id = self._get_param('InstanceId')
device_index = self._get_param('DeviceIndex')
if self.is_not_dryrun('AttachNetworkInterface'):
attachment_id = self.ec2_backend.attach_network_interface(
eni_id, instance_id, device_index)
@ -49,7 +48,7 @@ class ElasticNetworkInterfaces(BaseResponse):
return template.render(attachment_id=attachment_id)
def detach_network_interface(self):
attachment_id = self.querystring.get('AttachmentId')[0]
attachment_id = self._get_param('AttachmentId')
if self.is_not_dryrun('DetachNetworkInterface'):
self.ec2_backend.detach_network_interface(attachment_id)
template = self.response_template(
@ -58,8 +57,8 @@ class ElasticNetworkInterfaces(BaseResponse):
def modify_network_interface_attribute(self):
# Currently supports modifying one and only one security group
eni_id = self.querystring.get('NetworkInterfaceId')[0]
group_id = self.querystring.get('SecurityGroupId.1')[0]
eni_id = self._get_param('NetworkInterfaceId')
group_id = self._get_param('SecurityGroupId.1')
if self.is_not_dryrun('ModifyNetworkInterface'):
self.ec2_backend.modify_network_interface_attribute(
eni_id, group_id)

View File

@ -3,7 +3,7 @@ from boto.ec2.instancetype import InstanceType
from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from moto.ec2.utils import filters_from_querystring, \
dict_from_querystring, optional_from_querystring
dict_from_querystring
class InstanceResponse(BaseResponse):
@ -33,20 +33,18 @@ class InstanceResponse(BaseResponse):
return template.render(reservations=reservations_resp, next_token=next_token)
def run_instances(self):
min_count = int(self.querystring.get('MinCount', ['1'])[0])
image_id = self.querystring.get('ImageId')[0]
user_data = self.querystring.get('UserData')
min_count = int(self._get_param('MinCount', if_none='1'))
image_id = self._get_param('ImageId')
user_data = self._get_param('UserData')
security_group_names = self._get_multi_param('SecurityGroup')
security_group_ids = self._get_multi_param('SecurityGroupId')
nics = dict_from_querystring("NetworkInterface", self.querystring)
instance_type = self.querystring.get("InstanceType", ["m1.small"])[0]
placement = self.querystring.get(
"Placement.AvailabilityZone", [None])[0]
subnet_id = self.querystring.get("SubnetId", [None])[0]
private_ip = self.querystring.get("PrivateIpAddress", [None])[0]
associate_public_ip = self.querystring.get(
"AssociatePublicIpAddress", [None])[0]
key_name = self.querystring.get("KeyName", [None])[0]
instance_type = self._get_param('InstanceType', if_none='m1.small')
placement = self._get_param('Placement.AvailabilityZone')
subnet_id = self._get_param('SubnetId')
private_ip = self._get_param('PrivateIpAddress')
associate_public_ip = self._get_param('AssociatePublicIpAddress')
key_name = self._get_param('KeyName')
tags = self._parse_tag_specification("TagSpecification")
region_name = self.region
@ -91,8 +89,7 @@ class InstanceResponse(BaseResponse):
def describe_instance_status(self):
instance_ids = self._get_multi_param('InstanceId')
include_all_instances = optional_from_querystring('IncludeAllInstances',
self.querystring) == 'true'
include_all_instances = self._get_param('IncludeAllInstances') == 'true'
if instance_ids:
instances = self.ec2_backend.get_multi_instances_by_id(
@ -114,7 +111,7 @@ class InstanceResponse(BaseResponse):
def describe_instance_attribute(self):
# TODO this and modify below should raise IncorrectInstanceState if
# instance not in stopped state
attribute = self.querystring.get("Attribute")[0]
attribute = self._get_param('Attribute')
key = camelcase_to_underscores(attribute)
instance_id = self._get_param('InstanceId')
instance, value = self.ec2_backend.describe_instance_attribute(

View File

@ -8,8 +8,8 @@ from moto.ec2.utils import (
class InternetGateways(BaseResponse):
def attach_internet_gateway(self):
igw_id = self.querystring.get("InternetGatewayId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
igw_id = self._get_param('InternetGatewayId')
vpc_id = self._get_param('VpcId')
if self.is_not_dryrun('AttachInternetGateway'):
self.ec2_backend.attach_internet_gateway(igw_id, vpc_id)
template = self.response_template(ATTACH_INTERNET_GATEWAY_RESPONSE)
@ -22,7 +22,7 @@ class InternetGateways(BaseResponse):
return template.render(internet_gateway=igw)
def delete_internet_gateway(self):
igw_id = self.querystring.get("InternetGatewayId", [None])[0]
igw_id = self._get_param('InternetGatewayId')
if self.is_not_dryrun('DeleteInternetGateway'):
self.ec2_backend.delete_internet_gateway(igw_id)
template = self.response_template(DELETE_INTERNET_GATEWAY_RESPONSE)
@ -44,8 +44,8 @@ class InternetGateways(BaseResponse):
def detach_internet_gateway(self):
# TODO validate no instances with EIPs in VPC before detaching
# raise else DependencyViolationError()
igw_id = self.querystring.get("InternetGatewayId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
igw_id = self._get_param('InternetGatewayId')
vpc_id = self._get_param('VpcId')
if self.is_not_dryrun('DetachInternetGateway'):
self.ec2_backend.detach_internet_gateway(igw_id, vpc_id)
template = self.response_template(DETACH_INTERNET_GATEWAY_RESPONSE)

View File

@ -7,14 +7,14 @@ from moto.ec2.utils import filters_from_querystring
class KeyPairs(BaseResponse):
def create_key_pair(self):
name = self.querystring.get('KeyName')[0]
name = self._get_param('KeyName')
if self.is_not_dryrun('CreateKeyPair'):
keypair = self.ec2_backend.create_key_pair(name)
template = self.response_template(CREATE_KEY_PAIR_RESPONSE)
return template.render(keypair=keypair)
def delete_key_pair(self):
name = self.querystring.get('KeyName')[0]
name = self._get_param('KeyName')
if self.is_not_dryrun('DeleteKeyPair'):
success = six.text_type(
self.ec2_backend.delete_key_pair(name)).lower()
@ -28,8 +28,8 @@ class KeyPairs(BaseResponse):
return template.render(keypairs=keypairs)
def import_key_pair(self):
name = self.querystring.get('KeyName')[0]
material = self.querystring.get('PublicKeyMaterial')[0]
name = self._get_param('KeyName')
material = self._get_param('PublicKeyMaterial')
if self.is_not_dryrun('ImportKeyPair'):
keypair = self.ec2_backend.import_key_pair(name, material)
template = self.response_template(IMPORT_KEYPAIR_RESPONSE)

View File

@ -6,22 +6,22 @@ from moto.ec2.utils import filters_from_querystring
class NetworkACLs(BaseResponse):
def create_network_acl(self):
vpc_id = self.querystring.get('VpcId')[0]
vpc_id = self._get_param('VpcId')
network_acl = self.ec2_backend.create_network_acl(vpc_id)
template = self.response_template(CREATE_NETWORK_ACL_RESPONSE)
return template.render(network_acl=network_acl)
def create_network_acl_entry(self):
network_acl_id = self.querystring.get('NetworkAclId')[0]
rule_number = self.querystring.get('RuleNumber')[0]
protocol = self.querystring.get('Protocol')[0]
rule_action = self.querystring.get('RuleAction')[0]
egress = self.querystring.get('Egress')[0]
cidr_block = self.querystring.get('CidrBlock')[0]
icmp_code = self.querystring.get('Icmp.Code', [None])[0]
icmp_type = self.querystring.get('Icmp.Type', [None])[0]
port_range_from = self.querystring.get('PortRange.From')[0]
port_range_to = self.querystring.get('PortRange.To')[0]
network_acl_id = self._get_param('NetworkAclId')
rule_number = self._get_param('RuleNumber')
protocol = self._get_param('Protocol')
rule_action = self._get_param('RuleAction')
egress = self._get_param('Egress')
cidr_block = self._get_param('CidrBlock')
icmp_code = self._get_param('Icmp.Code')
icmp_type = self._get_param('Icmp.Type')
port_range_from = self._get_param('PortRange.From')
port_range_to = self._get_param('PortRange.To')
network_acl_entry = self.ec2_backend.create_network_acl_entry(
network_acl_id, rule_number, protocol, rule_action,
@ -32,30 +32,30 @@ class NetworkACLs(BaseResponse):
return template.render(network_acl_entry=network_acl_entry)
def delete_network_acl(self):
network_acl_id = self.querystring.get('NetworkAclId')[0]
network_acl_id = self._get_param('NetworkAclId')
self.ec2_backend.delete_network_acl(network_acl_id)
template = self.response_template(DELETE_NETWORK_ACL_ASSOCIATION)
return template.render()
def delete_network_acl_entry(self):
network_acl_id = self.querystring.get('NetworkAclId')[0]
rule_number = self.querystring.get('RuleNumber')[0]
egress = self.querystring.get('Egress')[0]
network_acl_id = self._get_param('NetworkAclId')
rule_number = self._get_param('RuleNumber')
egress = self._get_param('Egress')
self.ec2_backend.delete_network_acl_entry(network_acl_id, rule_number, egress)
template = self.response_template(DELETE_NETWORK_ACL_ENTRY_RESPONSE)
return template.render()
def replace_network_acl_entry(self):
network_acl_id = self.querystring.get('NetworkAclId')[0]
rule_number = self.querystring.get('RuleNumber')[0]
protocol = self.querystring.get('Protocol')[0]
rule_action = self.querystring.get('RuleAction')[0]
egress = self.querystring.get('Egress')[0]
cidr_block = self.querystring.get('CidrBlock')[0]
icmp_code = self.querystring.get('Icmp.Code', [None])[0]
icmp_type = self.querystring.get('Icmp.Type', [None])[0]
port_range_from = self.querystring.get('PortRange.From')[0]
port_range_to = self.querystring.get('PortRange.To')[0]
network_acl_id = self._get_param('NetworkAclId')
rule_number = self._get_param('RuleNumber')
protocol = self._get_param('Protocol')
rule_action = self._get_param('RuleAction')
egress = self._get_param('Egress')
cidr_block = self._get_param('CidrBlock')
icmp_code = self._get_param('Icmp.Code')
icmp_type = self._get_param('Icmp.Type')
port_range_from = self._get_param('PortRange.From')
port_range_to = self._get_param('PortRange.To')
self.ec2_backend.replace_network_acl_entry(
network_acl_id, rule_number, protocol, rule_action,
@ -74,8 +74,8 @@ class NetworkACLs(BaseResponse):
return template.render(network_acls=network_acls)
def replace_network_acl_association(self):
association_id = self.querystring.get('AssociationId')[0]
network_acl_id = self.querystring.get('NetworkAclId')[0]
association_id = self._get_param('AssociationId')
network_acl_id = self._get_param('NetworkAclId')
association = self.ec2_backend.replace_network_acl_association(
association_id,

View File

@ -1,29 +1,25 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.ec2.utils import filters_from_querystring, optional_from_querystring
from moto.ec2.utils import filters_from_querystring
class RouteTables(BaseResponse):
def associate_route_table(self):
route_table_id = self.querystring.get('RouteTableId')[0]
subnet_id = self.querystring.get('SubnetId')[0]
route_table_id = self._get_param('RouteTableId')
subnet_id = self._get_param('SubnetId')
association_id = self.ec2_backend.associate_route_table(
route_table_id, subnet_id)
template = self.response_template(ASSOCIATE_ROUTE_TABLE_RESPONSE)
return template.render(association_id=association_id)
def create_route(self):
route_table_id = self.querystring.get('RouteTableId')[0]
destination_cidr_block = self.querystring.get(
'DestinationCidrBlock')[0]
gateway_id = optional_from_querystring('GatewayId', self.querystring)
instance_id = optional_from_querystring('InstanceId', self.querystring)
interface_id = optional_from_querystring(
'NetworkInterfaceId', self.querystring)
pcx_id = optional_from_querystring(
'VpcPeeringConnectionId', self.querystring)
route_table_id = self._get_param('RouteTableId')
destination_cidr_block = self._get_param('DestinationCidrBlock')
gateway_id = self._get_param('GatewayId')
instance_id = self._get_param('InstanceId')
interface_id = self._get_param('NetworkInterfaceId')
pcx_id = self._get_param('VpcPeeringConnectionId')
self.ec2_backend.create_route(route_table_id, destination_cidr_block,
gateway_id=gateway_id,
@ -35,21 +31,20 @@ class RouteTables(BaseResponse):
return template.render()
def create_route_table(self):
vpc_id = self.querystring.get('VpcId')[0]
vpc_id = self._get_param('VpcId')
route_table = self.ec2_backend.create_route_table(vpc_id)
template = self.response_template(CREATE_ROUTE_TABLE_RESPONSE)
return template.render(route_table=route_table)
def delete_route(self):
route_table_id = self.querystring.get('RouteTableId')[0]
destination_cidr_block = self.querystring.get(
'DestinationCidrBlock')[0]
route_table_id = self._get_param('RouteTableId')
destination_cidr_block = self._get_param('DestinationCidrBlock')
self.ec2_backend.delete_route(route_table_id, destination_cidr_block)
template = self.response_template(DELETE_ROUTE_RESPONSE)
return template.render()
def delete_route_table(self):
route_table_id = self.querystring.get('RouteTableId')[0]
route_table_id = self._get_param('RouteTableId')
self.ec2_backend.delete_route_table(route_table_id)
template = self.response_template(DELETE_ROUTE_TABLE_RESPONSE)
return template.render()
@ -63,22 +58,18 @@ class RouteTables(BaseResponse):
return template.render(route_tables=route_tables)
def disassociate_route_table(self):
association_id = self.querystring.get('AssociationId')[0]
association_id = self._get_param('AssociationId')
self.ec2_backend.disassociate_route_table(association_id)
template = self.response_template(DISASSOCIATE_ROUTE_TABLE_RESPONSE)
return template.render()
def replace_route(self):
route_table_id = self.querystring.get('RouteTableId')[0]
destination_cidr_block = self.querystring.get(
'DestinationCidrBlock')[0]
gateway_id = optional_from_querystring('GatewayId', self.querystring)
instance_id = optional_from_querystring('InstanceId', self.querystring)
interface_id = optional_from_querystring(
'NetworkInterfaceId', self.querystring)
pcx_id = optional_from_querystring(
'VpcPeeringConnectionId', self.querystring)
route_table_id = self._get_param('RouteTableId')
destination_cidr_block = self._get_param('DestinationCidrBlock')
gateway_id = self._get_param('GatewayId')
instance_id = self._get_param('InstanceId')
interface_id = self._get_param('NetworkInterfaceId')
pcx_id = self._get_param('VpcPeeringConnectionId')
self.ec2_backend.replace_route(route_table_id, destination_cidr_block,
gateway_id=gateway_id,
@ -90,8 +81,8 @@ class RouteTables(BaseResponse):
return template.render()
def replace_route_table_association(self):
route_table_id = self.querystring.get('RouteTableId')[0]
association_id = self.querystring.get('AssociationId')[0]
route_table_id = self._get_param('RouteTableId')
association_id = self._get_param('AssociationId')
new_association_id = self.ec2_backend.replace_route_table_association(
association_id, route_table_id)
template = self.response_template(

View File

@ -11,69 +11,66 @@ def try_parse_int(value, default=None):
return default
def process_rules_from_querystring(querystring):
try:
group_name_or_id = querystring.get('GroupName')[0]
except:
group_name_or_id = querystring.get('GroupId')[0]
querytree = {}
for key, value in querystring.items():
key_splitted = key.split('.')
key_splitted = [try_parse_int(e, e) for e in key_splitted]
d = querytree
for subkey in key_splitted[:-1]:
if subkey not in d:
d[subkey] = {}
d = d[subkey]
d[key_splitted[-1]] = value
ip_permissions = querytree.get('IpPermissions') or {}
for ip_permission_idx in sorted(ip_permissions.keys()):
ip_permission = ip_permissions[ip_permission_idx]
ip_protocol = ip_permission.get('IpProtocol', [None])[0]
from_port = ip_permission.get('FromPort', [None])[0]
to_port = ip_permission.get('ToPort', [None])[0]
ip_ranges = []
ip_ranges_tree = ip_permission.get('IpRanges') or {}
for ip_range_idx in sorted(ip_ranges_tree.keys()):
ip_ranges.append(ip_ranges_tree[ip_range_idx]['CidrIp'][0])
source_groups = []
source_group_ids = []
groups_tree = ip_permission.get('Groups') or {}
for group_idx in sorted(groups_tree.keys()):
group_dict = groups_tree[group_idx]
if 'GroupId' in group_dict:
source_group_ids.append(group_dict['GroupId'][0])
elif 'GroupName' in group_dict:
source_groups.append(group_dict['GroupName'][0])
yield (group_name_or_id, ip_protocol, from_port, to_port, ip_ranges,
source_groups, source_group_ids)
class SecurityGroups(BaseResponse):
def _process_rules_from_querystring(self):
group_name_or_id = (self._get_param('GroupName') or
self._get_param('GroupId'))
querytree = {}
for key, value in self.querystring.items():
key_splitted = key.split('.')
key_splitted = [try_parse_int(e, e) for e in key_splitted]
d = querytree
for subkey in key_splitted[:-1]:
if subkey not in d:
d[subkey] = {}
d = d[subkey]
d[key_splitted[-1]] = value
ip_permissions = querytree.get('IpPermissions') or {}
for ip_permission_idx in sorted(ip_permissions.keys()):
ip_permission = ip_permissions[ip_permission_idx]
ip_protocol = ip_permission.get('IpProtocol', [None])[0]
from_port = ip_permission.get('FromPort', [None])[0]
to_port = ip_permission.get('ToPort', [None])[0]
ip_ranges = []
ip_ranges_tree = ip_permission.get('IpRanges') or {}
for ip_range_idx in sorted(ip_ranges_tree.keys()):
ip_ranges.append(ip_ranges_tree[ip_range_idx]['CidrIp'][0])
source_groups = []
source_group_ids = []
groups_tree = ip_permission.get('Groups') or {}
for group_idx in sorted(groups_tree.keys()):
group_dict = groups_tree[group_idx]
if 'GroupId' in group_dict:
source_group_ids.append(group_dict['GroupId'][0])
elif 'GroupName' in group_dict:
source_groups.append(group_dict['GroupName'][0])
yield (group_name_or_id, ip_protocol, from_port, to_port, ip_ranges,
source_groups, source_group_ids)
def authorize_security_group_egress(self):
if self.is_not_dryrun('GrantSecurityGroupEgress'):
for args in process_rules_from_querystring(self.querystring):
for args in self._process_rules_from_querystring():
self.ec2_backend.authorize_security_group_egress(*args)
return AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE
def authorize_security_group_ingress(self):
if self.is_not_dryrun('GrantSecurityGroupIngress'):
for args in process_rules_from_querystring(self.querystring):
for args in self._process_rules_from_querystring():
self.ec2_backend.authorize_security_group_ingress(*args)
return AUTHORIZE_SECURITY_GROUP_INGRESS_REPONSE
def create_security_group(self):
name = self.querystring.get('GroupName')[0]
description = self.querystring.get('GroupDescription', [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
name = self._get_param('GroupName')
description = self._get_param('GroupDescription')
vpc_id = self._get_param('VpcId')
if self.is_not_dryrun('CreateSecurityGroup'):
group = self.ec2_backend.create_security_group(
@ -86,14 +83,14 @@ class SecurityGroups(BaseResponse):
# See
# http://docs.aws.amazon.com/AWSEC2/latest/APIReference/ApiReference-query-DeleteSecurityGroup.html
name = self.querystring.get('GroupName')
sg_id = self.querystring.get('GroupId')
name = self._get_param('GroupName')
sg_id = self._get_param('GroupId')
if self.is_not_dryrun('DeleteSecurityGroup'):
if name:
self.ec2_backend.delete_security_group(name[0])
self.ec2_backend.delete_security_group(name)
elif sg_id:
self.ec2_backend.delete_security_group(group_id=sg_id[0])
self.ec2_backend.delete_security_group(group_id=sg_id)
return DELETE_GROUP_RESPONSE
@ -113,7 +110,7 @@ class SecurityGroups(BaseResponse):
def revoke_security_group_egress(self):
if self.is_not_dryrun('RevokeSecurityGroupEgress'):
for args in process_rules_from_querystring(self.querystring):
for args in self._process_rules_from_querystring():
success = self.ec2_backend.revoke_security_group_egress(*args)
if not success:
return "Could not find a matching egress rule", dict(status=404)
@ -121,7 +118,7 @@ class SecurityGroups(BaseResponse):
def revoke_security_group_ingress(self):
if self.is_not_dryrun('RevokeSecurityGroupIngress'):
for args in process_rules_from_querystring(self.querystring):
for args in self._process_rules_from_querystring():
self.ec2_backend.revoke_security_group_ingress(*args)
return REVOKE_SECURITY_GROUP_INGRESS_REPONSE

View File

@ -7,14 +7,11 @@ from moto.ec2.utils import filters_from_querystring
class Subnets(BaseResponse):
def create_subnet(self):
vpc_id = self.querystring.get('VpcId')[0]
cidr_block = self.querystring.get('CidrBlock')[0]
if 'AvailabilityZone' in self.querystring:
availability_zone = self.querystring['AvailabilityZone'][0]
else:
zone = random.choice(
self.ec2_backend.describe_availability_zones())
availability_zone = zone.name
vpc_id = self._get_param('VpcId')
cidr_block = self._get_param('CidrBlock')
availability_zone = self._get_param(
'AvailabilityZone', if_none=random.choice(
self.ec2_backend.describe_availability_zones()).name)
subnet = self.ec2_backend.create_subnet(
vpc_id,
cidr_block,
@ -24,30 +21,21 @@ class Subnets(BaseResponse):
return template.render(subnet=subnet)
def delete_subnet(self):
subnet_id = self.querystring.get('SubnetId')[0]
subnet_id = self._get_param('SubnetId')
subnet = self.ec2_backend.delete_subnet(subnet_id)
template = self.response_template(DELETE_SUBNET_RESPONSE)
return template.render(subnet=subnet)
def describe_subnets(self):
subnet_ids = self._get_multi_param('SubnetId')
filters = filters_from_querystring(self.querystring)
subnet_ids = []
idx = 1
key = 'SubnetId.{0}'.format(idx)
while key in self.querystring:
v = self.querystring[key]
subnet_ids.append(v[0])
idx += 1
key = 'SubnetId.{0}'.format(idx)
subnets = self.ec2_backend.get_all_subnets(subnet_ids, filters)
template = self.response_template(DESCRIBE_SUBNETS_RESPONSE)
return template.render(subnets=subnets)
def modify_subnet_attribute(self):
subnet_id = self.querystring.get('SubnetId')[0]
map_public_ip = self.querystring.get('MapPublicIpOnLaunch.Value')[0]
subnet_id = self._get_param('SubnetId')
map_public_ip = self._get_param('MapPublicIpOnLaunch.Value')
self.ec2_backend.modify_subnet_attribute(subnet_id, map_public_ip)
return MODIFY_SUBNET_ATTRIBUTE_RESPONSE

View File

@ -6,8 +6,8 @@ from moto.ec2.utils import filters_from_querystring
class VirtualPrivateGateways(BaseResponse):
def attach_vpn_gateway(self):
vpn_gateway_id = self.querystring.get('VpnGatewayId')[0]
vpc_id = self.querystring.get('VpcId')[0]
vpn_gateway_id = self._get_param('VpnGatewayId')
vpc_id = self._get_param('VpcId')
attachment = self.ec2_backend.attach_vpn_gateway(
vpn_gateway_id,
vpc_id
@ -16,13 +16,13 @@ class VirtualPrivateGateways(BaseResponse):
return template.render(attachment=attachment)
def create_vpn_gateway(self):
type = self.querystring.get('Type', None)[0]
type = self._get_param('Type')
vpn_gateway = self.ec2_backend.create_vpn_gateway(type)
template = self.response_template(CREATE_VPN_GATEWAY_RESPONSE)
return template.render(vpn_gateway=vpn_gateway)
def delete_vpn_gateway(self):
vpn_gateway_id = self.querystring.get('VpnGatewayId')[0]
vpn_gateway_id = self._get_param('VpnGatewayId')
vpn_gateway = self.ec2_backend.delete_vpn_gateway(vpn_gateway_id)
template = self.response_template(DELETE_VPN_GATEWAY_RESPONSE)
return template.render(vpn_gateway=vpn_gateway)
@ -34,8 +34,8 @@ class VirtualPrivateGateways(BaseResponse):
return template.render(vpn_gateways=vpn_gateways)
def detach_vpn_gateway(self):
vpn_gateway_id = self.querystring.get('VpnGatewayId')[0]
vpc_id = self.querystring.get('VpcId')[0]
vpn_gateway_id = self._get_param('VpnGatewayId')
vpc_id = self._get_param('VpcId')
attachment = self.ec2_backend.detach_vpn_gateway(
vpn_gateway_id,
vpc_id

View File

@ -5,16 +5,15 @@ from moto.core.responses import BaseResponse
class VPCPeeringConnections(BaseResponse):
def create_vpc_peering_connection(self):
vpc = self.ec2_backend.get_vpc(self.querystring.get('VpcId')[0])
peer_vpc = self.ec2_backend.get_vpc(
self.querystring.get('PeerVpcId')[0])
vpc = self.ec2_backend.get_vpc(self._get_param('VpcId'))
peer_vpc = self.ec2_backend.get_vpc(self._get_param('PeerVpcId'))
vpc_pcx = self.ec2_backend.create_vpc_peering_connection(vpc, peer_vpc)
template = self.response_template(
CREATE_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx)
def delete_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx_id = self._get_param('VpcPeeringConnectionId')
vpc_pcx = self.ec2_backend.delete_vpc_peering_connection(vpc_pcx_id)
template = self.response_template(
DELETE_VPC_PEERING_CONNECTION_RESPONSE)
@ -27,14 +26,14 @@ class VPCPeeringConnections(BaseResponse):
return template.render(vpc_pcxs=vpc_pcxs)
def accept_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx_id = self._get_param('VpcPeeringConnectionId')
vpc_pcx = self.ec2_backend.accept_vpc_peering_connection(vpc_pcx_id)
template = self.response_template(
ACCEPT_VPC_PEERING_CONNECTION_RESPONSE)
return template.render(vpc_pcx=vpc_pcx)
def reject_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx_id = self._get_param('VpcPeeringConnectionId')
self.ec2_backend.reject_vpc_peering_connection(vpc_pcx_id)
template = self.response_template(
REJECT_VPC_PEERING_CONNECTION_RESPONSE)

View File

@ -7,15 +7,14 @@ from moto.ec2.utils import filters_from_querystring
class VPCs(BaseResponse):
def create_vpc(self):
cidr_block = self.querystring.get('CidrBlock')[0]
instance_tenancy = self.querystring.get(
'InstanceTenancy', ['default'])[0]
cidr_block = self._get_param('CidrBlock')
instance_tenancy = self._get_param('InstanceTenancy', if_none='default')
vpc = self.ec2_backend.create_vpc(cidr_block, instance_tenancy)
template = self.response_template(CREATE_VPC_RESPONSE)
return template.render(vpc=vpc)
def delete_vpc(self):
vpc_id = self.querystring.get('VpcId')[0]
vpc_id = self._get_param('VpcId')
vpc = self.ec2_backend.delete_vpc(vpc_id)
template = self.response_template(DELETE_VPC_RESPONSE)
return template.render(vpc=vpc)
@ -28,15 +27,15 @@ class VPCs(BaseResponse):
return template.render(vpcs=vpcs)
def describe_vpc_attribute(self):
vpc_id = self.querystring.get('VpcId')[0]
attribute = self.querystring.get('Attribute')[0]
vpc_id = self._get_param('VpcId')
attribute = self._get_param('Attribute')
attr_name = camelcase_to_underscores(attribute)
value = self.ec2_backend.describe_vpc_attribute(vpc_id, attr_name)
template = self.response_template(DESCRIBE_VPC_ATTRIBUTE_RESPONSE)
return template.render(vpc_id=vpc_id, attribute=attribute, value=value)
def modify_vpc_attribute(self):
vpc_id = self.querystring.get('VpcId')[0]
vpc_id = self._get_param('VpcId')
for attribute in ('EnableDnsSupport', 'EnableDnsHostnames'):
if self.querystring.get('%s.Value' % attribute):

View File

@ -6,17 +6,17 @@ from moto.ec2.utils import filters_from_querystring
class VPNConnections(BaseResponse):
def create_vpn_connection(self):
type = self.querystring.get("Type", [None])[0]
cgw_id = self.querystring.get("CustomerGatewayId", [None])[0]
vgw_id = self.querystring.get("VPNGatewayId", [None])[0]
static_routes = self.querystring.get("StaticRoutesOnly", [None])[0]
type = self._get_param('Type')
cgw_id = self._get_param('CustomerGatewayId')
vgw_id = self._get_param('VPNGatewayId')
static_routes = self._get_param('StaticRoutesOnly')
vpn_connection = self.ec2_backend.create_vpn_connection(
type, cgw_id, vgw_id, static_routes_only=static_routes)
template = self.response_template(CREATE_VPN_CONNECTION_RESPONSE)
return template.render(vpn_connection=vpn_connection)
def delete_vpn_connection(self):
vpn_connection_id = self.querystring.get('VpnConnectionId')[0]
vpn_connection_id = self._get_param('VpnConnectionId')
vpn_connection = self.ec2_backend.delete_vpn_connection(
vpn_connection_id)
template = self.response_template(DELETE_VPN_CONNECTION_RESPONSE)

View File

@ -230,11 +230,6 @@ def dhcp_configuration_from_querystring(querystring, option=u'DhcpConfiguration'
return response_values
def optional_from_querystring(parameter, querystring):
parameter_array = querystring.get(parameter)
return parameter_array[0] if parameter_array else None
def filters_from_querystring(querystring_dict):
response_values = {}
for key, value in querystring_dict.items():