Merge branch 'master' into dynamov2_no_indexes

This commit is contained in:
Sorin 2014-01-07 11:10:24 +02:00
commit 776e1bc65a
57 changed files with 367 additions and 202 deletions

3
.gitignore vendored
View File

@ -3,5 +3,8 @@ dist/*
.tox
.coverage
*.pyc
*~
.noseids
build/
.idea/

View File

@ -5,6 +5,7 @@ python:
env:
matrix:
#- BOTO_VERSION=2.13.3
- BOTO_VERSION=2.19.0
- BOTO_VERSION=2.12.0
- BOTO_VERSION=2.11.0
- BOTO_VERSION=2.10.0

View File

@ -12,3 +12,5 @@ Moto is written by Steve Pulec with contributions from:
* [Konstantinos Koukopoulos](https://github.com/kouk)
* [attili](https://github.com/attili)
* [JJ Zeng](https://github.com/jjofseattle)
* [Jon Haddad](https://github.com/rustyrazorblade)
* [Andres Riancho](https://github.com/andresriancho)

View File

@ -83,6 +83,8 @@ class FakeAutoScalingGroup(object):
self.launch_config = autoscaling_backend.launch_configurations[launch_config_name]
self.launch_config_name = launch_config_name
self.vpc_zone_identifier = vpc_zone_identifier
self.health_check_period = health_check_period
self.health_check_type = health_check_type
self.set_desired_capacity(desired_capacity)

View File

@ -7,9 +7,13 @@ from .utils import convert_regex_to_flask_path
class MockAWS(object):
nested_count = 0
def __init__(self, backend):
self.backend = backend
HTTPretty.reset()
if self.__class__.nested_count == 0:
HTTPretty.reset()
def __call__(self, func):
return self.decorate_callable(func)
@ -21,8 +25,11 @@ class MockAWS(object):
self.stop()
def start(self):
self.__class__.nested_count += 1
self.backend.reset()
HTTPretty.enable()
if not HTTPretty.is_enabled():
HTTPretty.enable()
for method in HTTPretty.METHODS:
for key, value in self.backend.urls.iteritems():
@ -40,7 +47,13 @@ class MockAWS(object):
)
def stop(self):
HTTPretty.disable()
self.__class__.nested_count -= 1
if self.__class__.nested_count < 0:
raise RuntimeError('Called stop() before start().')
if self.__class__.nested_count == 0:
HTTPretty.disable()
def decorate_callable(self, func):
def wrapper(*args, **kwargs):
@ -97,6 +110,13 @@ class BaseBackend(object):
return paths
@property
def url_bases(self):
"""
A list containing the url_bases extracted from urls.py
"""
return self._url_module.url_bases
@property
def flask_paths(self):
"""

View File

@ -9,18 +9,29 @@ from moto.core.utils import camelcase_to_underscores, method_names_from_class
class BaseResponse(object):
def dispatch(self, request, full_url, headers):
querystring = {}
if hasattr(request, 'body'):
# Boto
self.body = request.body
else:
# Flask server
# FIXME: At least in Flask==0.10.1, request.data is an empty string
# and the information we want is in request.form. Keeping self.body
# definition for back-compatibility
self.body = request.data
querystring = parse_qs(urlparse(full_url).query)
querystring = {}
for key, value in request.form.iteritems():
querystring[key] = [value, ]
if not querystring:
querystring = parse_qs(self.body)
querystring.update(parse_qs(urlparse(full_url).query))
if not querystring:
querystring = headers
querystring.update(parse_qs(self.body))
if not querystring:
querystring.update(headers)
self.uri = full_url
self.path = urlparse(full_url).path
@ -64,7 +75,13 @@ def metadata_response(request, full_url, headers):
Expiration=tomorrow.strftime("%Y-%m-%dT%H:%M:%SZ")
)
path = parsed_url.path.lstrip("/latest/meta-data/")
path = parsed_url.path
meta_data_prefix = "/latest/meta-data/"
# Strip prefix if it is there
if path.startswith(meta_data_prefix):
path = path[len(meta_data_prefix):]
if path == '':
result = 'iam'
elif path == 'iam':

View File

@ -3,10 +3,10 @@ import datetime
import json
try:
from collections import OrderedDict
from collections import OrderedDict
except ImportError:
# python 2.6 or earlier, use backport
from ordereddict import OrderedDict
# python 2.6 or earlier, use backport
from ordereddict import OrderedDict
from moto.core import BaseBackend

View File

@ -1,4 +1,5 @@
import copy
import itertools
from collections import defaultdict
from boto.ec2.instance import Instance as BotoInstance, Reservation
@ -35,23 +36,23 @@ class Instance(BotoInstance):
self._state = InstanceState("running", 16)
self.user_data = user_data
def start(self):
def start(self, *args, **kwargs):
self._state.name = "running"
self._state.code = 16
def stop(self):
def stop(self, *args, **kwargs):
self._state.name = "stopped"
self._state.code = 80
def terminate(self):
def terminate(self, *args, **kwargs):
self._state.name = "terminated"
self._state.code = 48
def reboot(self):
def reboot(self, *args, **kwargs):
self._state.name = "running"
self._state.code = 16
def get_tags(self):
def get_tags(self, *args, **kwargs):
tags = ec2_backend.describe_tags(self.id)
return tags
@ -300,45 +301,50 @@ class SecurityRule(object):
class SecurityGroup(object):
def __init__(self, group_id, name, description):
def __init__(self, group_id, name, description, vpc_id=None):
self.id = group_id
self.name = name
self.description = description
self.ingress_rules = []
self.egress_rules = []
self.vpc_id = vpc_id
class SecurityGroupBackend(object):
def __init__(self):
self.groups = {}
# the key in the dict group is the vpc_id or None (non-vpc)
self.groups = defaultdict(dict)
super(SecurityGroupBackend, self).__init__()
def create_security_group(self, name, description, force=False):
def create_security_group(self, name, description, vpc_id=None, force=False):
group_id = random_security_group_id()
if not force:
existing_group = self.get_security_group_from_name(name)
existing_group = self.get_security_group_from_name(name, vpc_id)
if existing_group:
return None
group = SecurityGroup(group_id, name, description)
self.groups[group_id] = group
group = SecurityGroup(group_id, name, description, vpc_id=vpc_id)
self.groups[vpc_id][group_id] = group
return group
def describe_security_groups(self):
return self.groups.values()
return itertools.chain(*[x.values() for x in self.groups.values()])
def delete_security_group(self, name_or_group_id):
if name_or_group_id in self.groups:
# Group Id
return self.groups.pop(name_or_group_id)
else:
# Group Name
group = self.get_security_group_from_name(name_or_group_id)
def delete_security_group(self, name=None, group_id=None):
if group_id:
# loop over all the SGs, find the right one
for vpc in self.groups.values():
if group_id in vpc:
return vpc.pop(group_id)
elif name:
# Group Name. Has to be in standard EC2, VPC needs to be identified by group_id
group = self.get_security_group_from_name(name, None)
if group:
return self.groups.pop(group.id)
return self.groups[None].pop(group.id)
def get_security_group_from_name(self, name):
for group_id, group in self.groups.iteritems():
def get_security_group_from_name(self, name, vpc_id):
for group_id, group in self.groups[vpc_id].iteritems():
if group.name == name:
return group
@ -347,20 +353,20 @@ class SecurityGroupBackend(object):
default_group = ec2_backend.create_security_group("default", "The default security group", force=True)
return default_group
def authorize_security_group_ingress(self, group_name, ip_protocol, from_port, to_port, ip_ranges=None, source_group_names=None):
group = self.get_security_group_from_name(group_name)
def authorize_security_group_ingress(self, group_name, ip_protocol, from_port, to_port, ip_ranges=None, source_group_names=None, vpc_id=None):
group = self.get_security_group_from_name(group_name, vpc_id)
source_groups = []
for source_group_name in source_group_names:
source_groups.append(self.get_security_group_from_name(source_group_name))
source_groups.append(self.get_security_group_from_name(source_group_name, vpc_id))
security_rule = SecurityRule(ip_protocol, from_port, to_port, ip_ranges, source_groups)
group.ingress_rules.append(security_rule)
def revoke_security_group_ingress(self, group_name, ip_protocol, from_port, to_port, ip_ranges=None, source_group_names=None):
group = self.get_security_group_from_name(group_name)
def revoke_security_group_ingress(self, group_name, ip_protocol, from_port, to_port, ip_ranges=None, source_group_names=None, vpc_id=None):
group = self.get_security_group_from_name(group_name, vpc_id)
source_groups = []
for source_group_name in source_group_names:
source_groups.append(self.get_security_group_from_name(source_group_name))
source_groups.append(self.get_security_group_from_name(source_group_name, vpc_id))
security_rule = SecurityRule(ip_protocol, from_port, to_port, ip_ranges, source_groups)
if security_rule in group.ingress_rules:
@ -536,12 +542,12 @@ class SpotInstanceRequest(object):
self.security_groups = []
if security_groups:
for group_name in security_groups:
group = ec2_backend.get_security_group_from_name(group_name)
group = ec2_backend.get_security_group_from_name(group_name, None)
if group:
self.security_groups.append(group)
else:
# If not security groups, add the default
default_group = ec2_backend.get_security_group_from_name("default")
default_group = ec2_backend.get_security_group_from_name("default", None)
self.security_groups.append(default_group)
@ -556,7 +562,7 @@ class SpotRequestBackend(object):
instance_type, placement, kernel_id, ramdisk_id,
monitoring_enabled, subnet_id):
requests = []
for index in range(count):
for _ in range(count):
spot_request_id = random_spot_request_id()
request = SpotInstanceRequest(
spot_request_id, price, image_id, type, valid_from, valid_until,
@ -578,7 +584,7 @@ class SpotRequestBackend(object):
return requests
class ElasticAddress():
class ElasticAddress(object):
def __init__(self, domain):
self.public_ip = random_ip()
self.allocation_id = random_eip_allocation_id() if domain == "vpc" else None

View File

@ -1,5 +1,3 @@
from moto.core.responses import BaseResponse
from .amazon_dev_pay import AmazonDevPay
from .amis import AmisResponse
from .availability_zones_and_regions import AvailabilityZonesAndRegions
@ -31,7 +29,6 @@ from .windows import Windows
class EC2Response(
BaseResponse,
AmazonDevPay,
AmisResponse,
AvailabilityZonesAndRegions,

View File

@ -1,9 +1,6 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class AmazonDevPay(object):
class AmazonDevPay(BaseResponse):
def confirm_product_instance(self):
raise NotImplementedError('AmazonDevPay.confirm_product_instance is not yet implemented')

View File

@ -1,10 +1,11 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import instance_ids_from_querystring, image_ids_from_querystring
class AmisResponse(object):
class AmisResponse(BaseResponse):
def create_image(self):
name = self.querystring.get('Name')[0]
if "Description" in self.querystring:

View File

@ -1,9 +1,10 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
class AvailabilityZonesAndRegions(object):
class AvailabilityZonesAndRegions(BaseResponse):
def describe_availability_zones(self):
zones = ec2_backend.describe_availability_zones()
template = Template(DESCRIBE_ZONES_RESPONSE)

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class CustomerGateways(object):
class CustomerGateways(BaseResponse):
def create_customer_gateway(self):
raise NotImplementedError('CustomerGateways(AmazonVPC).create_customer_gateway is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class DHCPOptions(object):
class DHCPOptions(BaseResponse):
def associate_dhcp_options(self):
raise NotImplementedError('DHCPOptions(AmazonVPC).associate_dhcp_options is not yet implemented')

View File

@ -1,9 +1,10 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
class ElasticBlockStore(object):
class ElasticBlockStore(BaseResponse):
def attach_volume(self):
volume_id = self.querystring.get('VolumeId')[0]
instance_id = self.querystring.get('InstanceId')[0]

View File

@ -1,11 +1,11 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import sequence_from_querystring
class ElasticIPAddresses(object):
class ElasticIPAddresses(BaseResponse):
def allocate_address(self):
if "Domain" in self.querystring:
domain = self.querystring.get('Domain')[0]

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class ElasticNetworkInterfaces(object):
class ElasticNetworkInterfaces(BaseResponse):
def attach_network_interface(self):
raise NotImplementedError('ElasticNetworkInterfaces(AmazonVPC).attach_network_interface is not yet implemented')

View File

@ -1,10 +1,11 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import instance_ids_from_querystring
class General(object):
class General(BaseResponse):
def get_console_output(self):
self.instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = self.instance_ids[0]

View File

@ -1,12 +1,13 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from moto.ec2.models import ec2_backend
from moto.ec2.utils import instance_ids_from_querystring, filters_from_querystring, filter_reservations
from moto.ec2.exceptions import InvalidIdError
class InstanceResponse(object):
class InstanceResponse(BaseResponse):
def describe_instances(self):
instance_ids = instance_ids_from_querystring(self.querystring)
if instance_ids:
@ -67,12 +68,16 @@ class InstanceResponse(object):
return template.render(instance=instance, attribute=attribute, value=value)
def modify_instance_attribute(self):
attribute_key = None
for key, value in self.querystring.iteritems():
if '.Value' in key:
attribute_key = key
break
value = self.querystring.get(key)[0]
normalized_attribute = camelcase_to_underscores(key.split(".")[0])
if not attribute_key:
return
value = self.querystring.get(attribute_key)[0]
normalized_attribute = camelcase_to_underscores(attribute_key.split(".")[0])
instance_ids = instance_ids_from_querystring(self.querystring)
instance_id = instance_ids[0]
ec2_backend.modify_instance_attribute(instance_id, normalized_attribute, value)

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class InternetGateways(object):
class InternetGateways(BaseResponse):
def attach_internet_gateway(self):
raise NotImplementedError('InternetGateways(AmazonVPC).attach_internet_gateway is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class IPAddresses(object):
class IPAddresses(BaseResponse):
def assign_private_ip_addresses(self):
raise NotImplementedError('IPAddresses.assign_private_ip_addresses is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class KeyPairs(object):
class KeyPairs(BaseResponse):
def create_key_pair(self):
raise NotImplementedError('KeyPairs.create_key_pair is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class Monitoring(object):
class Monitoring(BaseResponse):
def monitor_instances(self):
raise NotImplementedError('Monitoring.monitor_instances is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class NetworkACLs(object):
class NetworkACLs(BaseResponse):
def create_network_acl(self):
raise NotImplementedError('NetworkACLs(AmazonVPC).create_network_acl is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class PlacementGroups(object):
class PlacementGroups(BaseResponse):
def create_placement_group(self):
raise NotImplementedError('PlacementGroups.create_placement_group is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class ReservedInstances(object):
class ReservedInstances(BaseResponse):
def cancel_reserved_instances_listing(self):
raise NotImplementedError('ReservedInstances.cancel_reserved_instances_listing is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class RouteTables(object):
class RouteTables(BaseResponse):
def associate_route_table(self):
raise NotImplementedError('RouteTables(AmazonVPC).associate_route_table is not yet implemented')

View File

@ -1,5 +1,6 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
@ -20,7 +21,7 @@ def process_rules_from_querystring(querystring):
return (name, ip_protocol, from_port, to_port, ip_ranges, source_groups)
class SecurityGroups(object):
class SecurityGroups(BaseResponse):
def authorize_security_group_egress(self):
raise NotImplementedError('SecurityGroups.authorize_security_group_egress is not yet implemented')
@ -31,7 +32,8 @@ class SecurityGroups(object):
def create_security_group(self):
name = self.querystring.get('GroupName')[0]
description = self.querystring.get('GroupDescription')[0]
group = ec2_backend.create_security_group(name, description)
vpc_id = self.querystring.get("VpcId", [None])[0]
group = ec2_backend.create_security_group(name, description, vpc_id=vpc_id)
if not group:
# There was an exisitng group
return "There was an existing security group with name {0}".format(name), dict(status=409)
@ -40,9 +42,16 @@ class SecurityGroups(object):
def delete_security_group(self):
# TODO this should raise an error if there are instances in the group. See http://docs.aws.amazon.com/AWSEC2/latest/APIReference/ApiReference-query-DeleteSecurityGroup.html
name = self.querystring.get('GroupName')[0]
group = ec2_backend.delete_security_group(name)
name = self.querystring.get('GroupName')
sg_id = self.querystring.get('GroupId')
if name:
group = ec2_backend.delete_security_group(name[0])
elif sg_id:
group = ec2_backend.delete_security_group(group_id=sg_id[0])
# needs name or group now
if not group:
# There was no such group
return "There was no security group with name {0}".format(name), dict(status=404)
@ -83,7 +92,7 @@ DESCRIBE_SECURITY_GROUPS_RESPONSE = """<DescribeSecurityGroupsResponse xmlns="ht
<groupId>{{ group.id }}</groupId>
<groupName>{{ group.name }}</groupName>
<groupDescription>{{ group.description }}</groupDescription>
<vpcId/>
<vpcId>{{ group.vpc_id or ""}}</vpcId>
<ipPermissions>
{% for rule in group.ingress_rules %}
<item>

View File

@ -1,9 +1,10 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
class SpotInstances(object):
class SpotInstances(BaseResponse):
def _get_param(self, param_name):
return self.querystring.get(param_name, [None])[0]

View File

@ -1,9 +1,10 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
class Subnets(object):
class Subnets(BaseResponse):
def create_subnet(self):
vpc_id = self.querystring.get('VpcId')[0]
cidr_block = self.querystring.get('CidrBlock')[0]

View File

@ -1,10 +1,11 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
class TagResponse(object):
class TagResponse(BaseResponse):
def create_tags(self):
resource_ids = resource_ids_from_querystring(self.querystring)

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class VirtualPrivateGateways(object):
class VirtualPrivateGateways(BaseResponse):
def attach_vpn_gateway(self):
raise NotImplementedError('VirtualPrivateGateways(AmazonVPC).attach_vpn_gateway is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class VMExport(object):
class VMExport(BaseResponse):
def cancel_export_task(self):
raise NotImplementedError('VMExport.cancel_export_task is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class VMImport(object):
class VMImport(BaseResponse):
def cancel_conversion_task(self):
raise NotImplementedError('VMImport.cancel_conversion_task is not yet implemented')

View File

@ -1,9 +1,10 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
class VPCs(object):
class VPCs(BaseResponse):
def create_vpc(self):
cidr_block = self.querystring.get('CidrBlock')[0]
vpc = ec2_backend.create_vpc(cidr_block)

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class VPNConnections(object):
class VPNConnections(BaseResponse):
def create_vpn_connection(self):
raise NotImplementedError('VPNConnections(AmazonVPC).create_vpn_connection is not yet implemented')

View File

@ -1,10 +1,7 @@
from jinja2 import Template
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.core.responses import BaseResponse
class Windows(object):
class Windows(BaseResponse):
def bundle_instance(self):
raise NotImplementedError('Windows.bundle_instance is not yet implemented')

View File

@ -100,7 +100,7 @@ class FakeJobFlow(object):
def master_instance_type(self):
groups = self.instance_groups
if groups:
groups[0].type
return groups[0].type
else:
return self.initial_master_instance_type
@ -108,7 +108,7 @@ class FakeJobFlow(object):
def slave_instance_type(self):
groups = self.instance_groups
if groups:
groups[0].type
return groups[0].type
else:
return self.initial_slave_instance_type

View File

@ -2,7 +2,7 @@ from moto.core import BaseBackend
from moto.core.utils import get_random_hex
class FakeZone:
class FakeZone(object):
def __init__(self, name, id):
self.name = name

View File

@ -2,6 +2,6 @@ from moto.s3.models import S3Backend
class S3BucketPathBackend(S3Backend):
True
pass
s3bucket_path_backend = S3BucketPathBackend()

View File

@ -1,16 +1,57 @@
import re
import sys
import argparse
from threading import Lock
from flask import Flask
from werkzeug.routing import BaseConverter
from werkzeug.serving import run_simple
from moto.backends import BACKENDS
from moto.core.utils import convert_flask_to_httpretty_response
app = Flask(__name__)
HTTP_METHODS = ["GET", "POST", "PUT", "DELETE", "HEAD"]
class DomainDispatcherApplication(object):
"""
Dispatch requests to different applications based on the "Host:" header
value. We'll match the host header value with the url_bases of each backend.
"""
def __init__(self, create_app, service=None):
self.create_app = create_app
self.lock = Lock()
self.app_instances = {}
self.service = service
def get_backend_for_host(self, host):
if self.service:
return self.service
for backend_name, backend in BACKENDS.iteritems():
for url_base in backend.url_bases:
if re.match(url_base, 'http://%s' % host):
return backend_name
raise RuntimeError('Invalid host: "%s"' % host)
def get_application(self, host):
host = host.split(':')[0]
with self.lock:
backend = self.get_backend_for_host(host)
app = self.app_instances.get(backend, None)
if app is None:
app = self.create_app(backend)
self.app_instances[backend] = app
return app
def __call__(self, environ, start_response):
backend_app = self.get_application(environ['HTTP_HOST'])
return backend_app(environ, start_response)
class RegexConverter(BaseConverter):
# http://werkzeug.pocoo.org/docs/routing/#custom-converters
def __init__(self, url_map, *items):
@ -18,25 +59,34 @@ class RegexConverter(BaseConverter):
self.regex = items[0]
def configure_urls(service):
backend = BACKENDS[service]
def create_backend_app(service):
from werkzeug.routing import Map
# Create the backend_app
backend_app = Flask(__name__)
backend_app.debug = True
# Reset view functions to reset the app
app.view_functions = {}
app.url_map = Map()
app.url_map.converters['regex'] = RegexConverter
backend_app.view_functions = {}
backend_app.url_map = Map()
backend_app.url_map.converters['regex'] = RegexConverter
backend = BACKENDS[service]
for url_path, handler in backend.flask_paths.iteritems():
app.route(url_path, methods=HTTP_METHODS)(convert_flask_to_httpretty_response(handler))
backend_app.route(url_path, methods=HTTP_METHODS)(convert_flask_to_httpretty_response(handler))
return backend_app
def main(argv=sys.argv[1:]):
available_services = BACKENDS.keys()
parser = argparse.ArgumentParser()
# Keep this for backwards compat
parser.add_argument(
'service', type=str,
choices=available_services,
help='Choose which mechanism you want to run')
"service",
type=str,
nargs='?', # http://stackoverflow.com/a/4480202/731592
default=None)
parser.add_argument(
'-H', '--host', type=str,
help='Which host to bind',
@ -48,10 +98,11 @@ def main(argv=sys.argv[1:]):
args = parser.parse_args(argv)
configure_urls(args.service)
# Wrap the main application
main_app = DomainDispatcherApplication(create_backend_app, service=args.service)
main_app.debug = True
app.testing = True
app.run(host=args.host, port=args.port)
run_simple(args.host, args.port, main_app)
if __name__ == '__main__':
main()

View File

@ -100,7 +100,7 @@ class SQSBackend(BaseBackend):
def receive_messages(self, queue_name, count):
queue = self.get_queue(queue_name)
result = []
for index in range(count):
for _ in range(count):
if queue.messages:
result.append(queue.messages.pop(0))
return result

View File

@ -1,7 +1,7 @@
from .responses import QueueResponse, QueuesResponse
url_bases = [
"https?://(.*).amazonaws.com"
"https?://(.*?)(queue|sqs)(.*?).amazonaws.com"
]
url_paths = {

View File

@ -17,7 +17,7 @@ if sys.version_info < (2, 7):
setup(
name='moto',
version='0.2.11',
version='0.2.15',
description='A library that allows your python tests to easily'
' mock out the boto library',
author='Steve Pulec',
@ -28,6 +28,6 @@ setup(
'moto_server = moto.server:main',
],
},
packages=find_packages(),
packages=find_packages(exclude=("tests", "tests.*")),
install_requires=install_requires,
)

View File

@ -5,11 +5,12 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("autoscaling")
def test_describe_autoscaling_groups():
test_client = server.app.test_client()
backend = server.create_backend_app("autoscaling")
test_client = backend.test_client()
res = test_client.get('/?Action=DescribeLaunchConfigurations')
res.data.should.contain('<DescribeLaunchConfigurationsResponse')

View File

@ -0,0 +1,28 @@
import unittest
from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message
from boto.ec2 import EC2Connection
from moto import mock_sqs, mock_ec2
class TestNestedDecorators(unittest.TestCase):
@mock_sqs
def setup_sqs_queue(self):
conn = SQSConnection()
q = conn.create_queue('some-queue')
m = Message()
m.set_body('This is my first message.')
q.write(m)
self.assertEqual(q.count(), 1)
@mock_ec2
def test_nested(self):
self.setup_sqs_queue()
conn = EC2Connection()
conn.run_instances('ami-123456')

View File

@ -13,13 +13,17 @@ def test_wrong_arguments():
pass
@patch('moto.server.app.run')
def test_right_arguments(app_run):
@patch('moto.server.run_simple')
def test_right_arguments(run_simple):
main(["s3"])
app_run.assert_called_once_with(host='0.0.0.0', port=5000)
func_call = run_simple.call_args[0]
func_call[0].should.equal("0.0.0.0")
func_call[1].should.equal(5000)
@patch('moto.server.app.run')
def test_port_argument(app_run):
@patch('moto.server.run_simple')
def test_port_argument(run_simple):
main(["s3", "--port", "8080"])
app_run.assert_called_once_with(host='0.0.0.0', port=8080)
func_call = run_simple.call_args[0]
func_call[0].should.equal("0.0.0.0")
func_call[1].should.equal(8080)

View File

@ -5,11 +5,12 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("dynamodb")
def test_table_list():
test_client = server.app.test_client()
backend = server.create_backend_app("dynamodb")
test_client = backend.test_client()
res = test_client.get('/')
res.status_code.should.equal(404)

View File

@ -20,6 +20,42 @@ def test_create_and_describe_security_group():
all_groups.should.have.length_of(1)
all_groups[0].name.should.equal('test security group')
@mock_ec2
def test_create_and_describe_vpc_security_group():
conn = boto.connect_ec2('the_key', 'the_secret')
vpc_id = 'vpc-5300000c'
security_group = conn.create_security_group('test security group', 'this is a test security group', vpc_id=vpc_id)
security_group.vpc_id.should.equal(vpc_id)
security_group.name.should.equal('test security group')
security_group.description.should.equal('this is a test security group')
# Trying to create another group with the same name in the same VPC should throw an error
conn.create_security_group.when.called_with('test security group', 'this is a test security group', vpc_id).should.throw(EC2ResponseError)
all_groups = conn.get_all_security_groups()
all_groups[0].vpc_id.should.equal(vpc_id)
all_groups.should.have.length_of(1)
all_groups[0].name.should.equal('test security group')
@mock_ec2
def test_create_two_security_groups_with_same_name_in_different_vpc():
conn = boto.connect_ec2('the_key', 'the_secret')
vpc_id = 'vpc-5300000c'
vpc_id2 = 'vpc-5300000d'
sg1 = conn.create_security_group('test security group', 'this is a test security group', vpc_id)
sg2 = conn.create_security_group('test security group', 'this is a test security group', vpc_id2)
all_groups = conn.get_all_security_groups()
all_groups.should.have.length_of(2)
all_groups[0].name.should.equal('test security group')
all_groups[1].name.should.equal('test security group')
@mock_ec2
def test_deleting_security_groups():
@ -37,9 +73,18 @@ def test_deleting_security_groups():
conn.get_all_security_groups().should.have.length_of(1)
# Delete by group id
conn.delete_security_group(security_group1.id)
conn.delete_security_group(group_id=security_group1.id)
conn.get_all_security_groups().should.have.length_of(0)
@mock_ec2
def test_delete_security_group_in_vpc():
conn = boto.connect_ec2('the_key', 'the_secret')
vpc_id = "vpc-12345"
security_group1 = conn.create_security_group('test1', 'test1', vpc_id)
# this should not throw an exception
conn.delete_security_group(group_id=security_group1.id)
@mock_ec2
def test_authorize_ip_range_and_revoke():

View File

@ -6,11 +6,12 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("ec2")
def test_ec2_server_get():
test_client = server.app.test_client()
backend = server.create_backend_app("ec2")
test_client = backend.test_client()
res = test_client.get('/?Action=RunInstances&ImageId=ami-60a54009')
groups = re.search("<instanceId>(.*)</instanceId>", res.data)

View File

@ -5,11 +5,12 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("elb")
def test_elb_describe_instances():
test_client = server.app.test_client()
backend = server.create_backend_app("elb")
test_client = backend.test_client()
res = test_client.get('/?Action=DescribeLoadBalancers')
res.data.should.contain('DescribeLoadBalancersResponse')

View File

@ -5,11 +5,12 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("emr")
def test_describe_jobflows():
test_client = server.app.test_client()
backend = server.create_backend_app("emr")
test_client = backend.test_client()
res = test_client.get('/?Action=DescribeJobFlows')
res.data.should.contain('<DescribeJobFlowsResult>')

View File

@ -5,18 +5,21 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("s3")
def test_s3_server_get():
test_client = server.app.test_client()
backend = server.create_backend_app("s3")
test_client = backend.test_client()
res = test_client.get('/')
res.data.should.contain('ListAllMyBucketsResult')
def test_s3_server_bucket_create():
test_client = server.app.test_client()
backend = server.create_backend_app("s3")
test_client = backend.test_client()
res = test_client.put('/', 'http://foobar.localhost:5000/')
res.status_code.should.equal(200)
@ -36,7 +39,9 @@ def test_s3_server_bucket_create():
def test_s3_server_post_to_bucket():
test_client = server.app.test_client()
backend = server.create_backend_app("s3")
test_client = backend.test_client()
res = test_client.put('/', 'http://foobar.localhost:5000/')
res.status_code.should.equal(200)

View File

@ -5,18 +5,21 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("s3bucket_path")
def test_s3_server_get():
test_client = server.app.test_client()
backend = server.create_backend_app("s3bucket_path")
test_client = backend.test_client()
res = test_client.get('/')
res.data.should.contain('ListAllMyBucketsResult')
def test_s3_server_bucket_create():
test_client = server.app.test_client()
backend = server.create_backend_app("s3bucket_path")
test_client = backend.test_client()
res = test_client.put('/foobar', 'http://localhost:5000')
res.status_code.should.equal(200)
@ -36,7 +39,9 @@ def test_s3_server_bucket_create():
def test_s3_server_post_to_bucket():
test_client = server.app.test_client()
backend = server.create_backend_app("s3bucket_path")
test_client = backend.test_client()
res = test_client.put('/foobar', 'http://localhost:5000/')
res.status_code.should.equal(200)

View File

@ -5,10 +5,11 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("ses")
def test_ses_list_identities():
test_client = server.app.test_client()
backend = server.create_backend_app("ses")
test_client = backend.test_client()
res = test_client.get('/?Action=ListIdentities')
res.data.should.contain("ListIdentitiesResponse")

View File

@ -6,11 +6,12 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("sqs")
def test_sqs_list_identities():
test_client = server.app.test_client()
backend = server.create_backend_app("sqs")
test_client = backend.test_client()
res = test_client.get('/?Action=ListQueues')
res.data.should.contain("ListQueuesResponse")

View File

@ -5,11 +5,12 @@ import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("sts")
def test_sts_get_session_token():
test_client = server.app.test_client()
backend = server.create_backend_app("sts")
test_client = backend.test_client()
res = test_client.get('/?Action=GetSessionToken')
res.status_code.should.equal(200)
res.data.should.contain("SessionToken")