Fix conflict with subnet/network ACL association during subnet creation.

This commit is contained in:
Tyler Sanders 2014-11-17 09:37:38 -06:00
commit 2a6c4c89bf
54 changed files with 616 additions and 303 deletions

View File

@ -6,7 +6,6 @@ env:
matrix:
- BOTO_VERSION=2.34.0
- BOTO_VERSION=2.25.0
- BOTO_VERSION=2.7
matrix:
include:
- python: "3.3"

View File

@ -30,3 +30,4 @@ Moto is written by Steve Pulec with contributions from:
* [Omer Katz](https://github.com/thedrow)
* [Joseph Lawson](https://github.com/joekiller)
* [Trey Tacon](https://github.com/ttacon)
* [Peter](https://github.com/pvbouwel)

View File

@ -2,19 +2,19 @@ from __future__ import unicode_literals
import logging
logging.getLogger('boto').setLevel(logging.CRITICAL)
from .autoscaling import mock_autoscaling
from .cloudformation import mock_cloudformation
from .cloudwatch import mock_cloudwatch
from .dynamodb import mock_dynamodb
from .dynamodb2 import mock_dynamodb2
from .ec2 import mock_ec2
from .elb import mock_elb
from .emr import mock_emr
from .iam import mock_iam
from .s3 import mock_s3
from .s3bucket_path import mock_s3bucket_path
from .ses import mock_ses
from .sns import mock_sns
from .sqs import mock_sqs
from .sts import mock_sts
from .route53 import mock_route53
from .autoscaling import mock_autoscaling # flake8: noqa
from .cloudformation import mock_cloudformation # flake8: noqa
from .cloudwatch import mock_cloudwatch # flake8: noqa
from .dynamodb import mock_dynamodb # flake8: noqa
from .dynamodb2 import mock_dynamodb2 # flake8: noqa
from .ec2 import mock_ec2 # flake8: noqa
from .elb import mock_elb # flake8: noqa
from .emr import mock_emr # flake8: noqa
from .iam import mock_iam # flake8: noqa
from .s3 import mock_s3 # flake8: noqa
from .s3bucket_path import mock_s3bucket_path # flake8: noqa
from .ses import mock_ses # flake8: noqa
from .sns import mock_sns # flake8: noqa
from .sqs import mock_sqs # flake8: noqa
from .sts import mock_sts # flake8: noqa
from .route53 import mock_route53 # flake8: noqa

View File

@ -1,7 +1,9 @@
from __future__ import unicode_literals
from .models import autoscaling_backend, autoscaling_backends
from .models import autoscaling_backends
from ..core.models import MockAWS
autoscaling_backend = autoscaling_backends['us-east-1']
def mock_autoscaling(func=None):
if func:

View File

@ -47,12 +47,13 @@ class FakeLaunchConfiguration(object):
self.block_device_mapping_dict = block_device_mapping_dict
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
instance_profile_name = properties.get("IamInstanceProfile")
config = default_autoscaling_backend.create_launch_configuration(
backend = autoscaling_backends[region_name]
config = backend.create_launch_configuration(
name=resource_name,
image_id=properties.get("ImageId"),
key_name=properties.get("KeyName"),
@ -128,13 +129,14 @@ class FakeAutoScalingGroup(object):
self.set_desired_capacity(desired_capacity)
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
launch_config_name = properties.get("LaunchConfigurationName")
load_balancer_names = properties.get("LoadBalancerNames", [])
group = default_autoscaling_backend.create_autoscaling_group(
backend = autoscaling_backends[region_name]
group = backend.create_autoscaling_group(
name=resource_name,
availability_zones=properties.get("AvailabilityZones", []),
desired_capacity=properties.get("DesiredCapacity"),
@ -357,7 +359,3 @@ class AutoScalingBackend(BaseBackend):
autoscaling_backends = {}
for region, ec2_backend in ec2_backends.items():
autoscaling_backends[region] = AutoScalingBackend(ec2_backend)
autoscaling_backend = autoscaling_backends['us-east-1']
default_autoscaling_backend = autoscaling_backend

View File

@ -1,3 +1,12 @@
from __future__ import unicode_literals
from .models import cloudformation_backend
mock_cloudformation = cloudformation_backend.decorator
from .models import cloudformation_backends
from ..core.models import MockAWS
cloudformation_backend = cloudformation_backends['us-east-1']
def mock_cloudformation(func=None):
if func:
return MockAWS(cloudformation_backends)(func)
else:
return MockAWS(cloudformation_backends)

View File

@ -3,7 +3,6 @@ from boto.exception import BotoServerError
from jinja2 import Template
class UnformattedGetAttTemplateException(Exception):
description = 'Template error: resource {0} does not support attribute type {1} in Fn::GetAtt'
status_code = 400

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import json
import boto.cloudformation
from moto.core import BaseBackend
from .parsing import ResourceMap, OutputMap
@ -9,9 +10,10 @@ from .exceptions import ValidationError
class FakeStack(object):
def __init__(self, stack_id, name, template, notification_arns=None):
def __init__(self, stack_id, name, template, region_name, notification_arns=None):
self.stack_id = stack_id
self.name = name
self.region_name = region_name
self.notification_arns = notification_arns if notification_arns else []
self.template = template
self.status = 'CREATE_COMPLETE'
@ -19,7 +21,7 @@ class FakeStack(object):
template_dict = json.loads(self.template)
self.description = template_dict.get('Description')
self.resource_map = ResourceMap(stack_id, name, template_dict)
self.resource_map = ResourceMap(stack_id, name, region_name, template_dict)
self.resource_map.create()
self.output_map = OutputMap(self.resource_map, template_dict)
@ -40,9 +42,15 @@ class CloudFormationBackend(BaseBackend):
self.stacks = {}
self.deleted_stacks = {}
def create_stack(self, name, template, notification_arns=None):
def create_stack(self, name, template, region_name, notification_arns=None):
stack_id = generate_stack_id(name)
new_stack = FakeStack(stack_id=stack_id, name=name, template=template, notification_arns=notification_arns)
new_stack = FakeStack(
stack_id=stack_id,
name=name,
template=template,
region_name=region_name,
notification_arns=notification_arns,
)
self.stacks[stack_id] = new_stack
return new_stack
@ -90,4 +98,6 @@ class CloudFormationBackend(BaseBackend):
self.delete_stack(stack_to_delete.stack_id)
cloudformation_backend = CloudFormationBackend()
cloudformation_backends = {}
for region in boto.cloudformation.regions():
cloudformation_backends[region.name] = CloudFormationBackend()

View File

@ -120,7 +120,7 @@ def resource_name_property_from_type(resource_type):
return NAME_TYPE_MAP.get(resource_type)
def parse_resource(logical_id, resource_json, resources_map):
def parse_resource(logical_id, resource_json, resources_map, region_name):
resource_type = resource_json['Type']
resource_class = resource_class_from_type(resource_type)
if not resource_class:
@ -129,9 +129,9 @@ def parse_resource(logical_id, resource_json, resources_map):
resource_json = clean_json(resource_json, resources_map)
resource_name_property = resource_name_property_from_type(resource_type)
if resource_name_property:
if not 'Properties' in resource_json:
if 'Properties' not in resource_json:
resource_json['Properties'] = dict()
if not resource_name_property in resource_json['Properties']:
if resource_name_property not in resource_json['Properties']:
resource_json['Properties'][resource_name_property] = '{0}-{1}-{2}'.format(
resources_map.get('AWS::StackName'),
logical_id,
@ -142,7 +142,7 @@ def parse_resource(logical_id, resource_json, resources_map):
logical_id,
random_suffix())
resource = resource_class.create_from_cloudformation_json(resource_name, resource_json)
resource = resource_class.create_from_cloudformation_json(resource_name, resource_json, region_name)
resource.type = resource_type
resource.logical_resource_id = logical_id
return resource
@ -164,9 +164,10 @@ class ResourceMap(collections.Mapping):
each resources is passed this lazy map that it can grab dependencies from.
"""
def __init__(self, stack_id, stack_name, template):
def __init__(self, stack_id, stack_name, region_name, template):
self._template = template
self._resource_json_map = template['Resources']
self._region_name = region_name
# Create the default resources
self._parsed_resources = {
@ -183,7 +184,7 @@ class ResourceMap(collections.Mapping):
return self._parsed_resources[resource_logical_id]
else:
resource_json = self._resource_json_map.get(resource_logical_id)
new_resource = parse_resource(resource_logical_id, resource_json, self)
new_resource = parse_resource(resource_logical_id, resource_json, self, self._region_name)
self._parsed_resources[resource_logical_id] = new_resource
return new_resource
@ -214,7 +215,7 @@ class ResourceMap(collections.Mapping):
self[resource]
if isinstance(self[resource], ec2_models.TaggedEC2Resource):
tags['aws:cloudformation:logical-id'] = resource
ec2_models.ec2_backend.create_tags([self[resource].physical_resource_id],tags)
ec2_models.ec2_backends[self._region_name].create_tags([self[resource].physical_resource_id], tags)
class OutputMap(collections.Mapping):

View File

@ -4,19 +4,24 @@ import json
from jinja2 import Template
from moto.core.responses import BaseResponse
from .models import cloudformation_backend
from .models import cloudformation_backends
class CloudFormationResponse(BaseResponse):
@property
def cloudformation_backend(self):
return cloudformation_backends[self.region]
def create_stack(self):
stack_name = self._get_param('StackName')
stack_body = self._get_param('TemplateBody')
stack_notification_arns = self._get_multi_param('NotificationARNs.member')
stack = cloudformation_backend.create_stack(
stack = self.cloudformation_backend.create_stack(
name=stack_name,
template=stack_body,
region_name=self.region,
notification_arns=stack_notification_arns
)
stack_body = {
@ -32,34 +37,34 @@ class CloudFormationResponse(BaseResponse):
stack_name_or_id = None
if self._get_param('StackName'):
stack_name_or_id = self.querystring.get('StackName')[0]
stacks = cloudformation_backend.describe_stacks(stack_name_or_id)
stacks = self.cloudformation_backend.describe_stacks(stack_name_or_id)
template = Template(DESCRIBE_STACKS_TEMPLATE)
return template.render(stacks=stacks)
def describe_stack_resources(self):
stack_name = self._get_param('StackName')
stack = cloudformation_backend.get_stack(stack_name)
stack = self.cloudformation_backend.get_stack(stack_name)
template = Template(LIST_STACKS_RESOURCES_RESPONSE)
return template.render(stack=stack)
def list_stacks(self):
stacks = cloudformation_backend.list_stacks()
stacks = self.cloudformation_backend.list_stacks()
template = Template(LIST_STACKS_RESPONSE)
return template.render(stacks=stacks)
def get_template(self):
name_or_stack_id = self.querystring.get('StackName')[0]
stack = cloudformation_backend.get_stack(name_or_stack_id)
stack = self.cloudformation_backend.get_stack(name_or_stack_id)
return stack.template
# def update_stack(self):
# stack_name = self._get_param('StackName')
# stack_body = self._get_param('TemplateBody')
# stack = cloudformation_backend.update_stack(
# stack = self.cloudformation_backend.update_stack(
# name=stack_name,
# template=stack_body,
# )
@ -75,7 +80,7 @@ class CloudFormationResponse(BaseResponse):
def delete_stack(self):
name_or_stack_id = self.querystring.get('StackName')[0]
cloudformation_backend.delete_stack(name_or_stack_id)
self.cloudformation_backend.delete_stack(name_or_stack_id)
return json.dumps({
'DeleteStackResponse': {
'DeleteStackResult': {},

View File

@ -1,2 +1,2 @@
from __future__ import unicode_literals
from .models import BaseBackend
from .models import BaseBackend # flake8: noqa

View File

@ -1,5 +1,7 @@
from __future__ import unicode_literals
import functools
import inspect
import re
from httpretty import HTTPretty
@ -17,6 +19,8 @@ class MockAWS(object):
HTTPretty.reset()
def __call__(self, func):
if inspect.isclass(func):
return self.decorate_class(func)
return self.decorate_callable(func)
def __enter__(self):
@ -67,6 +71,26 @@ class MockAWS(object):
wrapper.__wrapped__ = func
return wrapper
def decorate_class(self, klass):
for attr in dir(klass):
if attr.startswith("_"):
continue
attr_value = getattr(klass, attr)
if not hasattr(attr_value, "__call__"):
continue
# Check if this is a classmethod. If so, skip patching
if inspect.ismethod(attr_value) and attr_value.__self__ is klass:
continue
try:
setattr(klass, attr, self(attr_value))
except TypeError:
# Sometimes we can't set this for built-in types
continue
return klass
class Model(type):
def __new__(self, clsname, bases, namespace):

View File

@ -43,7 +43,8 @@ def _decode_dict(d):
class BaseResponse(object):
region = 'us-east-1'
default_region = 'us-east-1'
region_regex = r'\.(.+?)\.amazonaws\.com'
def dispatch(self, request, full_url, headers):
querystring = {}
@ -76,9 +77,11 @@ class BaseResponse(object):
self.path = urlparse(full_url).path
self.querystring = querystring
self.method = request.method
region = re.search(r'\.(.+?)\.amazonaws\.com', full_url)
region = re.search(self.region_regex, full_url)
if region:
self.region = region.group(1)
else:
self.region = self.default_region
self.headers = dict(request.headers)
self.response_headers = headers

View File

@ -69,6 +69,7 @@ class DynamoType(object):
comparison_func = get_comparison_func(range_comparison)
return comparison_func(self.value, *range_values)
class Item(object):
def __init__(self, hash_key, hash_key_type, range_key, range_key_type, attrs):
self.hash_key = hash_key
@ -104,6 +105,7 @@ class Item(object):
"Item": included
}
class Table(object):
def __init__(self, table_name, schema=None, attr=None, throughput=None, indexes=None):
@ -122,7 +124,7 @@ class Table(object):
self.range_key_attr = elem["AttributeName"]
self.range_key_type = elem["KeyType"]
if throughput is None:
self.throughput = {u'WriteCapacityUnits': 10, u'ReadCapacityUnits': 10}
self.throughput = {'WriteCapacityUnits': 10, 'ReadCapacityUnits': 10}
else:
self.throughput = throughput
self.throughput["NumberOfDecreasesToday"] = 0
@ -141,7 +143,7 @@ class Table(object):
'TableStatus': 'ACTIVE',
'KeySchema': self.schema,
'ItemCount': len(self),
'CreationDateTime': unix_time(self.created_at)
'CreationDateTime': unix_time(self.created_at),
}
}
return results
@ -285,7 +287,7 @@ class DynamoDBBackend(BaseBackend):
return table.hash_key_attr, table.range_key_attr
def get_keys_value(self, table, keys):
if not table.hash_key_attr in keys or (table.has_range_key and not table.range_key_attr in keys):
if table.hash_key_attr not in keys or (table.has_range_key and table.range_key_attr not in keys):
raise ValueError("Table has a range key, but no range key was passed into get_item")
hash_key = DynamoType(keys[table.hash_key_attr])
range_key = DynamoType(keys[table.range_key_attr]) if table.has_range_key else None

View File

@ -169,6 +169,7 @@ class DynamoHandler(BaseResponse):
}
return dynamo_json_dump(response)
def get_item(self):
name = self.body['TableName']
key = self.body['Key']
@ -198,7 +199,6 @@ class DynamoHandler(BaseResponse):
}
for table_name, table_request in table_batches.items():
items = []
keys = table_request['Keys']
attributes_to_get = table_request.get('AttributesToGet')
results["Responses"][table_name] = []
@ -226,7 +226,7 @@ class DynamoHandler(BaseResponse):
range_comparison = None
range_values = []
else:
if range_key_name == None:
if range_key_name is None:
er = "com.amazon.coral.validate#ValidationException"
return self.error(er)
else:
@ -247,7 +247,7 @@ class DynamoHandler(BaseResponse):
items = items[:limit]
reversed = self.body.get("ScanIndexForward")
if reversed != False:
if reversed is not False:
items.reverse()
result = {

View File

@ -1,7 +1,10 @@
from __future__ import unicode_literals
from .models import ec2_backends, ec2_backend
from .models import ec2_backends
from ..core.models import MockAWS
ec2_backend = ec2_backends['us-east-1']
def mock_ec2(func=None):
if func:
return MockAWS(ec2_backends)(func)

View File

@ -1,16 +1,17 @@
from __future__ import unicode_literals
import copy
import itertools
from collections import defaultdict
import copy
from datetime import datetime
import itertools
import re
import six
import boto
from boto.ec2.instance import Instance as BotoInstance, Reservation
from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType
from boto.ec2.spotinstancerequest import SpotInstanceRequest as BotoSpotRequest
from boto.ec2.launchspecification import LaunchSpecification
import six
from moto.core import BaseBackend
from moto.core.models import Model
@ -103,6 +104,7 @@ class InstanceState(object):
self.name = name
self.code = code
class StateReason(object):
def __init__(self, message="", code=""):
self.message = message
@ -165,12 +167,13 @@ class NetworkInterface(object):
self._group_set.append(group)
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
security_group_ids = properties.get('SecurityGroups', [])
subnet_id = properties['SubnetId']
ec2_backend = ec2_backends[region_name]
subnet = ec2_backend.get_subnet(subnet_id)
private_ip_address = properties.get('PrivateIpAddress', None)
@ -247,7 +250,6 @@ class NetworkInterfaceBackend(object):
original_enis = enis
enis = []
for eni in original_enis:
group_ids = []
for group in eni.group_set:
if group.id in _filter_value:
enis.append(eni)
@ -320,14 +322,15 @@ class Instance(BotoInstance, TaggedEC2Resource):
self.vpc_id = subnet.vpc_id
self.prep_nics(kwargs.get("nics", {}),
subnet_id=kwargs.get("subnet_id",None),
private_ip=kwargs.get("private_ip",None),
associate_public_ip=kwargs.get("associate_public_ip",None))
subnet_id=kwargs.get("subnet_id"),
private_ip=kwargs.get("private_ip"),
associate_public_ip=kwargs.get("associate_public_ip"))
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
ec2_backend = ec2_backends[region_name]
security_group_ids = properties.get('SecurityGroups', [])
group_names = [ec2_backend.get_security_group_from_id(group_id).name for group_id in security_group_ids]
@ -385,10 +388,6 @@ class Instance(BotoInstance, TaggedEC2Resource):
self._reason = ""
self._state_reason = StateReason()
def get_tags(self):
tags = self.ec2_backend.describe_tags(filters={'resource-id': [self.id]})
return tags
@property
def dynamic_group_list(self):
if self.nics:
@ -416,12 +415,9 @@ class Instance(BotoInstance, TaggedEC2Resource):
# Flesh out data structures and associations
for nic in nic_spec.values():
use_eni = None
security_group_ids = []
device_index = int(nic.get('DeviceIndex'))
nic_id = nic.get('NetworkInterfaceId', None)
nic_id = nic.get('NetworkInterfaceId')
if nic_id:
# If existing NIC found, use it.
use_nic = self.ec2_backend.get_network_interface(nic_id)
@ -435,11 +431,11 @@ class Instance(BotoInstance, TaggedEC2Resource):
subnet = self.ec2_backend.get_subnet(nic['SubnetId'])
group_id = nic.get('SecurityGroupId',None)
group_id = nic.get('SecurityGroupId')
group_ids = [group_id] if group_id else []
use_nic = self.ec2_backend.create_network_interface(subnet,
nic.get('PrivateIpAddress',None),
nic.get('PrivateIpAddress'),
device_index=device_index,
public_ip_auto_assign=nic.get('AssociatePublicIpAddress', False),
group_ids=group_ids)
@ -717,7 +713,7 @@ class TagBackend(object):
resource_id_filters = []
resource_type_filters = []
value_filters = []
if not filters is None:
if filters is not None:
for tag_filter in filters:
if tag_filter in self.VALID_TAG_FILTERS:
if tag_filter == 'key':
@ -862,7 +858,6 @@ class AmiBackend(object):
def describe_images(self, ami_ids=(), filters=None):
if filters:
images = self.amis.values()
return generic_filter(filters, images)
else:
images = []
@ -984,9 +979,10 @@ class SecurityGroup(object):
self.vpc_id = vpc_id
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
ec2_backend = ec2_backends[region_name]
vpc_id = properties.get('VpcId')
security_group = ec2_backend.create_security_group(
name=resource_name,
@ -1014,10 +1010,7 @@ class SecurityGroup(object):
def physical_resource_id(self):
return self.id
def matches_filter(self, key, filter_value):
result = True
def to_attr(filter_name):
attr = None
@ -1037,7 +1030,7 @@ class SecurityGroup(object):
ingress_attr = to_attr(match.groups()[0])
for ingress in self.ingress_rules:
if getattr(ingress, ingress_attr) in filters[key]:
if getattr(ingress, ingress_attr) in filter_value:
return True
else:
attr_name = to_attr(key)
@ -1215,12 +1208,13 @@ class VolumeAttachment(object):
self.device = device
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
instance_id = properties['InstanceId']
volume_id = properties['VolumeId']
ec2_backend = ec2_backends[region_name]
attachment = ec2_backend.attach_volume(
volume_id=volume_id,
instance_id=instance_id,
@ -1229,17 +1223,19 @@ class VolumeAttachment(object):
return attachment
class Volume(object):
def __init__(self, volume_id, size, zone):
class Volume(TaggedEC2Resource):
def __init__(self, ec2_backend, volume_id, size, zone):
self.id = volume_id
self.size = size
self.zone = zone
self.attachment = None
self.ec2_backend = ec2_backend
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
ec2_backend = ec2_backends[region_name]
volume = ec2_backend.create_volume(
size=properties.get('Size'),
zone_name=properties.get('AvailabilityZone'),
@ -1258,12 +1254,13 @@ class Volume(object):
return 'available'
class Snapshot(object):
def __init__(self, snapshot_id, volume, description):
class Snapshot(TaggedEC2Resource):
def __init__(self, ec2_backend, snapshot_id, volume, description):
self.id = snapshot_id
self.volume = volume
self.description = description
self.create_volume_permission_groups = set()
self.ec2_backend = ec2_backend
class EBSBackend(object):
@ -1276,7 +1273,7 @@ class EBSBackend(object):
def create_volume(self, size, zone_name):
volume_id = random_volume_id()
zone = self.get_zone_by_name(zone_name)
volume = Volume(volume_id, size, zone)
volume = Volume(self, volume_id, size, zone)
self.volumes[volume_id] = volume
return volume
@ -1306,7 +1303,7 @@ class EBSBackend(object):
def detach_volume(self, volume_id, instance_id, device_path):
volume = self.get_volume(volume_id)
instance = self.get_instance(instance_id)
self.get_instance(instance_id)
old_attachment = volume.attachment
if not old_attachment:
@ -1318,7 +1315,7 @@ class EBSBackend(object):
def create_snapshot(self, volume_id, description):
snapshot_id = random_snapshot_id()
volume = self.get_volume(volume_id)
snapshot = Snapshot(snapshot_id, volume, description)
snapshot = Snapshot(self, snapshot_id, volume, description)
self.snapshots[snapshot_id] = snapshot
return snapshot
@ -1370,9 +1367,10 @@ class VPC(TaggedEC2Resource):
self.state = 'available'
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
ec2_backend = ec2_backends[region_name]
vpc = ec2_backend.create_vpc(
cidr_block=properties['CidrBlock'],
)
@ -1410,7 +1408,7 @@ class VPCBackend(object):
self.vpcs[vpc_id] = vpc
# AWS creates a default main route table and security group.
main_route_table = self.create_route_table(vpc_id, main=True)
self.create_route_table(vpc_id, main=True)
# AWS creates a default Network ACL
default_network_acl = self.create_network_acl(vpc_id, default=True)
@ -1492,11 +1490,12 @@ class VPCPeeringConnection(TaggedEC2Resource):
self._status = VPCPeeringConnectionStatus()
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
vpc = get_vpc(properties['VpcId'])
peer_vpc = get_vpc(properties['PeerVpcId'])
ec2_backend = ec2_backends[region_name]
vpc = ec2_backend.get_vpc(properties['VpcId'])
peer_vpc = ec2_backend.get_vpc(properties['PeerVpcId'])
vpc_pcx = ec2_backend.create_vpc_peering_connection(vpc, peer_vpc)
@ -1556,10 +1555,11 @@ class Subnet(TaggedEC2Resource):
self.cidr_block = cidr_block
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
vpc_id = properties['VpcId']
ec2_backend = ec2_backends[region_name]
subnet = ec2_backend.create_subnet(
vpc_id=vpc_id,
cidr_block=properties['CidrBlock']
@ -1606,11 +1606,10 @@ class SubnetBackend(object):
def create_subnet(self, vpc_id, cidr_block):
subnet_id = random_subnet_id()
subnet = Subnet(self, subnet_id, vpc_id, cidr_block)
vpc = self.get_vpc(vpc_id) # Validate VPC exists
self.get_vpc(vpc_id) # Validate VPC exists
# AWS associates a new subnet with the default Network ACL
self.associate_default_network_acl_with_subnet(subnet_id)
self.subnets[subnet_id] = subnet
return subnet
@ -1632,12 +1631,13 @@ class SubnetRouteTableAssociation(object):
self.subnet_id = subnet_id
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
route_table_id = properties['RouteTableId']
subnet_id = properties['SubnetId']
ec2_backend = ec2_backends[region_name]
subnet_association = ec2_backend.create_subnet_association(
route_table_id=route_table_id,
subnet_id=subnet_id,
@ -1666,10 +1666,11 @@ class RouteTable(TaggedEC2Resource):
self.routes = {}
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
vpc_id = properties['VpcId']
ec2_backend = ec2_backends[region_name]
route_table = ec2_backend.create_route_table(
vpc_id=vpc_id,
)
@ -1757,7 +1758,7 @@ class RouteTableBackend(object):
# Association does not yet exist, so create it.
route_table = self.get_route_table(route_table_id)
subnet = self.get_subnet(subnet_id) # Validate subnet exists
self.get_subnet(subnet_id) # Validate subnet exists
association_id = random_subnet_association_id()
route_table.associations[association_id] = subnet_id
return association_id
@ -1798,7 +1799,7 @@ class Route(object):
self.vpc_pcx = vpc_pcx
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
gateway_id = properties.get('GatewayId')
@ -1807,6 +1808,7 @@ class Route(object):
pcx_id = properties.get('VpcPeeringConnectionId')
route_table_id = properties['RouteTableId']
ec2_backend = ec2_backends[region_name]
route_table = ec2_backend.create_route(
route_table_id=route_table_id,
destination_cidr_block=properties['DestinationCidrBlock'],
@ -1877,7 +1879,8 @@ class InternetGateway(TaggedEC2Resource):
self.vpc = None
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
ec2_backend = ec2_backends[region_name]
return ec2_backend.create_internet_gateway()
@property
@ -1952,9 +1955,10 @@ class VPCGatewayAttachment(object):
self.vpc_id = vpc_id
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
ec2_backend = ec2_backends[region_name]
return ec2_backend.create_vpc_gateway_attachment(
gateway_id=properties['InternetGatewayId'],
vpc_id=properties['VpcId'],
@ -2045,8 +2049,7 @@ class SpotRequestBackend(object):
spot_request_id, price, image_id, type, valid_from, valid_until,
launch_group, availability_zone_group, key_name, security_groups,
user_data, instance_type, placement, kernel_id, ramdisk_id,
monitoring_enabled, subnet_id
)
monitoring_enabled, subnet_id)
self.spot_instance_requests[spot_request_id] = request
requests.append(request)
return requests
@ -2074,7 +2077,9 @@ class ElasticAddress(object):
self.association_id = None
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
ec2_backend = ec2_backends[region_name]
properties = cloudformation_json.get('Properties')
instance_id = None
if properties:
@ -2474,5 +2479,3 @@ class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
ec2_backends = {}
for region in boto.ec2.regions():
ec2_backends[region.name] = EC2Backend()
ec2_backend = ec2_backends['us-east-1']

View File

@ -35,12 +35,12 @@ class ElasticBlockStore(BaseResponse):
def delete_snapshot(self):
snapshot_id = self.querystring.get('SnapshotId')[0]
success = self.ec2_backend.delete_snapshot(snapshot_id)
self.ec2_backend.delete_snapshot(snapshot_id)
return DELETE_SNAPSHOT_RESPONSE
def delete_volume(self):
volume_id = self.querystring.get('VolumeId')[0]
success = self.ec2_backend.delete_volume(volume_id)
self.ec2_backend.delete_volume(volume_id)
return DELETE_VOLUME_RESPONSE
def describe_snapshots(self):
@ -132,6 +132,16 @@ DESCRIBE_VOLUMES_RESPONSE = """<DescribeVolumesResponse xmlns="http://ec2.amazon
</item>
{% endif %}
</attachmentSet>
<tagSet>
{% for tag in volume.get_tags() %}
<item>
<resourceId>{{ tag.resource_id }}</resourceId>
<resourceType>{{ tag.resource_type }}</resourceType>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
<volumeType>standard</volumeType>
</item>
{% endfor %}
@ -187,6 +197,14 @@ DESCRIBE_SNAPSHOTS_RESPONSE = """<DescribeSnapshotsResponse xmlns="http://ec2.am
<volumeSize>{{ snapshot.volume.size }}</volumeSize>
<description>{{ snapshot.description }}</description>
<tagSet>
{% for tag in snapshot.get_tags() %}
<item>
<resourceId>{{ tag.resource_id }}</resourceId>
<resourceType>{{ tag.resource_type }}</resourceType>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
</item>
{% endfor %}

View File

@ -59,9 +59,9 @@ class ElasticIPAddresses(BaseResponse):
def disassociate_address(self):
if "PublicIp" in self.querystring:
disassociated = self.ec2_backend.disassociate_address(address=self.querystring['PublicIp'][0])
self.ec2_backend.disassociate_address(address=self.querystring['PublicIp'][0])
elif "AssociationId" in self.querystring:
disassociated = self.ec2_backend.disassociate_address(association_id=self.querystring['AssociationId'][0])
self.ec2_backend.disassociate_address(association_id=self.querystring['AssociationId'][0])
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AssociationId parameter.")
@ -69,9 +69,9 @@ class ElasticIPAddresses(BaseResponse):
def release_address(self):
if "PublicIp" in self.querystring:
released = self.ec2_backend.release_address(address=self.querystring['PublicIp'][0])
self.ec2_backend.release_address(address=self.querystring['PublicIp'][0])
elif "AllocationId" in self.querystring:
released = self.ec2_backend.release_address(allocation_id=self.querystring['AllocationId'][0])
self.ec2_backend.release_address(allocation_id=self.querystring['AllocationId'][0])
else:
self.ec2_backend.raise_error("MissingParameter", "Invalid request, expect PublicIp/AllocationId parameter.")

View File

@ -112,4 +112,3 @@ DETACH_INTERNET_GATEWAY_RESPONSE = u"""<DetachInternetGatewayResponse xmlns="htt
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</DetachInternetGatewayResponse>"""

View File

@ -22,7 +22,7 @@ class RouteTables(BaseResponse):
interface_id = optional_from_querystring('NetworkInterfaceId', self.querystring)
pcx_id = optional_from_querystring('VpcPeeringConnectionId', self.querystring)
route = self.ec2_backend.create_route(route_table_id, destination_cidr_block,
self.ec2_backend.create_route(route_table_id, destination_cidr_block,
gateway_id=internet_gateway_id,
instance_id=instance_id,
interface_id=interface_id,
@ -72,7 +72,7 @@ class RouteTables(BaseResponse):
interface_id = optional_from_querystring('NetworkInterfaceId', self.querystring)
pcx_id = optional_from_querystring('VpcPeeringConnectionId', self.querystring)
route = self.ec2_backend.replace_route(route_table_id, destination_cidr_block,
self.ec2_backend.replace_route(route_table_id, destination_cidr_block,
gateway_id=internet_gateway_id,
instance_id=instance_id,
interface_id=interface_id,

View File

@ -31,7 +31,7 @@ class VPCPeeringConnections(BaseResponse):
def reject_vpc_peering_connection(self):
vpc_pcx_id = self.querystring.get('VpcPeeringConnectionId')[0]
vpc_pcx = self.ec2_backend.reject_vpc_peering_connection(vpc_pcx_id)
self.ec2_backend.reject_vpc_peering_connection(vpc_pcx_id)
template = Template(REJECT_VPC_PEERING_CONNECTION_RESPONSE)
return template.render()
@ -125,4 +125,3 @@ REJECT_VPC_PEERING_CONNECTION_RESPONSE = """
<return>true</return>
</RejectVpcPeeringConnectionResponse>
"""

View File

@ -139,7 +139,7 @@ def generate_route_id(route_table_id, cidr_block):
def split_route_id(route_id):
values = string.split(route_id, '~')
values = route_id.split('~')
return values[0], values[1]
@ -442,7 +442,7 @@ def get_prefix(resource_id):
if resource_id_prefix == EC2_RESOURCE_TO_PREFIX['network-interface']:
if after.startswith('attach'):
resource_id_prefix = EC2_RESOURCE_TO_PREFIX['network-interface-attachment']
if not resource_id_prefix in EC2_RESOURCE_TO_PREFIX.values():
if resource_id_prefix not in EC2_RESOURCE_TO_PREFIX.values():
uuid4hex = re.compile('[0-9a-f]{12}4[0-9a-f]{3}[89ab][0-9a-f]{15}\Z', re.I)
if uuid4hex.match(resource_id) is not None:
resource_id_prefix = EC2_RESOURCE_TO_PREFIX['reserved-instance']
@ -454,7 +454,7 @@ def get_prefix(resource_id):
def is_valid_resource_id(resource_id):
valid_prefixes = EC2_RESOURCE_TO_PREFIX.values()
resource_id_prefix = get_prefix(resource_id)
if not resource_id_prefix in valid_prefixes:
if resource_id_prefix not in valid_prefixes:
return False
resource_id_pattern = resource_id_prefix + '-[0-9a-f]{8}'
resource_pattern_re = re.compile(resource_id_pattern)

View File

@ -1,3 +1,12 @@
from __future__ import unicode_literals
from .models import elb_backend
mock_elb = elb_backend.decorator
from .models import elb_backends
from ..core.models import MockAWS
elb_backend = elb_backends['us-east-1']
def mock_elb(func=None):
if func:
return MockAWS(elb_backends)(func)
else:
return MockAWS(elb_backends)

View File

@ -1,4 +1,6 @@
from __future__ import unicode_literals
import boto.ec2.elb
from moto.core import BaseBackend
@ -38,9 +40,10 @@ class FakeLoadBalancer(object):
self.listeners.append(listener)
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
elb_backend = elb_backends[region_name]
new_elb = elb_backend.create_load_balancer(
name=properties.get('LoadBalancerName', resource_name),
zones=properties.get('AvailabilityZones'),
@ -148,4 +151,7 @@ class ELBBackend(BaseBackend):
load_balancer.instance_ids = new_instance_ids
return load_balancer
elb_backend = ELBBackend()
elb_backends = {}
for region in boto.ec2.elb.regions():
elb_backends[region.name] = ELBBackend()

View File

@ -2,11 +2,15 @@ from __future__ import unicode_literals
from jinja2 import Template
from moto.core.responses import BaseResponse
from .models import elb_backend
from .models import elb_backends
class ELBResponse(BaseResponse):
@property
def elb_backend(self):
return elb_backends[self.region]
def create_load_balancer(self):
"""
u'Scheme': [u'internet-facing'],
@ -26,7 +30,7 @@ class ELBResponse(BaseResponse):
ports.append([protocol, lb_port, instance_port, ssl_certificate_id])
port_index += 1
elb_backend.create_load_balancer(
self.elb_backend.create_load_balancer(
name=load_balancer_name,
zones=availability_zones,
ports=ports,
@ -49,14 +53,14 @@ class ELBResponse(BaseResponse):
ports.append([protocol, lb_port, instance_port, ssl_certificate_id])
port_index += 1
elb_backend.create_load_balancer_listeners(name=load_balancer_name, ports=ports)
self.elb_backend.create_load_balancer_listeners(name=load_balancer_name, ports=ports)
template = Template(CREATE_LOAD_BALANCER_LISTENERS_TEMPLATE)
return template.render()
def describe_load_balancers(self):
names = [value[0] for key, value in self.querystring.items() if "LoadBalancerNames.member" in key]
load_balancers = elb_backend.describe_load_balancers(names)
load_balancers = self.elb_backend.describe_load_balancers(names)
template = Template(DESCRIBE_LOAD_BALANCERS_TEMPLATE)
return template.render(load_balancers=load_balancers)
@ -73,18 +77,18 @@ class ELBResponse(BaseResponse):
port_index += 1
ports.append(int(port))
elb_backend.delete_load_balancer_listeners(load_balancer_name, ports)
self.elb_backend.delete_load_balancer_listeners(load_balancer_name, ports)
template = Template(DELETE_LOAD_BALANCER_LISTENERS)
return template.render()
def delete_load_balancer(self):
load_balancer_name = self.querystring.get('LoadBalancerName')[0]
elb_backend.delete_load_balancer(load_balancer_name)
self.elb_backend.delete_load_balancer(load_balancer_name)
template = Template(DELETE_LOAD_BALANCER_TEMPLATE)
return template.render()
def configure_health_check(self):
check = elb_backend.configure_health_check(
check = self.elb_backend.configure_health_check(
load_balancer_name=self.querystring.get('LoadBalancerName')[0],
timeout=self.querystring.get('HealthCheck.Timeout')[0],
healthy_threshold=self.querystring.get('HealthCheck.HealthyThreshold')[0],
@ -99,7 +103,7 @@ class ELBResponse(BaseResponse):
load_balancer_name = self.querystring.get('LoadBalancerName')[0]
instance_ids = [value[0] for key, value in self.querystring.items() if "Instances.member" in key]
template = Template(REGISTER_INSTANCES_TEMPLATE)
load_balancer = elb_backend.register_instances(load_balancer_name, instance_ids)
load_balancer = self.elb_backend.register_instances(load_balancer_name, instance_ids)
return template.render(load_balancer=load_balancer)
def set_load_balancer_listener_sslcertificate(self):
@ -107,7 +111,7 @@ class ELBResponse(BaseResponse):
ssl_certificate_id = self.querystring['SSLCertificateId'][0]
lb_port = self.querystring['LoadBalancerPort'][0]
elb_backend.set_load_balancer_listener_sslcertificate(load_balancer_name, lb_port, ssl_certificate_id)
self.elb_backend.set_load_balancer_listener_sslcertificate(load_balancer_name, lb_port, ssl_certificate_id)
template = Template(SET_LOAD_BALANCER_SSL_CERTIFICATE)
return template.render()
@ -116,7 +120,7 @@ class ELBResponse(BaseResponse):
load_balancer_name = self.querystring.get('LoadBalancerName')[0]
instance_ids = [value[0] for key, value in self.querystring.items() if "Instances.member" in key]
template = Template(DEREGISTER_INSTANCES_TEMPLATE)
load_balancer = elb_backend.deregister_instances(load_balancer_name, instance_ids)
load_balancer = self.elb_backend.deregister_instances(load_balancer_name, instance_ids)
return template.render(load_balancer=load_balancer)
CREATE_LOAD_BALANCER_TEMPLATE = """<CreateLoadBalancerResult xmlns="http://elasticloadbalancing.amazonaws.com/doc/2012-06-01/">

View File

@ -16,7 +16,7 @@ class Role(object):
self.policies = policies
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
return iam_backend.create_role(
@ -45,7 +45,7 @@ class InstanceProfile(object):
self.roles = roles if roles else []
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
role_ids = properties['Roles']
@ -272,7 +272,7 @@ class IAMBackend(BaseBackend):
return user
def create_login_profile(self, user_name, password):
if not user_name in self.users:
if user_name not in self.users:
raise BotoServerError(404, 'Not Found')
# This does not currently deal with PasswordPolicyViolation.

View File

@ -1,3 +1,12 @@
from __future__ import unicode_literals
from .models import sns_backend
mock_sns = sns_backend.decorator
from .models import sns_backends
from ..core.models import MockAWS
sns_backend = sns_backends['us-east-1']
def mock_sns(func=None):
if func:
return MockAWS(sns_backends)(func)
else:
return MockAWS(sns_backends)

View File

@ -1,20 +1,24 @@
from __future__ import unicode_literals
import datetime
import requests
import uuid
import boto.sns
import requests
import six
from moto.core import BaseBackend
from moto.core.utils import iso_8601_datetime
from moto.sqs.models import sqs_backend
from moto.sqs import sqs_backends
from .utils import make_arn_for_topic, make_arn_for_subscription
DEFAULT_ACCOUNT_ID = 123456789012
class Topic(object):
def __init__(self, name):
def __init__(self, name, sns_backend):
self.name = name
self.sns_backend = sns_backend
self.account_id = DEFAULT_ACCOUNT_ID
self.display_name = ""
self.policy = DEFAULT_TOPIC_POLICY
@ -28,7 +32,7 @@ class Topic(object):
def publish(self, message):
message_id = six.text_type(uuid.uuid4())
subscriptions = sns_backend.list_subscriptions(self.arn)
subscriptions = self.sns_backend.list_subscriptions(self.arn)
for subscription in subscriptions:
subscription.publish(message, message_id)
return message_id
@ -50,7 +54,8 @@ class Subscription(object):
def publish(self, message, message_id):
if self.protocol == 'sqs':
queue_name = self.endpoint.split(":")[-1]
sqs_backend.send_message(queue_name, message)
region = self.endpoint.split(":")[3]
sqs_backends[region].send_message(queue_name, message)
elif self.protocol in ['http', 'https']:
post_data = self.get_post_data(message, message_id)
requests.post(self.endpoint, data=post_data)
@ -76,7 +81,7 @@ class SNSBackend(BaseBackend):
self.subscriptions = {}
def create_topic(self, name):
topic = Topic(name)
topic = Topic(name, self)
self.topics[topic.arn] = topic
return topic
@ -114,8 +119,9 @@ class SNSBackend(BaseBackend):
message_id = topic.publish(message)
return message_id
sns_backend = SNSBackend()
sns_backends = {}
for region in boto.sns.regions():
sns_backends[region.name] = SNSBackend()
DEFAULT_TOPIC_POLICY = {

View File

@ -3,14 +3,18 @@ import json
from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from .models import sns_backend
from .models import sns_backends
class SNSResponse(BaseResponse):
@property
def backend(self):
return sns_backends[self.region]
def create_topic(self):
name = self._get_param('Name')
topic = sns_backend.create_topic(name)
topic = self.backend.create_topic(name)
return json.dumps({
'CreateTopicResponse': {
@ -24,7 +28,7 @@ class SNSResponse(BaseResponse):
})
def list_topics(self):
topics = sns_backend.list_topics()
topics = self.backend.list_topics()
return json.dumps({
'ListTopicsResponse': {
@ -40,7 +44,7 @@ class SNSResponse(BaseResponse):
def delete_topic(self):
topic_arn = self._get_param('TopicArn')
sns_backend.delete_topic(topic_arn)
self.backend.delete_topic(topic_arn)
return json.dumps({
'DeleteTopicResponse': {
@ -52,7 +56,7 @@ class SNSResponse(BaseResponse):
def get_topic_attributes(self):
topic_arn = self._get_param('TopicArn')
topic = sns_backend.get_topic(topic_arn)
topic = self.backend.get_topic(topic_arn)
return json.dumps({
"GetTopicAttributesResponse": {
@ -80,7 +84,7 @@ class SNSResponse(BaseResponse):
attribute_name = self._get_param('AttributeName')
attribute_name = camelcase_to_underscores(attribute_name)
attribute_value = self._get_param('AttributeValue')
sns_backend.set_topic_attribute(topic_arn, attribute_name, attribute_value)
self.backend.set_topic_attribute(topic_arn, attribute_name, attribute_value)
return json.dumps({
"SetTopicAttributesResponse": {
@ -94,7 +98,7 @@ class SNSResponse(BaseResponse):
topic_arn = self._get_param('TopicArn')
endpoint = self._get_param('Endpoint')
protocol = self._get_param('Protocol')
subscription = sns_backend.subscribe(topic_arn, endpoint, protocol)
subscription = self.backend.subscribe(topic_arn, endpoint, protocol)
return json.dumps({
"SubscribeResponse": {
@ -109,7 +113,7 @@ class SNSResponse(BaseResponse):
def unsubscribe(self):
subscription_arn = self._get_param('SubscriptionArn')
sns_backend.unsubscribe(subscription_arn)
self.backend.unsubscribe(subscription_arn)
return json.dumps({
"UnsubscribeResponse": {
@ -120,7 +124,7 @@ class SNSResponse(BaseResponse):
})
def list_subscriptions(self):
subscriptions = sns_backend.list_subscriptions()
subscriptions = self.backend.list_subscriptions()
return json.dumps({
"ListSubscriptionsResponse": {
@ -142,7 +146,7 @@ class SNSResponse(BaseResponse):
def list_subscriptions_by_topic(self):
topic_arn = self._get_param('TopicArn')
subscriptions = sns_backend.list_subscriptions(topic_arn)
subscriptions = self.backend.list_subscriptions(topic_arn)
return json.dumps({
"ListSubscriptionsByTopicResponse": {
@ -165,7 +169,7 @@ class SNSResponse(BaseResponse):
def publish(self):
topic_arn = self._get_param('TopicArn')
message = self._get_param('Message')
message_id = sns_backend.publish(topic_arn, message)
message_id = self.backend.publish(topic_arn, message)
return json.dumps({
"PublishResponse": {

View File

@ -1,3 +1,12 @@
from __future__ import unicode_literals
from .models import sqs_backend
mock_sqs = sqs_backend.decorator
from .models import sqs_backends
from ..core.models import MockAWS
sqs_backend = sqs_backends['us-east-1']
def mock_sqs(func=None):
if func:
return MockAWS(sqs_backends)(func)
else:
return MockAWS(sqs_backends)

View File

@ -1,4 +1,6 @@
from __future__ import unicode_literals
class MessageNotInflight(Exception):
description = "The message referred to is not in flight."
status_code = 400

View File

@ -1,10 +1,11 @@
from __future__ import unicode_literals
import base64
import hashlib
import time
import re
from xml.sax.saxutils import escape
import boto.sqs
from moto.core import BaseBackend
from moto.core.utils import camelcase_to_underscores, get_random_message_id
@ -120,9 +121,10 @@ class Queue(object):
self.receive_message_wait_time_seconds = 0
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
sqs_backend = sqs_backends[region_name]
return sqs_backend.create_queue(
name=properties['QueueName'],
visibility_timeout=properties.get('VisibilityTimeout'),
@ -272,4 +274,6 @@ class SQSBackend(BaseBackend):
return
raise ReceiptHandleIsInvalid
sqs_backend = SQSBackend()
sqs_backends = {}
for region in boto.sqs.regions():
sqs_backends[region.name] = SQSBackend()

View File

@ -4,7 +4,7 @@ from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from .utils import parse_message_attributes
from .models import sqs_backend
from .models import sqs_backends
from .exceptions import (
MessageAttributesInvalid,
MessageNotInflight,
@ -12,23 +12,30 @@ from .exceptions import (
)
MAXIMUM_VISIBILTY_TIMEOUT = 43200
SQS_REGION_REGEX = r'://(.+?)\.queue\.amazonaws\.com'
class QueuesResponse(BaseResponse):
region_regex = SQS_REGION_REGEX
@property
def sqs_backend(self):
return sqs_backends[self.region]
def create_queue(self):
visibility_timeout = None
if 'Attribute.1.Name' in self.querystring and self.querystring.get('Attribute.1.Name')[0] == 'VisibilityTimeout':
visibility_timeout = self.querystring.get("Attribute.1.Value")[0]
queue_name = self.querystring.get("QueueName")[0]
queue = sqs_backend.create_queue(queue_name, visibility_timeout=visibility_timeout)
queue = self.sqs_backend.create_queue(queue_name, visibility_timeout=visibility_timeout)
template = Template(CREATE_QUEUE_RESPONSE)
return template.render(queue=queue)
def get_queue_url(self):
queue_name = self.querystring.get("QueueName")[0]
queue = sqs_backend.get_queue(queue_name)
queue = self.sqs_backend.get_queue(queue_name)
if queue:
template = Template(GET_QUEUE_URL_RESPONSE)
return template.render(queue=queue)
@ -37,12 +44,19 @@ class QueuesResponse(BaseResponse):
def list_queues(self):
queue_name_prefix = self.querystring.get("QueueNamePrefix", [None])[0]
queues = sqs_backend.list_queues(queue_name_prefix)
queues = self.sqs_backend.list_queues(queue_name_prefix)
template = Template(LIST_QUEUES_RESPONSE)
return template.render(queues=queues)
class QueueResponse(BaseResponse):
region_regex = SQS_REGION_REGEX
@property
def sqs_backend(self):
return sqs_backends[self.region]
def change_message_visibility(self):
queue_name = self.path.split("/")[-1]
receipt_handle = self.querystring.get("ReceiptHandle")[0]
@ -54,7 +68,7 @@ class QueueResponse(BaseResponse):
), dict(status=400)
try:
sqs_backend.change_message_visibility(
self.sqs_backend.change_message_visibility(
queue_name=queue_name,
receipt_handle=receipt_handle,
visibility_timeout=visibility_timeout
@ -67,7 +81,7 @@ class QueueResponse(BaseResponse):
def get_queue_attributes(self):
queue_name = self.path.split("/")[-1]
queue = sqs_backend.get_queue(queue_name)
queue = self.sqs_backend.get_queue(queue_name)
template = Template(GET_QUEUE_ATTRIBUTES_RESPONSE)
return template.render(queue=queue)
@ -75,12 +89,12 @@ class QueueResponse(BaseResponse):
queue_name = self.path.split("/")[-1]
key = camelcase_to_underscores(self.querystring.get('Attribute.Name')[0])
value = self.querystring.get('Attribute.Value')[0]
sqs_backend.set_queue_attribute(queue_name, key, value)
self.sqs_backend.set_queue_attribute(queue_name, key, value)
return SET_QUEUE_ATTRIBUTE_RESPONSE
def delete_queue(self):
queue_name = self.path.split("/")[-1]
queue = sqs_backend.delete_queue(queue_name)
queue = self.sqs_backend.delete_queue(queue_name)
if not queue:
return "A queue with name {0} does not exist".format(queue_name), dict(status=404)
template = Template(DELETE_QUEUE_RESPONSE)
@ -101,7 +115,7 @@ class QueueResponse(BaseResponse):
return e.description, dict(status=e.status_code)
queue_name = self.path.split("/")[-1]
message = sqs_backend.send_message(
message = self.sqs_backend.send_message(
queue_name,
message,
message_attributes=message_attributes,
@ -137,7 +151,7 @@ class QueueResponse(BaseResponse):
message_user_id = self.querystring.get(message_user_id_key)[0]
delay_key = 'SendMessageBatchRequestEntry.{0}.DelaySeconds'.format(index)
delay_seconds = self.querystring.get(delay_key, [None])[0]
message = sqs_backend.send_message(queue_name, message_body[0], delay_seconds=delay_seconds)
message = self.sqs_backend.send_message(queue_name, message_body[0], delay_seconds=delay_seconds)
message.user_id = message_user_id
message_attributes = parse_message_attributes(self.querystring, base='SendMessageBatchRequestEntry.{0}.'.format(index), value_namespace='')
@ -153,7 +167,7 @@ class QueueResponse(BaseResponse):
def delete_message(self):
queue_name = self.path.split("/")[-1]
receipt_handle = self.querystring.get("ReceiptHandle")[0]
sqs_backend.delete_message(queue_name, receipt_handle)
self.sqs_backend.delete_message(queue_name, receipt_handle)
template = Template(DELETE_MESSAGE_RESPONSE)
return template.render()
@ -178,7 +192,7 @@ class QueueResponse(BaseResponse):
# Found all messages
break
sqs_backend.delete_message(queue_name, receipt_handle[0])
self.sqs_backend.delete_message(queue_name, receipt_handle[0])
message_user_id_key = 'DeleteMessageBatchRequestEntry.{0}.Id'.format(index)
message_user_id = self.querystring.get(message_user_id_key)[0]
@ -190,7 +204,7 @@ class QueueResponse(BaseResponse):
def receive_message(self):
queue_name = self.path.split("/")[-1]
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
messages = sqs_backend.receive_messages(queue_name, message_count)
messages = self.sqs_backend.receive_messages(queue_name, message_count)
template = Template(RECEIVE_MESSAGE_RESPONSE)
output = template.render(messages=messages)
return output

View File

@ -4,3 +4,4 @@ nose
sure<1.2.4
coverage
freezegun
flask

View File

@ -22,7 +22,7 @@ if sys.version_info < (2, 7):
setup(
name='moto',
version='0.3.8',
version='0.3.9',
description='A library that allows your python tests to easily'
' mock out the boto library',
author='Steve Pulec',

View File

@ -1,9 +1,12 @@
from __future__ import unicode_literals
import json
import boto
import boto.cloudformation
import sure # noqa
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
import tests.backport_assert_raises # noqa
from nose.tools import assert_raises
from moto import mock_cloudformation
@ -38,6 +41,18 @@ def test_create_stack():
stack.get_template().should.equal(dummy_template)
@mock_cloudformation
def test_creating_stacks_across_regions():
west1_conn = boto.cloudformation.connect_to_region("us-west-1")
west1_conn.create_stack("test_stack", template_body=dummy_template_json)
west2_conn = boto.cloudformation.connect_to_region("us-west-2")
west2_conn.create_stack("test_stack", template_body=dummy_template_json)
list(west1_conn.describe_stacks()).should.have.length_of(1)
list(west2_conn.describe_stacks()).should.have.length_of(1)
@mock_cloudformation
def test_create_stack_with_notification_arn():
conn = boto.connect_cloudformation()

View File

@ -2,6 +2,12 @@ from __future__ import unicode_literals
import json
import boto
import boto.cloudformation
import boto.ec2
import boto.ec2.autoscale
import boto.ec2.elb
import boto.iam
import boto.vpc
import sure # noqa
from moto import (
@ -38,7 +44,7 @@ def test_stack_sqs_integration():
}
sqs_template_json = json.dumps(sqs_template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"test_stack",
template_body=sqs_template_json,
@ -68,13 +74,13 @@ def test_stack_ec2_integration():
}
ec2_template_json = json.dumps(ec2_template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"ec2_stack",
template_body=ec2_template_json,
)
ec2_conn = boto.connect_ec2()
ec2_conn = boto.ec2.connect_to_region("us-west-1")
reservation = ec2_conn.get_all_instances()[0]
ec2_instance = reservation.instances[0]
@ -111,16 +117,16 @@ def test_stack_elb_integration_with_attached_ec2_instances():
}
elb_template_json = json.dumps(elb_template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"elb_stack",
template_body=elb_template_json,
)
elb_conn = boto.connect_elb()
elb_conn = boto.ec2.elb.connect_to_region("us-west-1")
load_balancer = elb_conn.get_all_load_balancers()[0]
ec2_conn = boto.connect_ec2()
ec2_conn = boto.ec2.connect_to_region("us-west-1")
reservation = ec2_conn.get_all_instances()[0]
ec2_instance = reservation.instances[0]
instance_id = ec2_instance.id
@ -183,13 +189,13 @@ def test_stack_security_groups():
}
security_group_template_json = json.dumps(security_group_template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"security_group_stack",
template_body=security_group_template_json,
)
ec2_conn = boto.connect_ec2()
ec2_conn = boto.ec2.connect_to_region("us-west-1")
security_groups = ec2_conn.get_all_security_groups()
for group in security_groups:
if "InstanceSecurityGroup" in group.name:
@ -266,13 +272,13 @@ def test_autoscaling_group_with_elb():
web_setup_template_json = json.dumps(web_setup_template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"web_stack",
template_body=web_setup_template_json,
)
autoscale_conn = boto.connect_autoscale()
autoscale_conn = boto.ec2.autoscale.connect_to_region("us-west-1")
autoscale_group = autoscale_conn.get_all_groups()[0]
autoscale_group.launch_config_name.should.contain("my-launch-config")
autoscale_group.load_balancers[0].should.equal('my-elb')
@ -281,7 +287,7 @@ def test_autoscaling_group_with_elb():
autoscale_conn.get_all_launch_configurations().should.have.length_of(1)
# Confirm the ELB was actually created
elb_conn = boto.connect_elb()
elb_conn = boto.ec2.elb.connect_to_region("us-west-1")
elb_conn.get_all_load_balancers().should.have.length_of(1)
stack = conn.describe_stacks()[0]
@ -301,13 +307,13 @@ def test_autoscaling_group_with_elb():
def test_vpc_single_instance_in_subnet():
template_json = json.dumps(vpc_single_instance_in_subnet.template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"test_stack",
template_body=template_json,
)
vpc_conn = boto.connect_vpc()
vpc_conn = boto.vpc.connect_to_region("us-west-1")
vpc = vpc_conn.get_all_vpcs()[0]
vpc.cidr_block.should.equal("10.0.0.0/16")
@ -317,7 +323,7 @@ def test_vpc_single_instance_in_subnet():
subnet = vpc_conn.get_all_subnets()[0]
subnet.vpc_id.should.equal(vpc.id)
ec2_conn = boto.connect_ec2()
ec2_conn = boto.ec2.connect_to_region("us-west-1")
reservation = ec2_conn.get_all_instances()[0]
instance = reservation.instances[0]
# Check that the EIP is attached the the EC2 instance
@ -426,13 +432,13 @@ def test_iam_roles():
}
iam_template_json = json.dumps(iam_template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"test_stack",
template_body=iam_template_json,
)
iam_conn = boto.connect_iam()
iam_conn = boto.iam.connect_to_region("us-west-1")
role_result = iam_conn.list_roles()['list_roles_response']['list_roles_result']['roles'][0]
role = iam_conn.get_role(role_result.role_name)
@ -446,7 +452,7 @@ def test_iam_roles():
instance_profile.path.should.equal("my-path")
instance_profile.role_id.should.equal(role.role_id)
autoscale_conn = boto.connect_autoscale()
autoscale_conn = boto.ec2.autoscale.connect_to_region("us-west-1")
launch_config = autoscale_conn.get_all_launch_configurations()[0]
launch_config.instance_profile_name.should.contain("my-instance-profile")
@ -464,13 +470,13 @@ def test_iam_roles():
def test_single_instance_with_ebs_volume():
template_json = json.dumps(single_instance_with_ebs_volume.template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.connect_ec2()
ec2_conn = boto.ec2.connect_to_region("us-west-1")
reservation = ec2_conn.get_all_instances()[0]
ec2_instance = reservation.instances[0]
@ -489,12 +495,9 @@ def test_single_instance_with_ebs_volume():
def test_classic_eip():
template_json = json.dumps(ec2_classic_eip.template)
conn = boto.connect_cloudformation()
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.connect_ec2()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack("test_stack", template_body=template_json)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eip = ec2_conn.get_all_addresses()[0]
stack = conn.describe_stacks()[0]
@ -508,12 +511,9 @@ def test_classic_eip():
def test_vpc_eip():
template_json = json.dumps(vpc_eip.template)
conn = boto.connect_cloudformation()
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.connect_ec2()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack("test_stack", template_body=template_json)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eip = ec2_conn.get_all_addresses()[0]
stack = conn.describe_stacks()[0]
@ -527,12 +527,9 @@ def test_vpc_eip():
def test_fn_join():
template_json = json.dumps(fn_join.template)
conn = boto.connect_cloudformation()
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.connect_ec2()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack("test_stack", template_body=template_json)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eip = ec2_conn.get_all_addresses()[0]
stack = conn.describe_stacks()[0]

View File

@ -82,7 +82,7 @@ def test_parse_stack_resources():
stack_id="test_id",
name="test_stack",
template=dummy_template_json,
)
region_name='us-west-1')
stack.resource_map.should.have.length_of(1)
list(stack.resource_map.keys())[0].should.equal('Queue')
@ -101,7 +101,8 @@ def test_parse_stack_with_name_type_resource():
stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=name_type_template_json)
template=name_type_template_json,
region_name='us-west-1')
stack.resource_map.should.have.length_of(1)
list(stack.resource_map.keys())[0].should.equal('Queue')
@ -113,7 +114,8 @@ def test_parse_stack_with_outputs():
stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=output_type_template_json)
template=output_type_template_json,
region_name='us-west-1')
stack.output_map.should.have.length_of(1)
list(stack.output_map.keys())[0].should.equal('Output1')
@ -126,7 +128,8 @@ def test_parse_stack_with_get_attribute_outputs():
stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=get_attribute_outputs_template_json)
template=get_attribute_outputs_template_json,
region_name='us-west-1')
stack.output_map.should.have.length_of(1)
list(stack.output_map.keys())[0].should.equal('Output1')
@ -137,4 +140,4 @@ def test_parse_stack_with_get_attribute_outputs():
def test_parse_stack_with_bad_get_attribute_outputs():
FakeStack.when.called_with(
"test_id", "test_stack", bad_output_template_json).should.throw(BotoServerError)
"test_id", "test_stack", bad_output_template_json, "us-west-1").should.throw(BotoServerError)

View File

@ -2,7 +2,7 @@ from __future__ import unicode_literals
import boto
from boto.exception import EC2ResponseError
import sure # noqa
import tests.backport_assert_raises
import tests.backport_assert_raises # noqa
from nose.tools import assert_raises
from moto import mock_ec2
@ -57,3 +57,14 @@ def test_decorater_wrapped_gets_set():
Moto decorator's __wrapped__ should get set to the tests function
"""
test_decorater_wrapped_gets_set.__wrapped__.__name__.should.equal('test_decorater_wrapped_gets_set')
@mock_ec2
class Tester(object):
def test_the_class(self):
conn = boto.connect_ec2()
list(conn.get_all_instances()).should.have.length_of(0)
def test_still_the_same(self):
conn = boto.connect_ec2()
list(conn.get_all_instances()).should.have.length_of(0)

View File

@ -1,9 +1,10 @@
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
import tests.backport_assert_raises # noqa
from nose.tools import assert_raises
import boto
import boto.ec2
from boto.exception import EC2ResponseError
import sure # noqa
@ -55,7 +56,7 @@ def test_ami_create_and_delete():
@requires_boto_gte("2.14.0")
@mock_ec2
def test_ami_copy():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-west-1")
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
@ -183,6 +184,7 @@ def test_ami_filters():
amis_by_name = conn.get_all_images(filters={'name': imageA.name})
set([ami.id for ami in amis_by_name]).should.equal(set([imageA.id]))
@mock_ec2
def test_ami_filtering_via_tag():
conn = boto.connect_vpc('the_key', 'the_secret')
@ -309,4 +311,3 @@ def test_ami_attribute():
attribute='launchPermission',
operation='remove',
user_ids=['user']).should.throw(NotImplementedError)

View File

@ -190,7 +190,7 @@ def test_modify_attribute_blockDeviceMapping():
[0] https://github.com/spulec/moto/issues/160
"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
reservation = conn.run_instances('ami-1234abcd')

View File

@ -4,6 +4,8 @@ import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
import boto.cloudformation
import boto.ec2
from boto.exception import EC2ResponseError
import sure # noqa
@ -151,12 +153,12 @@ def test_elastic_network_interfaces_filtering():
def test_elastic_network_interfaces_cloudformation():
template = vpc_eni.template
template_json = json.dumps(template)
conn = boto.connect_cloudformation()
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.connect_ec2()
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eni = ec2_conn.get_all_network_interfaces()[0]
stack = conn.describe_stacks()[0]

View File

@ -106,7 +106,7 @@ def test_request_spot_instances_fulfilled():
"""
Test that moto correctly fullfills a spot instance request
"""
conn = boto.connect_ec2()
conn = boto.ec2.connect_to_region("us-east-1")
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
@ -184,7 +184,7 @@ def test_get_all_spot_instance_requests_filtering():
@mock_ec2
def test_request_spot_instances_setting_instance_id():
conn = boto.connect_ec2()
conn = boto.ec2.connect_to_region("us-east-1")
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234')

View File

@ -3,6 +3,7 @@ import itertools
import boto
from boto.exception import EC2ResponseError
from boto.ec2.instance import Reservation
import sure # noqa
from moto import mock_ec2
@ -253,3 +254,79 @@ def test_get_all_tags_value_filter():
tags = conn.get_all_tags(filters={'value': '*value\*\?'})
tags.should.have.length_of(1)
@mock_ec2
def test_retrieved_instances_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
reservation.should.be.a(Reservation)
reservation.instances.should.have.length_of(1)
instance = reservation.instances[0]
reservations = conn.get_all_instances()
reservations.should.have.length_of(1)
reservations[0].id.should.equal(reservation.id)
instances = reservations[0].instances
instances.should.have.length_of(1)
instances[0].id.should.equal(instance.id)
conn.create_tags([instance.id], tags_to_be_set)
reservations = conn.get_all_instances()
instance = reservations[0].instances[0]
retrieved_tags = instance.tags
#Cleanup of instance
conn.terminate_instances([instances[0].id])
#Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)
@mock_ec2
def test_retrieved_volumes_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2('the_key', 'the_secret')
volume = conn.create_volume(80, "us-east-1a")
all_volumes = conn.get_all_volumes()
volume = all_volumes[0]
conn.create_tags([volume.id], tags_to_be_set)
#Fetch the volume again
all_volumes = conn.get_all_volumes()
volume = all_volumes[0]
retrieved_tags = volume.tags
volume.delete()
#Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)
@mock_ec2
def test_retrieved_snapshots_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2(aws_access_key_id='the_key', aws_secret_access_key='the_secret')
volume = conn.create_volume(80, "eu-west-1a")
snapshot = conn.create_snapshot(volume.id)
conn.create_tags([snapshot.id], tags_to_be_set)
#Fetch the snapshot again
all_snapshots = conn.get_all_snapshots()
snapshot = all_snapshots[0]
retrieved_tags = snapshot.tags
conn.delete_snapshot(snapshot.id)
volume.delete()
#Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import boto
import boto.ec2.elb
from boto.ec2.elb import HealthCheck
import sure # noqa
@ -28,6 +29,21 @@ def test_create_load_balancer():
listener2.protocol.should.equal("TCP")
@mock_elb
def test_create_elb_in_multiple_region():
zones = ['us-east-1a', 'us-east-1b']
ports = [(80, 8080, 'http'), (443, 8443, 'tcp')]
west1_conn = boto.ec2.elb.connect_to_region("us-west-1")
west1_conn.create_load_balancer('my-lb', zones, ports)
west2_conn = boto.ec2.elb.connect_to_region("us-west-2")
west2_conn.create_load_balancer('my-lb', zones, ports)
list(west1_conn.get_all_load_balancers()).should.have.length_of(1)
list(west2_conn.get_all_load_balancers()).should.have.length_of(1)
@mock_elb
def test_add_listener():
conn = boto.connect_elb()

View File

@ -29,6 +29,26 @@ def test_publish_to_sqs():
message.get_body().should.equal('my message')
@mock_sqs
@mock_sns
def test_publish_to_sqs_in_different_region():
conn = boto.sns.connect_to_region("us-west-1")
conn.create_topic("some-topic")
topics_json = conn.get_all_topics()
topic_arn = topics_json["ListTopicsResponse"]["ListTopicsResult"]["Topics"][0]['TopicArn']
sqs_conn = boto.sqs.connect_to_region("us-west-2")
sqs_conn.create_queue("test-queue")
conn.subscribe(topic_arn, "sqs", "arn:aws:sqs:us-west-2:123456789012:test-queue")
conn.publish(topic=topic_arn, message="my message")
queue = sqs_conn.get_queue("test-queue")
message = queue.read(1)
message.get_body().should.equal('my message')
@freeze_time("2013-01-01")
@mock_sns
def test_publish_to_http():

View File

@ -27,6 +27,18 @@ def test_create_and_delete_topic():
topics.should.have.length_of(0)
@mock_sns
def test_create_topic_in_multiple_regions():
west1_conn = boto.sns.connect_to_region("us-west-1")
west1_conn.create_topic("some-topic")
west2_conn = boto.sns.connect_to_region("us-west-2")
west2_conn.create_topic("some-topic")
list(west1_conn.get_all_topics()["ListTopicsResponse"]["ListTopicsResult"]["Topics"]).should.have.length_of(1)
list(west2_conn.get_all_topics()["ListTopicsResponse"]["ListTopicsResult"]["Topics"]).should.have.length_of(1)
@mock_sns
def test_topic_attributes():
conn = boto.connect_sns()

View File

@ -10,6 +10,7 @@ import time
from moto import mock_sqs
from tests.helpers import requires_boto_gte
@mock_sqs
def test_create_queue():
conn = boto.connect_sqs('the_key', 'the_secret')
@ -21,6 +22,18 @@ def test_create_queue():
all_queues[0].get_timeout().should.equal(60)
@mock_sqs
def test_create_queues_in_multiple_region():
west1_conn = boto.sqs.connect_to_region("us-west-1")
west1_conn.create_queue("test-queue")
west2_conn = boto.sqs.connect_to_region("us-west-2")
west2_conn.create_queue("test-queue")
list(west1_conn.get_all_queues()).should.have.length_of(1)
list(west2_conn.get_all_queues()).should.have.length_of(1)
@mock_sqs
def test_get_queue():
conn = boto.connect_sqs('the_key', 'the_secret')

View File

@ -6,3 +6,6 @@ deps = -r{toxinidir}/requirements.txt
commands =
{envpython} setup.py test
nosetests
[flake8]
ignore = E128,E501