Merge branch 'master' of https://github.com/spulec/moto into 0.4.1-threadsafe

* 'master' of https://github.com/spulec/moto: (25 commits)
  Add @zkourouma to authors.
  0.4.2
  Fix bug where listener certificate was not being saved correctly when creating an elb. Added test to cover that case.
  [dynamodb2] adds lookup method to Table class
  Add IAM list_groups and list_groups_for_user. Closes #343.
  Fix for deleting Route53 record sets with set identifiers. Closes #342.
  Use dummy date instead of an invalid date
  Adding support for comments on hosted zones.
  Add availability zone support to Subnets created via CloudFormation
  Make availability zone dynamic in Subnet Response templates
  Add filter "availabilityZone" to DescribeSubnets and add availability zone support too
  allow starting without reseting
  Fix bug with empty string for instance vpc_id. Closes #337.
  Fix default security group description.
  Update responses.py
  Add @mrucci to authors.
  Fix merge conflicts.
  Add support for ELB attributes.
  cast to int when doing math.
  General cleanup.
  ...
This commit is contained in:
Jeffrey Gelens 2015-05-29 11:35:14 +02:00
commit e722b67f36
33 changed files with 1382 additions and 168 deletions

View File

@ -37,3 +37,5 @@ Moto is written by Steve Pulec with contributions from:
* [Mike Fuller](https://github.com/mfulleratlassian)
* [Andy](https://github.com/aaltepet)
* [Mike Grima](https://github.com/mikegrima)
* [Marco Rucci](https://github.com/mrucci)
* [Zack Kourouma](https://github.com/zkourouma)

View File

@ -186,7 +186,7 @@ class FakeAutoScalingGroup(object):
if self.desired_capacity > curr_instance_count:
# Need more instances
count_needed = self.desired_capacity - curr_instance_count
count_needed = int(self.desired_capacity) - int(curr_instance_count)
reservation = self.autoscaling_backend.ec2_backend.add_instances(
self.launch_config.image_id,
count_needed,

View File

@ -29,10 +29,11 @@ class MockAWS(object):
def __exit__(self, *args):
self.stop()
def start(self):
def start(self, reset=True):
self.__class__.nested_count += 1
for backend in self.backends.values():
backend.reset()
if reset:
for backend in self.backends.values():
backend.reset()
if not HTTPretty.is_enabled():
HTTPretty.enable()

View File

@ -122,7 +122,7 @@ class Item(object):
class Table(object):
def __init__(self, table_name, schema=None, attr=None, throughput=None, indexes=None):
def __init__(self, table_name, schema=None, attr=None, throughput=None, indexes=None, global_indexes=None):
self.name = table_name
self.attr = attr
self.schema = schema
@ -143,6 +143,7 @@ class Table(object):
self.throughput = throughput
self.throughput["NumberOfDecreasesToday"] = 0
self.indexes = indexes
self.global_indexes = global_indexes if global_indexes else []
self.created_at = datetime.datetime.now()
self.items = defaultdict(dict)
@ -158,6 +159,7 @@ class Table(object):
'KeySchema': self.schema,
'ItemCount': len(self),
'CreationDateTime': unix_time(self.created_at),
'GlobalSecondaryIndexes': [index for index in self.global_indexes],
}
}
return results
@ -171,6 +173,24 @@ class Table(object):
count += 1
return count
@property
def hash_key_names(self):
keys = [self.hash_key_attr]
for index in self.global_indexes:
for key in index['KeySchema']:
if key['KeyType'] == 'HASH':
keys.append(key['AttributeName'])
return keys
@property
def range_key_names(self):
keys = [self.range_key_attr]
for index in self.global_indexes:
for key in index['KeySchema']:
if key['KeyType'] == 'RANGE':
keys.append(key['AttributeName'])
return keys
def put_item(self, item_attrs):
hash_value = DynamoType(item_attrs.get(self.hash_key_attr))
if self.has_range_key:
@ -268,6 +288,16 @@ class Table(object):
results.append(result)
return results, scanned_count, last_page
def lookup(self, *args, **kwargs):
if not self.schema:
self.describe()
for x, arg in enumerate(args):
kwargs[self.schema[x].name] = arg
ret = self.get_item(**kwargs)
if not ret.keys():
return None
return ret
class DynamoDBBackend(BaseBackend):
@ -293,12 +323,21 @@ class DynamoDBBackend(BaseBackend):
return None
return table.put_item(item_attrs)
def get_table_keys_name(self, table_name):
def get_table_keys_name(self, table_name, keys):
"""
Given a set of keys, extracts the key and range key
"""
table = self.tables.get(table_name)
if not table:
return None, None
else:
return table.hash_key_attr, table.range_key_attr
hash_key = range_key = None
for key in keys:
if key in table.hash_key_names:
hash_key = key
elif key in table.range_key_names:
range_key = key
return hash_key, range_key
def get_keys_value(self, table, keys):
if table.hash_key_attr not in keys or (table.has_range_key and table.range_key_attr not in keys):

View File

@ -99,10 +99,12 @@ class DynamoHandler(BaseResponse):
# getting attribute definition
attr = body["AttributeDefinitions"]
# getting the indexes
global_indexes = body.get("GlobalSecondaryIndexes", [])
table = dynamodb_backend2.create_table(table_name,
schema=key_schema,
throughput=throughput,
attr=attr)
attr=attr,
global_indexes=global_indexes)
return dynamo_json_dump(table.describe)
def delete_table(self):
@ -216,13 +218,14 @@ class DynamoHandler(BaseResponse):
def query(self):
name = self.body['TableName']
keys = self.body['KeyConditions']
hash_key_name, range_key_name = dynamodb_backend2.get_table_keys_name(name)
key_conditions = self.body['KeyConditions']
hash_key_name, range_key_name = dynamodb_backend2.get_table_keys_name(name, key_conditions.keys())
# hash_key_name, range_key_name = dynamodb_backend2.get_table_keys_name(name)
if hash_key_name is None:
er = "'com.amazonaws.dynamodb.v20120810#ResourceNotFoundException"
return self.error(er)
hash_key = keys[hash_key_name]['AttributeValueList'][0]
if len(keys) == 1:
hash_key = key_conditions[hash_key_name]['AttributeValueList'][0]
if len(key_conditions) == 1:
range_comparison = None
range_values = []
else:
@ -230,7 +233,7 @@ class DynamoHandler(BaseResponse):
er = "com.amazon.coral.validate#ValidationException"
return self.error(er)
else:
range_condition = keys[range_key_name]
range_condition = key_conditions[range_key_name]
if range_condition:
range_comparison = range_condition['ComparisonOperator']
range_values = range_condition['AttributeValueList']

View File

@ -1149,6 +1149,10 @@ class SecurityGroupBackend(object):
def __init__(self):
# the key in the dict group is the vpc_id or None (non-vpc)
self.groups = defaultdict(dict)
# Create the default security group
self.create_security_group("default", "default group")
super(SecurityGroupBackend, self).__init__()
def create_security_group(self, name, description, vpc_id=None, force=False):
@ -1212,11 +1216,6 @@ class SecurityGroupBackend(object):
if group.name == name:
return group
if name == 'default':
# If the request is for the default group and it does not exist, create it
default_group = self.create_security_group("default", "The default security group", vpc_id=vpc_id, force=True)
return default_group
def get_security_group_by_name_or_id(self, group_name_or_id, vpc_id):
# try searching by id, fallbacks to name search
group = self.get_security_group_from_id(group_name_or_id)
@ -1309,7 +1308,7 @@ class SecurityGroupIngress(object):
from_port = properties.get("FromPort")
source_security_group_id = properties.get("SourceSecurityGroupId")
source_security_group_name = properties.get("SourceSecurityGroupName")
source_security_owner_id = properties.get("SourceSecurityGroupOwnerId") # IGNORED AT THE MOMENT
# source_security_owner_id = properties.get("SourceSecurityGroupOwnerId") # IGNORED AT THE MOMENT
to_port = properties.get("ToPort")
assert group_id or group_name
@ -1329,7 +1328,6 @@ class SecurityGroupIngress(object):
else:
ip_ranges = []
if group_id:
security_group = ec2_backend.describe_security_groups(group_ids=[group_id])[0]
else:
@ -1697,41 +1695,66 @@ class VPCPeeringConnectionBackend(object):
class Subnet(TaggedEC2Resource):
def __init__(self, ec2_backend, subnet_id, vpc_id, cidr_block):
def __init__(self, ec2_backend, subnet_id, vpc_id, cidr_block, availability_zone):
self.ec2_backend = ec2_backend
self.id = subnet_id
self.vpc_id = vpc_id
self.cidr_block = cidr_block
self._availability_zone = availability_zone
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
vpc_id = properties['VpcId']
cidr_block = properties['CidrBlock']
availability_zone = properties.get('AvailabilityZone')
ec2_backend = ec2_backends[region_name]
subnet = ec2_backend.create_subnet(
vpc_id=vpc_id,
cidr_block=properties['CidrBlock']
cidr_block=cidr_block,
availability_zone=availability_zone,
)
return subnet
@property
def availability_zone(self):
# This could probably be smarter, but there doesn't appear to be a
# way to pull AZs for a region in boto
return self.ec2_backend.region_name + "a"
if self._availability_zone is None:
# This could probably be smarter, but there doesn't appear to be a
# way to pull AZs for a region in boto
return self.ec2_backend.region_name + "a"
else:
return self._availability_zone
@property
def physical_resource_id(self):
return self.id
def get_filter_value(self, filter_name):
"""
API Version 2014-10-01 defines the following filters for DescribeSubnets:
* availabilityZone
* available-ip-address-count
* cidrBlock
* defaultForAz
* state
* subnet-id
* tag:key=value
* tag-key
* tag-value
* vpc-id
Taken from: http://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html
"""
if filter_name in ['cidr', 'cidrBlock', 'cidr-block']:
return self.cidr_block
elif filter_name == 'vpc-id':
return self.vpc_id
elif filter_name == 'subnet-id':
return self.id
elif filter_name == 'availabilityZone':
return self.availability_zone
filter_value = super(Subnet, self).get_filter_value(filter_name)
@ -1758,9 +1781,9 @@ class SubnetBackend(object):
raise InvalidSubnetIdError(subnet_id)
return subnet
def create_subnet(self, vpc_id, cidr_block):
def create_subnet(self, vpc_id, cidr_block, availability_zone=None):
subnet_id = random_subnet_id()
subnet = Subnet(self, subnet_id, vpc_id, cidr_block)
subnet = Subnet(self, subnet_id, vpc_id, cidr_block, availability_zone)
self.get_vpc(vpc_id) # Validate VPC exists
# AWS associates a new subnet with the default Network ACL

View File

@ -103,7 +103,7 @@ CREATE_VOLUME_RESPONSE = """<CreateVolumeResponse xmlns="http://ec2.amazonaws.co
<snapshotId/>
<availabilityZone>{{ volume.zone.name }}</availabilityZone>
<status>creating</status>
<createTime>YYYY-MM-DDTHH:MM:SS.000Z</createTime>
<createTime>2013-10-04T17:38:53.000Z</createTime>
<volumeType>standard</volumeType>
</CreateVolumeResponse>"""
@ -117,7 +117,7 @@ DESCRIBE_VOLUMES_RESPONSE = """<DescribeVolumesResponse xmlns="http://ec2.amazon
<snapshotId/>
<availabilityZone>{{ volume.zone.name }}</availabilityZone>
<status>{{ volume.status }}</status>
<createTime>YYYY-MM-DDTHH:MM:SS.SSSZ</createTime>
<createTime>2013-10-04T17:38:53.000Z</createTime>
<attachmentSet>
{% if volume.attachment %}
<item>
@ -125,7 +125,7 @@ DESCRIBE_VOLUMES_RESPONSE = """<DescribeVolumesResponse xmlns="http://ec2.amazon
<instanceId>{{ volume.attachment.instance.id }}</instanceId>
<device>{{ volume.attachment.device }}</device>
<status>attached</status>
<attachTime>YYYY-MM-DDTHH:MM:SS.SSSZ</attachTime>
<attachTime>2013-10-04T17:38:53.000Z</attachTime>
<deleteOnTermination>false</deleteOnTermination>
</item>
{% endif %}
@ -157,7 +157,7 @@ ATTACHED_VOLUME_RESPONSE = """<AttachVolumeResponse xmlns="http://ec2.amazonaws.
<instanceId>{{ attachment.instance.id }}</instanceId>
<device>{{ attachment.device }}</device>
<status>attaching</status>
<attachTime>YYYY-MM-DDTHH:MM:SS.000Z</attachTime>
<attachTime>2013-10-04T17:38:53.000Z</attachTime>
</AttachVolumeResponse>"""
DETATCH_VOLUME_RESPONSE = """<DetachVolumeResponse xmlns="http://ec2.amazonaws.com/doc/2012-12-01/">
@ -166,7 +166,7 @@ DETATCH_VOLUME_RESPONSE = """<DetachVolumeResponse xmlns="http://ec2.amazonaws.c
<instanceId>{{ attachment.instance.id }}</instanceId>
<device>{{ attachment.device }}</device>
<status>detaching</status>
<attachTime>YYYY-MM-DDTHH:MM:SS.000Z</attachTime>
<attachTime>2013-10-04T17:38:53.000Z</attachTime>
</DetachVolumeResponse>"""
CREATE_SNAPSHOT_RESPONSE = """<CreateSnapshotResponse xmlns="http://ec2.amazonaws.com/doc/2012-12-01/">
@ -174,7 +174,7 @@ CREATE_SNAPSHOT_RESPONSE = """<CreateSnapshotResponse xmlns="http://ec2.amazonaw
<snapshotId>{{ snapshot.id }}</snapshotId>
<volumeId>{{ snapshot.volume.id }}</volumeId>
<status>pending</status>
<startTime>YYYY-MM-DDTHH:MM:SS.000Z</startTime>
<startTime>2013-10-04T17:38:53.000Z</startTime>
<progress>60%</progress>
<ownerId>111122223333</ownerId>
<volumeSize>{{ snapshot.volume.size }}</volumeSize>
@ -189,7 +189,7 @@ DESCRIBE_SNAPSHOTS_RESPONSE = """<DescribeSnapshotsResponse xmlns="http://ec2.am
<snapshotId>{{ snapshot.id }}</snapshotId>
<volumeId>{{ snapshot.volume.id }}</volumeId>
<status>pending</status>
<startTime>YYYY-MM-DDTHH:MM:SS.SSSZ</startTime>
<startTime>2013-10-04T17:38:53.000Z</startTime>
<progress>30%</progress>
<ownerId>111122223333</ownerId>
<volumeSize>{{ snapshot.volume.size }}</volumeSize>

View File

@ -214,8 +214,10 @@ EC2_RUN_INSTANCES = """<RunInstancesResponse xmlns="http://ec2.amazonaws.com/doc
<state>enabled</state>
</monitoring>
{% if instance.nics %}
<subnetId>{{ instance.nics[0].subnet.id }}</subnetId>
<vpcId>{{ instance.nics[0].subnet.vpc_id }}</vpcId>
{% if instance.nics[0].subnet %}
<subnetId>{{ instance.nics[0].subnet.id }}</subnetId>
<vpcId>{{ instance.nics[0].subnet.vpc_id }}</vpcId>
{% endif %}
<privateIpAddress>{{ instance.private_ip }}</privateIpAddress>
{% if instance.public_ip %}
<ipAddress>{{ instance.public_ip }}</ipAddress>
@ -245,8 +247,10 @@ EC2_RUN_INSTANCES = """<RunInstancesResponse xmlns="http://ec2.amazonaws.com/doc
{% for nic in instance.nics.values() %}
<item>
<networkInterfaceId>{{ nic.id }}</networkInterfaceId>
<subnetId>{{ nic.subnet.id }}</subnetId>
<vpcId>{{ nic.subnet.vpc_id }}</vpcId>
{% if nic.subnet %}
<subnetId>{{ nic.subnet.id }}</subnetId>
<vpcId>{{ nic.subnet.vpc_id }}</vpcId>
{% endif %}
<description>Primary network interface</description>
<ownerId>111122223333</ownerId>
<status>in-use</status>
@ -338,8 +342,10 @@ EC2_DESCRIBE_INSTANCES = """<DescribeInstancesResponse xmlns='http://ec2.amazona
<state>disabled</state>
</monitoring>
{% if instance.nics %}
<subnetId>{{ instance.nics[0].subnet.id }}</subnetId>
<vpcId>{{ instance.nics[0].subnet.vpc_id }}</vpcId>
{% if instance.nics[0].subnet %}
<subnetId>{{ instance.nics[0].subnet.id }}</subnetId>
<vpcId>{{ instance.nics[0].subnet.vpc_id }}</vpcId>
{% endif %}
<privateIpAddress>{{ instance.private_ip }}</privateIpAddress>
{% if instance.nics[0].public_ip %}
<ipAddress>{{ instance.nics[0].public_ip }}</ipAddress>
@ -390,8 +396,10 @@ EC2_DESCRIBE_INSTANCES = """<DescribeInstancesResponse xmlns='http://ec2.amazona
{% for nic in instance.nics.values() %}
<item>
<networkInterfaceId>{{ nic.id }}</networkInterfaceId>
<subnetId>{{ nic.subnet.id }}</subnetId>
<vpcId>{{ nic.subnet.vpc_id }}</vpcId>
{% if nic.subnet %}
<subnetId>{{ nic.subnet.id }}</subnetId>
<vpcId>{{ nic.subnet.vpc_id }}</vpcId>
{% endif %}
<description>Primary network interface</description>
<ownerId>111122223333</ownerId>
<status>in-use</status>

View File

@ -7,7 +7,15 @@ class Subnets(BaseResponse):
def create_subnet(self):
vpc_id = self.querystring.get('VpcId')[0]
cidr_block = self.querystring.get('CidrBlock')[0]
subnet = self.ec2_backend.create_subnet(vpc_id, cidr_block)
if 'AvailabilityZone' in self.querystring:
availability_zone = self.querystring['AvailabilityZone'][0]
else:
availability_zone = None
subnet = self.ec2_backend.create_subnet(
vpc_id,
cidr_block,
availability_zone,
)
template = self.response_template(CREATE_SUBNET_RESPONSE)
return template.render(subnet=subnet)
@ -33,7 +41,7 @@ CREATE_SUBNET_RESPONSE = """
<vpcId>{{ subnet.vpc_id }}</vpcId>
<cidrBlock>{{ subnet.cidr_block }}</cidrBlock>
<availableIpAddressCount>251</availableIpAddressCount>
<availabilityZone>us-east-1a</availabilityZone>
<availabilityZone>{{ subnet.availability_zone }}</availabilityZone>
<tagSet>
{% for tag in subnet.get_tags() %}
<item>
@ -64,7 +72,7 @@ DESCRIBE_SUBNETS_RESPONSE = """
<vpcId>{{ subnet.vpc_id }}</vpcId>
<cidrBlock>{{ subnet.cidr_block }}</cidrBlock>
<availableIpAddressCount>251</availableIpAddressCount>
<availabilityZone>us-east-1a</availabilityZone>
<availabilityZone>{{ subnet.availability_zone }}</availabilityZone>
<tagSet>
{% for tag in subnet.get_tags() %}
<item>

View File

@ -1,6 +1,13 @@
from __future__ import unicode_literals
import boto.ec2.elb
from boto.ec2.elb.attributes import (
LbAttributes,
ConnectionSettingAttribute,
ConnectionDrainingAttribute,
AccessLogAttribute,
CrossZoneLoadBalancingAttribute,
)
from moto.core import BaseBackend
@ -29,6 +36,7 @@ class FakeLoadBalancer(object):
self.instance_ids = []
self.zones = zones
self.listeners = []
self.attributes = FakeLoadBalancer.get_default_attributes()
for protocol, lb_port, instance_port, ssl_certificate_id in ports:
listener = FakeListener(
@ -73,6 +81,28 @@ class FakeLoadBalancer(object):
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "SourceSecurityGroup.OwnerAlias" ]"')
raise UnformattedGetAttTemplateException()
@classmethod
def get_default_attributes(cls):
attributes = LbAttributes()
cross_zone_load_balancing = CrossZoneLoadBalancingAttribute()
cross_zone_load_balancing.enabled = False
attributes.cross_zone_load_balancing = cross_zone_load_balancing
connection_draining = ConnectionDrainingAttribute()
connection_draining.enabled = False
attributes.connection_draining = connection_draining
access_log = AccessLogAttribute()
access_log.enabled = False
attributes.access_log = access_log
connection_settings = ConnectionSettingAttribute()
connection_settings.idle_timeout = 60
attributes.connecting_settings = connection_settings
return attributes
class ELBBackend(BaseBackend):
@ -151,6 +181,26 @@ class ELBBackend(BaseBackend):
load_balancer.instance_ids = new_instance_ids
return load_balancer
def set_cross_zone_load_balancing_attribute(self, load_balancer_name, attribute):
load_balancer = self.get_load_balancer(load_balancer_name)
load_balancer.attributes.cross_zone_load_balancing = attribute
return load_balancer
def set_access_log_attribute(self, load_balancer_name, attribute):
load_balancer = self.get_load_balancer(load_balancer_name)
load_balancer.attributes.access_log = attribute
return load_balancer
def set_connection_draining_attribute(self, load_balancer_name, attribute):
load_balancer = self.get_load_balancer(load_balancer_name)
load_balancer.attributes.connection_draining = attribute
return load_balancer
def set_connection_settings_attribute(self, load_balancer_name, attribute):
load_balancer = self.get_load_balancer(load_balancer_name)
load_balancer.attributes.connecting_settings = attribute
return load_balancer
elb_backends = {}
for region in boto.ec2.elb.regions():

View File

@ -1,4 +1,10 @@
from __future__ import unicode_literals
from boto.ec2.elb.attributes import (
ConnectionSettingAttribute,
ConnectionDrainingAttribute,
AccessLogAttribute,
CrossZoneLoadBalancingAttribute,
)
from moto.core.responses import BaseResponse
from .models import elb_backends
@ -25,7 +31,7 @@ class ELBResponse(BaseResponse):
break
lb_port = self.querystring['Listeners.member.{0}.LoadBalancerPort'.format(port_index)][0]
instance_port = self.querystring['Listeners.member.{0}.InstancePort'.format(port_index)][0]
ssl_certificate_id = self.querystring.get('Listeners.member.{0}.SSLCertificateId'.format(port_index)[0], None)
ssl_certificate_id = self.querystring.get('Listeners.member.{0}.SSLCertificateId'.format(port_index), [None])[0]
ports.append([protocol, lb_port, instance_port, ssl_certificate_id])
port_index += 1
@ -122,6 +128,64 @@ class ELBResponse(BaseResponse):
load_balancer = self.elb_backend.deregister_instances(load_balancer_name, instance_ids)
return template.render(load_balancer=load_balancer)
def describe_load_balancer_attributes(self):
load_balancer_name = self.querystring.get('LoadBalancerName')[0]
load_balancer = self.elb_backend.describe_load_balancers(load_balancer_name)[0]
template = self.response_template(DESCRIBE_ATTRIBUTES_TEMPLATE)
return template.render(attributes=load_balancer.attributes)
def modify_load_balancer_attributes(self):
load_balancer_name = self.querystring.get('LoadBalancerName')[0]
load_balancer = self.elb_backend.describe_load_balancers(load_balancer_name)[0]
def parse_attribute(attribute_name):
"""
Transform self.querystring parameters matching `LoadBalancerAttributes.attribute_name.attribute_key`
into a dictionary of (attribute_name, attribute_key)` pairs.
"""
attribute_prefix = "LoadBalancerAttributes." + attribute_name
return dict((key.lstrip(attribute_prefix), value[0]) for key, value in self.querystring.items() if key.startswith(attribute_prefix))
cross_zone = parse_attribute("CrossZoneLoadBalancing")
if cross_zone:
attribute = CrossZoneLoadBalancingAttribute()
attribute.enabled = cross_zone["Enabled"] == "true"
self.elb_backend.set_cross_zone_load_balancing_attribute(load_balancer_name, attribute)
access_log = parse_attribute("AccessLog")
if access_log:
attribute = AccessLogAttribute()
attribute.enabled = access_log["Enabled"] == "true"
attribute.s3_bucket_name = access_log["S3BucketName"]
attribute.s3_bucket_prefix = access_log["S3BucketPrefix"]
attribute.emit_interval = access_log["EmitInterval"]
self.elb_backend.set_access_log_attribute(load_balancer_name, attribute)
connection_draining = parse_attribute("ConnectionDraining")
if connection_draining:
attribute = ConnectionDrainingAttribute()
attribute.enabled = connection_draining["Enabled"] == "true"
attribute.timeout = connection_draining["Timeout"]
self.elb_backend.set_connection_draining_attribute(load_balancer_name, attribute)
connection_settings = parse_attribute("ConnectionSettings")
if connection_settings:
attribute = ConnectionSettingAttribute()
attribute.idle_timeout = connection_settings["IdleTimeout"]
self.elb_backend.set_connection_settings_attribute(load_balancer_name, attribute)
template = self.response_template(MODIFY_ATTRIBUTES_TEMPLATE)
return template.render(attributes=load_balancer.attributes)
def describe_instance_health(self):
load_balancer_name = self.querystring.get('LoadBalancerName')[0]
instance_ids = [value[0] for key, value in self.querystring.items() if "Instances.member" in key]
if len(instance_ids) == 0:
instance_ids = self.elb_backend.describe_load_balancers(load_balancer_name)[0].instance_ids
template = self.response_template(DESCRIBE_INSTANCE_HEALTH_TEMPLATE)
return template.render(instance_ids=instance_ids)
CREATE_LOAD_BALANCER_TEMPLATE = """<CreateLoadBalancerResult xmlns="http://elasticloadbalancing.amazonaws.com/doc/2012-06-01/">
<DNSName>tests.us-east-1.elb.amazonaws.com</DNSName>
</CreateLoadBalancerResult>"""
@ -253,3 +317,84 @@ DELETE_LOAD_BALANCER_LISTENERS = """<DeleteLoadBalancerListenersResponse xmlns="
<RequestId>83c88b9d-12b7-11e3-8b82-87b12EXAMPLE</RequestId>
</ResponseMetadata>
</DeleteLoadBalancerListenersResponse>"""
DESCRIBE_ATTRIBUTES_TEMPLATE = """<DescribeLoadBalancerAttributesResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2012-06-01/">
<DescribeLoadBalancerAttributesResult>
<LoadBalancerAttributes>
<AccessLog>
<Enabled>{{ attributes.access_log.enabled }}</Enabled>
{% if attributes.access_log.enabled %}
<S3BucketName>{{ attributes.access_log.s3_bucket_name }}</S3BucketName>
<S3BucketPrefix>{{ attributes.access_log.s3_bucket_prefix }}</S3BucketPrefix>
<EmitInterval>{{ attributes.access_log.emit_interval }}</EmitInterval>
{% endif %}
</AccessLog>
<ConnectionSettings>
<IdleTimeout>{{ attributes.connecting_settings.idle_timeout }}</IdleTimeout>
</ConnectionSettings>
<CrossZoneLoadBalancing>
<Enabled>{{ attributes.cross_zone_load_balancing.enabled }}</Enabled>
</CrossZoneLoadBalancing>
<ConnectionDraining>
<Enabled>{{ attributes.connection_draining.enabled }}</Enabled>
{% if attributes.connection_draining.enabled %}
<Timeout>{{ attributes.connection_draining.timeout }}</Timeout>
{% endif %}
</ConnectionDraining>
</LoadBalancerAttributes>
</DescribeLoadBalancerAttributesResult>
<ResponseMetadata>
<RequestId>83c88b9d-12b7-11e3-8b82-87b12EXAMPLE</RequestId>
</ResponseMetadata>
</DescribeLoadBalancerAttributesResponse>
"""
MODIFY_ATTRIBUTES_TEMPLATE = """<ModifyLoadBalancerAttributesResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2012-06-01/">
<ModifyLoadBalancerAttributesResult>
<LoadBalancerName>my-loadbalancer</LoadBalancerName>
<LoadBalancerAttributes>
<AccessLog>
<Enabled>{{ attributes.access_log.enabled }}</Enabled>
{% if attributes.access_log.enabled %}
<S3BucketName>{{ attributes.access_log.s3_bucket_name }}</S3BucketName>
<S3BucketPrefix>{{ attributes.access_log.s3_bucket_prefix }}</S3BucketPrefix>
<EmitInterval>{{ attributes.access_log.emit_interval }}</EmitInterval>
{% endif %}
</AccessLog>
<ConnectionSettings>
<IdleTimeout>{{ attributes.connecting_settings.idle_timeout }}</IdleTimeout>
</ConnectionSettings>
<CrossZoneLoadBalancing>
<Enabled>{{ attributes.cross_zone_load_balancing.enabled }}</Enabled>
</CrossZoneLoadBalancing>
<ConnectionDraining>
<Enabled>{{ attributes.connection_draining.enabled }}</Enabled>
{% if attributes.connection_draining.enabled %}
<Timeout>{{ attributes.connection_draining.timeout }}</Timeout>
{% endif %}
</ConnectionDraining>
</LoadBalancerAttributes>
</ModifyLoadBalancerAttributesResult>
<ResponseMetadata>
<RequestId>83c88b9d-12b7-11e3-8b82-87b12EXAMPLE</RequestId>
</ResponseMetadata>
</ModifyLoadBalancerAttributesResponse>
"""
DESCRIBE_INSTANCE_HEALTH_TEMPLATE = """<DescribeInstanceHealthResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2012-06-01/">
<DescribeInstanceHealthResult>
<InstanceStates>
{% for instance_id in instance_ids %}
<member>
<Description>N/A</Description>
<InstanceId>{{ instance_id }}</InstanceId>
<State>InService</State>
<ReasonCode>N/A</ReasonCode>
</member>
{% endfor %}
</InstanceStates>
</DescribeInstanceHealthResult>
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
</DescribeInstanceHealthResponse>"""

View File

@ -6,6 +6,7 @@ from .utils import random_access_key, random_alphanumeric, random_resource_id
from datetime import datetime
import base64
class Role(object):
def __init__(self, role_id, name, assume_role_policy_document, path):
@ -212,16 +213,16 @@ class User(object):
access_key_2_last_rotated = date_created.strftime(date_format)
return '{0},{1},{2},{3},{4},{5},not_supported,false,{6},{7},{8},{9},false,N/A,false,N/A'.format(self.name,
self.arn,
date_created.strftime(date_format),
password_enabled,
password_last_used,
date_created.strftime(date_format),
access_key_1_active,
access_key_1_last_rotated,
access_key_2_active,
access_key_2_last_rotated
)
self.arn,
date_created.strftime(date_format),
password_enabled,
password_last_used,
date_created.strftime(date_format),
access_key_1_active,
access_key_1_last_rotated,
access_key_2_active,
access_key_2_last_rotated
)
class IAMBackend(BaseBackend):
@ -337,6 +338,18 @@ class IAMBackend(BaseBackend):
return group
def list_groups(self):
return self.groups.values()
def get_groups_for_user(self, user_name):
user = self.get_user(user_name)
groups = []
for group in self.list_groups():
if user in group.users:
groups.append(group)
return groups
def create_user(self, user_name, path='/'):
if user_name in self.users:
raise BotoServerError(409, 'Conflict')

View File

@ -131,6 +131,18 @@ class IamResponse(BaseResponse):
template = self.response_template(GET_GROUP_TEMPLATE)
return template.render(group=group)
def list_groups(self):
groups = iam_backend.list_groups()
template = self.response_template(LIST_GROUPS_TEMPLATE)
return template.render(groups=groups)
def list_groups_for_user(self):
user_name = self._get_param('UserName')
groups = iam_backend.get_groups_for_user(user_name)
template = self.response_template(LIST_GROUPS_FOR_USER_TEMPLATE)
return template.render(groups=groups)
def create_user(self):
user_name = self._get_param('UserName')
path = self._get_param('Path')
@ -502,6 +514,45 @@ GET_GROUP_TEMPLATE = """<GetGroupResponse>
</ResponseMetadata>
</GetGroupResponse>"""
LIST_GROUPS_TEMPLATE = """<ListGroupsResponse>
<ListGroupsResult>
<Groups>
{% for group in groups %}
<member>
<Path>{{ group.path }}</Path>
<GroupName>{{ group.name }}</GroupName>
<GroupId>{{ group.id }}</GroupId>
<Arn>arn:aws:iam::123456789012:group/{{ group.path }}</Arn>
</member>
{% endfor %}
</Groups>
<IsTruncated>false</IsTruncated>
</ListGroupsResult>
<ResponseMetadata>
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
</ResponseMetadata>
</ListGroupsResponse>"""
LIST_GROUPS_FOR_USER_TEMPLATE = """<ListGroupsForUserResponse>
<ListGroupsForUserResult>
<Groups>
{% for group in groups %}
<member>
<Path>{{ group.path }}</Path>
<GroupName>{{ group.name }}</GroupName>
<GroupId>{{ group.id }}</GroupId>
<Arn>arn:aws:iam::123456789012:group/{{ group.path }}</Arn>
</member>
{% endfor %}
</Groups>
<IsTruncated>false</IsTruncated>
</ListGroupsForUserResult>
<ResponseMetadata>
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
</ResponseMetadata>
</ListGroupsForUserResponse>"""
USER_TEMPLATE = """<{{ action }}UserResponse>
<{{ action }}UserResult>
<User>
@ -640,4 +691,4 @@ LIST_INSTANCE_PROFILES_FOR_ROLE_TEMPLATE = """<ListInstanceProfilesForRoleRespon
<ResponseMetadata>
<RequestId>6a8c3992-99f4-11e1-a4c3-27EXAMPLE804</RequestId>
</ResponseMetadata>
</ListInstanceProfilesForRoleResponse>"""
</ListInstanceProfilesForRoleResponse>"""

View File

@ -106,9 +106,10 @@ class RecordSet(object):
class FakeZone(object):
def __init__(self, name, id_):
def __init__(self, name, id_, comment=None):
self.name = name
self.id = id_
self.comment = comment
self.rrsets = []
def add_rrset(self, record_set):
@ -116,9 +117,12 @@ class FakeZone(object):
self.rrsets.append(record_set)
return record_set
def delete_rrset(self, name):
def delete_rrset_by_name(self, name):
self.rrsets = [record_set for record_set in self.rrsets if record_set.name != name]
def delete_rrset_by_id(self, set_identifier):
self.rrsets = [record_set for record_set in self.rrsets if record_set.set_identifier != set_identifier]
def get_record_sets(self, type_filter, name_filter):
record_sets = list(self.rrsets) # Copy the list
if type_filter:
@ -170,9 +174,9 @@ class Route53Backend(BaseBackend):
self.zones = {}
self.health_checks = {}
def create_hosted_zone(self, name):
def create_hosted_zone(self, name, comment=None):
new_id = get_random_hex()
new_zone = FakeZone(name, new_id)
new_zone = FakeZone(name, new_id, comment=comment)
self.zones[new_id] = new_zone
return new_zone

View File

@ -9,7 +9,8 @@ def list_or_create_hostzone_response(request, full_url, headers):
if request.method == "POST":
elements = xmltodict.parse(request.body)
new_zone = route53_backend.create_hosted_zone(elements["CreateHostedZoneRequest"]["Name"])
comment = elements["CreateHostedZoneRequest"]["HostedZoneConfig"]["Comment"]
new_zone = route53_backend.create_hosted_zone(elements["CreateHostedZoneRequest"]["Name"], comment=comment)
template = Template(CREATE_HOSTED_ZONE_RESPONSE)
return 201, headers, template.render(zone=new_zone)
@ -57,7 +58,10 @@ def rrset_response(request, full_url, headers):
record_set['ResourceRecords'] = [x['Value'] for x in record_set['ResourceRecords'].values()]
the_zone.add_rrset(record_set)
elif action == "DELETE":
the_zone.delete_rrset(record_set["Name"])
if 'SetIdentifier' in record_set:
the_zone.delete_rrset_by_id(record_set["SetIdentifier"])
else:
the_zone.delete_rrset_by_name(record_set["Name"])
return 200, headers, CHANGE_RRSET_RESPONSE
@ -125,6 +129,9 @@ GET_HOSTED_ZONE_RESPONSE = """<GetHostedZoneResponse xmlns="https://route53.amaz
<Id>/hostedzone/{{ zone.id }}</Id>
<Name>{{ zone.name }}</Name>
<ResourceRecordSetCount>{{ zone.rrsets|count }}</ResourceRecordSetCount>
<Config>
<Comment>{{ zone.comment }}</Comment>
</Config>
</HostedZone>
<DelegationSet>
<NameServer>moto.test.com</NameServer>
@ -150,6 +157,9 @@ LIST_HOSTED_ZONES_RESPONSE = """<ListHostedZonesResponse xmlns="https://route53.
<HostedZone>
<Id>{{ zone.id }}</Id>
<Name>{{ zone.name }}</Name>
<Config>
<Comment>{{ zone.comment }}</Comment>
</Config>
<ResourceRecordSetCount>{{ zone.rrsets|count }}</ResourceRecordSetCount>
</HostedZone>
{% endfor %}

10
moto/sns/exceptions.py Normal file
View File

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from moto.core.exceptions import RESTError
class SNSNotFoundError(RESTError):
code = 404
def __init__(self, message):
super(SNSNotFoundError, self).__init__(
"NotFound", message)

View File

@ -11,6 +11,7 @@ from moto.compat import OrderedDict
from moto.core import BaseBackend
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.sqs import sqs_backends
from .exceptions import SNSNotFoundError
from .utils import make_arn_for_topic, make_arn_for_subscription
DEFAULT_ACCOUNT_ID = 123456789012
@ -93,10 +94,52 @@ class Subscription(object):
}
class PlatformApplication(object):
def __init__(self, region, name, platform, attributes):
self.region = region
self.name = name
self.platform = platform
self.attributes = attributes
@property
def arn(self):
return "arn:aws:sns:{region}:123456789012:app/{platform}/{name}".format(
region=self.region,
platform=self.platform,
name=self.name,
)
class PlatformEndpoint(object):
def __init__(self, region, application, custom_user_data, token, attributes):
self.region = region
self.application = application
self.custom_user_data = custom_user_data
self.token = token
self.attributes = attributes
self.id = uuid.uuid4()
@property
def arn(self):
return "arn:aws:sns:{region}:123456789012:endpoint/{platform}/{name}/{id}".format(
region=self.region,
platform=self.application.platform,
name=self.application.name,
id=self.id,
)
def publish(self, message):
message_id = six.text_type(uuid.uuid4())
# This is where we would actually send a message
return message_id
class SNSBackend(BaseBackend):
def __init__(self):
self.topics = OrderedDict()
self.subscriptions = OrderedDict()
self.applications = {}
self.platform_endpoints = {}
def create_topic(self, name):
topic = Topic(name, self)
@ -121,7 +164,10 @@ class SNSBackend(BaseBackend):
self.topics.pop(arn)
def get_topic(self, arn):
return self.topics[arn]
try:
return self.topics[arn]
except KeyError:
raise SNSNotFoundError("Topic with arn {0} not found".format(arn))
def set_topic_attribute(self, topic_arn, attribute_name, attribute_value):
topic = self.get_topic(topic_arn)
@ -144,11 +190,61 @@ class SNSBackend(BaseBackend):
else:
return self._get_values_nexttoken(self.subscriptions, next_token)
def publish(self, topic_arn, message):
topic = self.get_topic(topic_arn)
message_id = topic.publish(message)
def publish(self, arn, message):
try:
topic = self.get_topic(arn)
message_id = topic.publish(message)
except SNSNotFoundError:
endpoint = self.get_endpoint(arn)
message_id = endpoint.publish(message)
return message_id
def create_platform_application(self, region, name, platform, attributes):
application = PlatformApplication(region, name, platform, attributes)
self.applications[application.arn] = application
return application
def get_application(self, arn):
try:
return self.applications[arn]
except KeyError:
raise SNSNotFoundError("Application with arn {0} not found".format(arn))
def set_application_attributes(self, arn, attributes):
application = self.get_application(arn)
application.attributes.update(attributes)
return application
def list_platform_applications(self):
return self.applications.values()
def delete_platform_application(self, platform_arn):
self.applications.pop(platform_arn)
def create_platform_endpoint(self, region, application, custom_user_data, token, attributes):
platform_endpoint = PlatformEndpoint(region, application, custom_user_data, token, attributes)
self.platform_endpoints[platform_endpoint.arn] = platform_endpoint
return platform_endpoint
def list_endpoints_by_platform_application(self, application_arn):
return [
endpoint for endpoint
in self.platform_endpoints.values()
if endpoint.application.arn == application_arn
]
def get_endpoint(self, arn):
try:
return self.platform_endpoints[arn]
except KeyError:
raise SNSNotFoundError("Endpoint with arn {0} not found".format(arn))
def set_endpoint_attributes(self, arn, attributes):
endpoint = self.get_endpoint(arn)
endpoint.attributes.update(attributes)
return endpoint
sns_backends = {}
for region in boto.sns.regions():
sns_backends[region.name] = SNSBackend()

View File

@ -12,6 +12,14 @@ class SNSResponse(BaseResponse):
def backend(self):
return sns_backends[self.region]
def _get_attributes(self):
attributes = self._get_list_prefix('Attributes.entry')
return dict(
(attribute['key'], attribute['value'])
for attribute
in attributes
)
def create_topic(self):
name = self._get_param('Name')
topic = self.backend.create_topic(name)
@ -170,9 +178,11 @@ class SNSResponse(BaseResponse):
})
def publish(self):
target_arn = self._get_param('TargetArn')
topic_arn = self._get_param('TopicArn')
arn = target_arn if target_arn else topic_arn
message = self._get_param('Message')
message_id = self.backend.publish(topic_arn, message)
message_id = self.backend.publish(arn, message)
return json.dumps({
"PublishResponse": {
@ -185,19 +195,116 @@ class SNSResponse(BaseResponse):
}
})
def create_platform_application(self):
name = self._get_param('Name')
platform = self._get_param('Platform')
attributes = self._get_attributes()
platform_application = self.backend.create_platform_application(self.region, name, platform, attributes)
return json.dumps({
"CreatePlatformApplicationResponse": {
"CreatePlatformApplicationResult": {
"PlatformApplicationArn": platform_application.arn,
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937b",
}
}
})
def get_platform_application_attributes(self):
arn = self._get_param('PlatformApplicationArn')
application = self.backend.get_application(arn)
return json.dumps({
"GetPlatformApplicationAttributesResponse": {
"GetPlatformApplicationAttributesResult": {
"Attributes": application.attributes,
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937f",
}
}
})
def set_platform_application_attributes(self):
arn = self._get_param('PlatformApplicationArn')
attributes = self._get_attributes()
self.backend.set_application_attributes(arn, attributes)
return json.dumps({
"SetPlatformApplicationAttributesResponse": {
"ResponseMetadata": {
"RequestId": "384ac68d-3775-12df-8963-01868b7c937f",
}
}
})
def list_platform_applications(self):
applications = self.backend.list_platform_applications()
return json.dumps({
"ListPlatformApplicationsResponse": {
"ListPlatformApplicationsResult": {
"PlatformApplications": [{
"PlatformApplicationArn": application.arn,
"attributes": application.attributes,
} for application in applications],
"NextToken": None
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937c",
}
}
})
def delete_platform_application(self):
platform_arn = self._get_param('PlatformApplicationArn')
self.backend.delete_platform_application(platform_arn)
return json.dumps({
"DeletePlatformApplicationResponse": {
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937e",
}
}
})
def create_platform_endpoint(self):
application_arn = self._get_param('PlatformApplicationArn')
application = self.backend.get_application(application_arn)
custom_user_data = self._get_param('CustomUserData')
token = self._get_param('Token')
attributes = self._get_attributes()
platform_endpoint = self.backend.create_platform_endpoint(
self.region, application, custom_user_data, token, attributes)
return json.dumps({
"CreatePlatformEndpointResponse": {
"CreatePlatformEndpointResult": {
"EndpointArn": platform_endpoint.arn,
},
"ResponseMetadata": {
"RequestId": "384ac68d-3779-11df-8963-01868b7c937b",
}
}
})
def list_endpoints_by_platform_application(self):
application_arn = self._get_param('PlatformApplicationArn')
endpoints = self.backend.list_endpoints_by_platform_application(application_arn)
return json.dumps({
"ListEndpointsByPlatformApplicationResponse": {
"ListEndpointsByPlatformApplicationResult": {
"Endpoints": [
{
"Attributes": {
"Token": "TOKEN",
"Enabled": "true",
"CustomUserData": ""
},
"EndpointArn": "FAKE_ARN_ENDPOINT"
}
"Attributes": endpoint.attributes,
"EndpointArn": endpoint.arn,
} for endpoint in endpoints
],
"NextToken": None
},
@ -206,3 +313,32 @@ class SNSResponse(BaseResponse):
}
}
})
def get_endpoint_attributes(self):
arn = self._get_param('EndpointArn')
endpoint = self.backend.get_endpoint(arn)
return json.dumps({
"GetEndpointAttributesResponse": {
"GetEndpointAttributesResult": {
"Attributes": endpoint.attributes,
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937f",
}
}
})
def set_endpoint_attributes(self):
arn = self._get_param('EndpointArn')
attributes = self._get_attributes()
self.backend.set_endpoint_attributes(arn, attributes)
return json.dumps({
"SetEndpointAttributesResponse": {
"ResponseMetadata": {
"RequestId": "384bc68d-3775-12df-8963-01868b7c937f",
}
}
})

View File

@ -21,7 +21,7 @@ if sys.version_info < (2, 7):
setup(
name='moto',
version='0.4.1',
version='0.4.2',
description='A library that allows your python tests to easily'
' mock out the boto library',
author='Steve Pulec',

View File

@ -207,12 +207,8 @@ def test_stack_security_groups():
)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
security_groups = ec2_conn.get_all_security_groups()
for group in security_groups:
if "InstanceSecurityGroup" in group.name:
instance_group = group
else:
other_group = group
instance_group = ec2_conn.get_all_security_groups(filters={'description': ['My security group']})[0]
other_group = ec2_conn.get_all_security_groups(filters={'description': ['My other group']})[0]
reservation = ec2_conn.get_all_instances()[0]
ec2_instance = reservation.instances[0]
@ -343,7 +339,7 @@ def test_vpc_single_instance_in_subnet():
eip.domain.should.equal('vpc')
eip.instance_id.should.equal(instance.id)
security_group = ec2_conn.get_all_security_groups()[0]
security_group = ec2_conn.get_all_security_groups(filters={'vpc_id': [vpc.id]})[0]
security_group.vpc_id.should.equal(vpc.id)
stack = conn.describe_stacks()[0]
@ -1018,7 +1014,7 @@ def test_vpc_peering_creation():
@mock_cloudformation
@mock_ec2
def test_security_group_ingress_separate_from_security_group_by_id():
def test_multiple_security_group_ingress_separate_from_security_group_by_id():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
@ -1078,7 +1074,6 @@ def test_security_group_ingress_separate_from_security_group_by_id():
security_group1.rules[0].to_port.should.equal('8080')
@mock_cloudformation
@mock_ec2
def test_security_group_ingress_separate_from_security_group_by_id():
@ -1194,3 +1189,32 @@ def test_security_group_ingress_separate_from_security_group_by_id_using_vpc():
security_group1.rules[0].ip_protocol.should.equal('tcp')
security_group1.rules[0].from_port.should.equal('80')
security_group1.rules[0].to_port.should.equal('8080')
@mock_cloudformation
@mock_ec2
def test_subnets_should_be_created_with_availability_zone():
vpc_conn = boto.vpc.connect_to_region('us-west-1')
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet_template = {
"AWSTemplateFormatVersion" : "2010-09-09",
"Resources" : {
"testSubnet" : {
"Type" : "AWS::EC2::Subnet",
"Properties" : {
"VpcId" : vpc.id,
"CidrBlock" : "10.0.0.0/24",
"AvailabilityZone" : "us-west-1b",
}
}
}
}
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
template_json = json.dumps(subnet_template)
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
subnet = vpc_conn.get_all_subnets(filters={'cidrBlock': '10.0.0.0/24'})[0]
subnet.availability_zone.should.equal('us-west-1b')

View File

@ -48,7 +48,7 @@ def test_create_table():
},
'TableName': 'messages',
'TableSizeBytes': 0,
'TableStatus': 'ACTIVE'
'TableStatus': 'ACTIVE',
}
}
conn.describe_table('messages').should.equal(expected)

View File

@ -7,10 +7,8 @@ from moto import mock_dynamodb2
from boto.exception import JSONResponseError
from tests.helpers import requires_boto_gte
try:
from boto.dynamodb2.fields import HashKey
from boto.dynamodb2.fields import RangeKey
from boto.dynamodb2.table import Table
from boto.dynamodb2.table import Item
from boto.dynamodb2.fields import GlobalAllIndex, HashKey, RangeKey
from boto.dynamodb2.table import Item, Table
from boto.dynamodb2.exceptions import ValidationException
except ImportError:
pass
@ -53,7 +51,8 @@ def test_create_table():
{'KeyType': 'HASH', 'AttributeName': 'forum_name'},
{'KeyType': 'RANGE', 'AttributeName': 'subject'}
],
'ItemCount': 0, 'CreationDateTime': 1326499200.0
'ItemCount': 0, 'CreationDateTime': 1326499200.0,
'GlobalSecondaryIndexes': [],
}
}
table.describe().should.equal(expected)
@ -445,3 +444,112 @@ def test_get_key_fields():
table = create_table()
kf = table.get_key_fields()
kf.should.equal(['forum_name', 'subject'])
@mock_dynamodb2
def test_create_with_global_indexes():
conn = boto.dynamodb2.layer1.DynamoDBConnection()
Table.create('messages', schema=[
HashKey('subject'),
RangeKey('version'),
], global_indexes=[
GlobalAllIndex('topic-created_at-index',
parts=[
HashKey('topic'),
RangeKey('created_at', data_type='N')
],
throughput={
'read': 6,
'write': 1
}
),
])
table_description = conn.describe_table("messages")
table_description['Table']["GlobalSecondaryIndexes"].should.equal([
{
"IndexName": "topic-created_at-index",
"KeySchema": [
{
"AttributeName": "topic",
"KeyType": "HASH"
},
{
"AttributeName": "created_at",
"KeyType": "RANGE"
},
],
"Projection": {
"ProjectionType": "ALL"
},
"ProvisionedThroughput": {
"ReadCapacityUnits": 6,
"WriteCapacityUnits": 1,
}
}
])
@mock_dynamodb2
def test_query_with_global_indexes():
table = Table.create('messages', schema=[
HashKey('subject'),
RangeKey('version'),
], global_indexes=[
GlobalAllIndex('topic-created_at-index',
parts=[
HashKey('topic'),
RangeKey('created_at', data_type='N')
],
throughput={
'read': 6,
'write': 1
}
),
GlobalAllIndex('status-created_at-index',
parts=[
HashKey('status'),
RangeKey('created_at', data_type='N')
],
throughput={
'read': 2,
'write': 1
}
)
])
item_data = {
'subject': 'Check this out!',
'version': '1',
'created_at': 0,
'status': 'inactive'
}
item = Item(table, item_data)
item.save(overwrite=True)
item['version'] = '2'
item.save(overwrite=True)
results = table.query(status__eq='active')
list(results).should.have.length_of(0)
@mock_dynamodb2
def test_lookup():
from decimal import Decimal
table = Table.create('messages', schema=[
HashKey('test_hash'),
RangeKey('test_range'),
], throughput={
'read': 10,
'write': 10,
})
hash_key = 3241526475
range_key = 1234567890987
data = {'test_hash': hash_key, 'test_range': range_key}
table.put_item(data=data)
message = table.lookup(hash_key, range_key)
message.get('test_hash').should.equal(Decimal(hash_key))
message.get('test_range').should.equal(Decimal(range_key))

View File

@ -43,7 +43,8 @@ def test_create_table():
'KeySchema': [
{'KeyType': 'HASH', 'AttributeName': 'forum_name'}
],
'ItemCount': 0, 'CreationDateTime': 1326499200.0
'ItemCount': 0, 'CreationDateTime': 1326499200.0,
'GlobalSecondaryIndexes': [],
}
}
conn = boto.dynamodb2.connect_to_region(

View File

@ -54,6 +54,7 @@ def test_instance_launch_and_terminate():
instances[0].id.should.equal(instance.id)
instances[0].state.should.equal('running')
instances[0].launch_time.should.equal("2014-01-01T05:00:00")
instances[0].vpc_id.should.equal(None)
root_device_name = instances[0].root_device_name
instances[0].block_device_mapping[root_device_name].status.should.equal('attached')
@ -155,7 +156,7 @@ def test_get_instances_filtering_by_instance_type():
reservations.should.have.length_of(2)
reservations[0].instances.should.have.length_of(1)
reservations[1].instances.should.have.length_of(1)
instance_ids = [ reservations[0].instances[0].id,
instance_ids = [ reservations[0].instances[0].id,
reservations[1].instances[0].id ]
set(instance_ids).should.equal(set([instance1.id, instance2.id]))
@ -311,7 +312,7 @@ def test_get_instances_filtering_by_tag_value():
reservations[0].instances[0].id.should.equal(instance1.id)
reservations[0].instances[1].id.should.equal(instance2.id)
reservations[0].instances[2].id.should.equal(instance3.id)
reservations = conn.get_all_instances(filters={'tag-value' : ['value2', 'bogus']})
# get_all_instances should return both instances with one of the acceptable tag values
reservations.should.have.length_of(1)

View File

@ -26,8 +26,9 @@ def test_create_and_describe_security_group():
cm.exception.request_id.should_not.be.none
all_groups = conn.get_all_security_groups()
all_groups.should.have.length_of(1)
all_groups[0].name.should.equal('test security group')
all_groups.should.have.length_of(2) # The default group gets created automatically
group_names = [group.name for group in all_groups]
set(group_names).should.equal(set(["default", "test security group"]))
@mock_ec2
@ -41,6 +42,14 @@ def test_create_security_group_without_description_raises_error():
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_default_security_group():
conn = boto.ec2.connect_to_region('us-east-1')
groups = conn.get_all_security_groups()
groups.should.have.length_of(1)
groups[0].name.should.equal("default")
@mock_ec2
def test_create_and_describe_vpc_security_group():
conn = boto.connect_ec2('the_key', 'the_secret')
@ -59,7 +68,7 @@ def test_create_and_describe_vpc_security_group():
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_groups = conn.get_all_security_groups()
all_groups = conn.get_all_security_groups(filters={'vpc_id': [vpc_id]})
all_groups[0].vpc_id.should.equal(vpc_id)
@ -78,9 +87,10 @@ def test_create_two_security_groups_with_same_name_in_different_vpc():
all_groups = conn.get_all_security_groups()
all_groups.should.have.length_of(2)
all_groups[0].name.should.equal('test security group')
all_groups[1].name.should.equal('test security group')
all_groups.should.have.length_of(3)
group_names = [group.name for group in all_groups]
# The default group is created automatically
set(group_names).should.equal(set(["default", "test security group"]))
@mock_ec2
@ -89,7 +99,7 @@ def test_deleting_security_groups():
security_group1 = conn.create_security_group('test1', 'test1')
conn.create_security_group('test2', 'test2')
conn.get_all_security_groups().should.have.length_of(2)
conn.get_all_security_groups().should.have.length_of(3) # We need to include the default security group
# Deleting a group that doesn't exist should throw an error
with assert_raises(EC2ResponseError) as cm:
@ -100,11 +110,11 @@ def test_deleting_security_groups():
# Delete by name
conn.delete_security_group('test2')
conn.get_all_security_groups().should.have.length_of(1)
conn.get_all_security_groups().should.have.length_of(2)
# Delete by group id
conn.delete_security_group(group_id=security_group1.id)
conn.get_all_security_groups().should.have.length_of(0)
conn.get_all_security_groups().should.have.length_of(1)
@mock_ec2
@ -125,7 +135,7 @@ def test_authorize_ip_range_and_revoke():
success = security_group.authorize(ip_protocol="tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32")
assert success.should.be.true
security_group = conn.get_all_security_groups()[0]
security_group = conn.get_all_security_groups(groupnames=['test'])[0]
int(security_group.rules[0].to_port).should.equal(2222)
security_group.rules[0].grants[0].cidr_ip.should.equal("123.123.123.123/32")
@ -220,7 +230,7 @@ def test_get_all_security_groups():
resp[0].id.should.equal(sg1.id)
resp = conn.get_all_security_groups()
resp.should.have.length_of(2)
resp.should.have.length_of(3) # We need to include the default group here
@mock_ec2

View File

@ -4,6 +4,7 @@ import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
import boto.vpc
from boto.exception import EC2ResponseError
import sure # noqa
@ -61,13 +62,21 @@ def test_subnet_tagging():
@mock_ec2
def test_get_subnets_filtering():
conn = boto.connect_vpc('the_key', 'the_secret')
def test_subnet_should_have_proper_availability_zone_set():
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(vpcA.id, "10.0.0.0/24")
subnetA = conn.create_subnet(vpcA.id, "10.0.0.0/24", availability_zone='us-west-1b')
subnetA.availability_zone.should.equal('us-west-1b')
@mock_ec2
def test_get_subnets_filtering():
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(vpcA.id, "10.0.0.0/24", availability_zone='us-west-1a')
vpcB = conn.create_vpc("10.0.0.0/16")
subnetB1 = conn.create_subnet(vpcB.id, "10.0.0.0/24")
subnetB2 = conn.create_subnet(vpcB.id, "10.0.1.0/24")
subnetB1 = conn.create_subnet(vpcB.id, "10.0.0.0/24", availability_zone='us-west-1a')
subnetB2 = conn.create_subnet(vpcB.id, "10.0.1.0/24", availability_zone='us-west-1b')
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(3)
@ -100,5 +109,10 @@ def test_get_subnets_filtering():
subnets_by_id.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_id]).should.equal(set([subnetA.id]))
# Filter by availabilityZone
subnets_by_az = conn.get_all_subnets(filters={'availabilityZone': 'us-west-1a', 'vpc-id': vpcB.id})
subnets_by_az.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_az]).should.equal(set([subnetB1.id]))
# Unsupported filter
conn.get_all_subnets.when.called_with(filters={'not-implemented-filter': 'foobar'}).should.throw(NotImplementedError)

View File

@ -1,6 +1,6 @@
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
import tests.backport_assert_raises # flake8: noqa
from nose.tools import assert_raises
import boto
@ -41,13 +41,13 @@ def test_vpc_defaults():
conn.get_all_vpcs().should.have.length_of(1)
conn.get_all_route_tables().should.have.length_of(1)
conn.get_all_security_groups().should.have.length_of(1)
conn.get_all_security_groups(filters={'vpc-id': [vpc.id]}).should.have.length_of(1)
vpc.delete()
conn.get_all_vpcs().should.have.length_of(0)
conn.get_all_route_tables().should.have.length_of(0)
conn.get_all_security_groups().should.have.length_of(0)
conn.get_all_security_groups(filters={'vpc-id': [vpc.id]}).should.have.length_of(0)
@mock_ec2
@ -72,7 +72,7 @@ def test_vpc_get_by_id():
conn = boto.connect_vpc()
vpc1 = conn.create_vpc("10.0.0.0/16")
vpc2 = conn.create_vpc("10.0.0.0/16")
vpc3 = conn.create_vpc("10.0.0.0/16")
conn.create_vpc("10.0.0.0/16")
vpcs = conn.get_all_vpcs(vpc_ids=[vpc1.id, vpc2.id])
vpcs.should.have.length_of(2)
@ -86,7 +86,7 @@ def test_vpc_get_by_cidr_block():
conn = boto.connect_vpc()
vpc1 = conn.create_vpc("10.0.0.0/16")
vpc2 = conn.create_vpc("10.0.0.0/16")
vpc3 = conn.create_vpc("10.0.0.0/24")
conn.create_vpc("10.0.0.0/24")
vpcs = conn.get_all_vpcs(filters={'cidr': '10.0.0.0/16'})
vpcs.should.have.length_of(2)
@ -101,7 +101,7 @@ def test_vpc_get_by_dhcp_options_id():
dhcp_options = conn.create_dhcp_options(SAMPLE_DOMAIN_NAME, SAMPLE_NAME_SERVERS)
vpc1 = conn.create_vpc("10.0.0.0/16")
vpc2 = conn.create_vpc("10.0.0.0/16")
vpc3 = conn.create_vpc("10.0.0.0/24")
conn.create_vpc("10.0.0.0/24")
conn.associate_dhcp_options(dhcp_options.id, vpc1.id)
conn.associate_dhcp_options(dhcp_options.id, vpc2.id)
@ -196,7 +196,7 @@ def test_vpc_get_by_tag_value_subset():
conn = boto.connect_vpc()
vpc1 = conn.create_vpc("10.0.0.0/16")
vpc2 = conn.create_vpc("10.0.0.0/16")
vpc3 = conn.create_vpc("10.0.0.0/24")
conn.create_vpc("10.0.0.0/24")
vpc1.add_tag('Name', 'TestVPC')
vpc1.add_tag('Key', 'TestVPC2')

View File

@ -2,6 +2,11 @@ from __future__ import unicode_literals
import boto
import boto.ec2.elb
from boto.ec2.elb import HealthCheck
from boto.ec2.elb.attributes import (
ConnectionSettingAttribute,
ConnectionDrainingAttribute,
AccessLogAttribute,
)
import sure # noqa
from moto import mock_elb, mock_ec2
@ -43,6 +48,24 @@ def test_create_elb_in_multiple_region():
list(west1_conn.get_all_load_balancers()).should.have.length_of(1)
list(west2_conn.get_all_load_balancers()).should.have.length_of(1)
@mock_elb
def test_create_load_balancer_with_certificate():
conn = boto.connect_elb()
zones = ['us-east-1a']
ports = [(443, 8443, 'https', 'arn:aws:iam:123456789012:server-certificate/test-cert')]
conn.create_load_balancer('my-lb', zones, ports)
balancers = conn.get_all_load_balancers()
balancer = balancers[0]
balancer.name.should.equal("my-lb")
set(balancer.availability_zones).should.equal(set(['us-east-1a']))
listener = balancer.listeners[0]
listener.load_balancer_port.should.equal(443)
listener.instance_port.should.equal(8443)
listener.protocol.should.equal("HTTPS")
listener.ssl_certificate_id.should.equal('arn:aws:iam:123456789012:server-certificate/test-cert')
@mock_elb
def test_add_listener():
@ -193,3 +216,131 @@ def test_deregister_instances():
balancer.instances.should.have.length_of(1)
balancer.instances[0].id.should.equal(instance_id2)
@mock_elb
def test_default_attributes():
conn = boto.connect_elb()
ports = [(80, 8080, 'http'), (443, 8443, 'tcp')]
lb = conn.create_load_balancer('my-lb', [], ports)
attributes = lb.get_attributes()
attributes.cross_zone_load_balancing.enabled.should.be.false
attributes.connection_draining.enabled.should.be.false
attributes.access_log.enabled.should.be.false
attributes.connecting_settings.idle_timeout.should.equal(60)
@mock_elb
def test_cross_zone_load_balancing_attribute():
conn = boto.connect_elb()
ports = [(80, 8080, 'http'), (443, 8443, 'tcp')]
lb = conn.create_load_balancer('my-lb', [], ports)
conn.modify_lb_attribute("my-lb", "CrossZoneLoadBalancing", True)
attributes = lb.get_attributes(force=True)
attributes.cross_zone_load_balancing.enabled.should.be.true
conn.modify_lb_attribute("my-lb", "CrossZoneLoadBalancing", False)
attributes = lb.get_attributes(force=True)
attributes.cross_zone_load_balancing.enabled.should.be.false
@mock_elb
def test_connection_draining_attribute():
conn = boto.connect_elb()
ports = [(80, 8080, 'http'), (443, 8443, 'tcp')]
lb = conn.create_load_balancer('my-lb', [], ports)
connection_draining = ConnectionDrainingAttribute()
connection_draining.enabled = True
connection_draining.timeout = 60
conn.modify_lb_attribute("my-lb", "ConnectionDraining", connection_draining)
attributes = lb.get_attributes(force=True)
attributes.connection_draining.enabled.should.be.true
attributes.connection_draining.timeout.should.equal(60)
connection_draining.timeout = 30
conn.modify_lb_attribute("my-lb", "ConnectionDraining", connection_draining)
attributes = lb.get_attributes(force=True)
attributes.connection_draining.timeout.should.equal(30)
connection_draining.enabled = False
conn.modify_lb_attribute("my-lb", "ConnectionDraining", connection_draining)
attributes = lb.get_attributes(force=True)
attributes.connection_draining.enabled.should.be.false
@mock_elb
def test_access_log_attribute():
conn = boto.connect_elb()
ports = [(80, 8080, 'http'), (443, 8443, 'tcp')]
lb = conn.create_load_balancer('my-lb', [], ports)
access_log = AccessLogAttribute()
access_log.enabled = True
access_log.s3_bucket_name = 'bucket'
access_log.s3_bucket_prefix = 'prefix'
access_log.emit_interval = 60
conn.modify_lb_attribute("my-lb", "AccessLog", access_log)
attributes = lb.get_attributes(force=True)
attributes.access_log.enabled.should.be.true
attributes.access_log.s3_bucket_name.should.equal("bucket")
attributes.access_log.s3_bucket_prefix.should.equal("prefix")
attributes.access_log.emit_interval.should.equal(60)
access_log.enabled = False
conn.modify_lb_attribute("my-lb", "AccessLog", access_log)
attributes = lb.get_attributes(force=True)
attributes.access_log.enabled.should.be.false
@mock_elb
def test_connection_settings_attribute():
conn = boto.connect_elb()
ports = [(80, 8080, 'http'), (443, 8443, 'tcp')]
lb = conn.create_load_balancer('my-lb', [], ports)
connection_settings = ConnectionSettingAttribute(conn)
connection_settings.idle_timeout = 120
conn.modify_lb_attribute("my-lb", "ConnectingSettings", connection_settings)
attributes = lb.get_attributes(force=True)
attributes.connecting_settings.idle_timeout.should.equal(120)
connection_settings.idle_timeout = 60
conn.modify_lb_attribute("my-lb", "ConnectingSettings", connection_settings)
attributes = lb.get_attributes(force=True)
attributes.connecting_settings.idle_timeout.should.equal(60)
@mock_ec2
@mock_elb
def test_describe_instance_health():
ec2_conn = boto.connect_ec2()
reservation = ec2_conn.run_instances('ami-1234abcd', 2)
instance_id1 = reservation.instances[0].id
instance_id2 = reservation.instances[1].id
conn = boto.connect_elb()
zones = ['us-east-1a', 'us-east-1b']
ports = [(80, 8080, 'http'), (443, 8443, 'tcp')]
lb = conn.create_load_balancer('my-lb', zones, ports)
instances_health = conn.describe_instance_health('my-lb')
instances_health.should.be.empty
lb.register_instances([instance_id1, instance_id2])
instances_health = conn.describe_instance_health('my-lb')
instances_health.should.have.length_of(2)
for instance_health in instances_health:
instance_health.instance_id.should.be.within([instance_id1, instance_id2])
instance_health.state.should.equal('InService')
instances_health = conn.describe_instance_health('my-lb', [instance_id1])
instances_health.should.have.length_of(1)
instances_health[0].instance_id.should.equal(instance_id1)
instances_health[0].state.should.equal('InService')

View File

@ -1,7 +1,6 @@
from __future__ import unicode_literals
import boto
import sure # noqa
import re
from nose.tools import assert_raises, assert_equals, assert_not_equals
from boto.exception import BotoServerError
@ -62,6 +61,7 @@ def test_create_role_and_instance_profile():
conn.list_roles().roles[0].role_name.should.equal('my-role')
conn.list_instance_profiles().instance_profiles[0].instance_profile_name.should.equal("my-profile")
@mock_iam()
def test_list_instance_profiles_for_role():
conn = boto.connect_iam()
@ -71,15 +71,15 @@ def test_list_instance_profiles_for_role():
profile_name_list = ['my-profile', 'my-profile2']
profile_path_list = ['my-path', 'my-path2']
for profile_count in range(0,2):
for profile_count in range(0, 2):
conn.create_instance_profile(profile_name_list[profile_count], path=profile_path_list[profile_count])
for profile_count in range(0,2):
for profile_count in range(0, 2):
conn.add_role_to_instance_profile(profile_name_list[profile_count], "my-role")
profile_dump = conn.list_instance_profiles_for_role(role_name="my-role")
profile_list = profile_dump['list_instance_profiles_for_role_response']['list_instance_profiles_for_role_result']['instance_profiles']
for profile_count in range(0,len(profile_list)):
for profile_count in range(0, len(profile_list)):
profile_name_list.remove(profile_list[profile_count]["instance_profile_name"])
profile_path_list.remove(profile_list[profile_count]["path"])
profile_list[profile_count]["roles"]["member"]["role_name"].should.equal("my-role")
@ -91,6 +91,7 @@ def test_list_instance_profiles_for_role():
profile_list = profile_dump2['list_instance_profiles_for_role_response']['list_instance_profiles_for_role_result']['instance_profiles']
len(profile_list).should.equal(0)
@mock_iam()
def test_list_role_policies():
conn = boto.connect_iam()
@ -118,23 +119,6 @@ def test_update_assume_role_policy():
role.assume_role_policy_document.should.equal("my-policy")
@mock_iam()
def test_create_group():
conn = boto.connect_iam()
conn.create_group('my-group')
with assert_raises(BotoServerError):
conn.create_group('my-group')
@mock_iam()
def test_get_group():
conn = boto.connect_iam()
conn.create_group('my-group')
conn.get_group('my-group')
with assert_raises(BotoServerError):
conn.get_group('not-group')
@mock_iam()
def test_create_user():
conn = boto.connect_iam()
@ -163,31 +147,6 @@ def test_create_login_profile():
conn.create_login_profile('my-user', 'my-pass')
@mock_iam()
def test_add_user_to_group():
conn = boto.connect_iam()
with assert_raises(BotoServerError):
conn.add_user_to_group('my-group', 'my-user')
conn.create_group('my-group')
with assert_raises(BotoServerError):
conn.add_user_to_group('my-group', 'my-user')
conn.create_user('my-user')
conn.add_user_to_group('my-group', 'my-user')
@mock_iam()
def test_remove_user_from_group():
conn = boto.connect_iam()
with assert_raises(BotoServerError):
conn.remove_user_from_group('my-group', 'my-user')
conn.create_group('my-group')
conn.create_user('my-user')
with assert_raises(BotoServerError):
conn.remove_user_from_group('my-group', 'my-user')
conn.add_user_to_group('my-group', 'my-user')
conn.remove_user_from_group('my-group', 'my-user')
@mock_iam()
def test_create_access_key():
conn = boto.connect_iam()
@ -230,6 +189,7 @@ def test_delete_user():
conn.create_user('my-user')
conn.delete_user('my-user')
@mock_iam()
def test_generate_credential_report():
conn = boto.connect_iam()
@ -238,6 +198,7 @@ def test_generate_credential_report():
result = conn.generate_credential_report()
result['generate_credential_report_response']['generate_credential_report_result']['state'].should.equal('COMPLETE')
@mock_iam()
def test_get_credential_report():
conn = boto.connect_iam()
@ -249,4 +210,4 @@ def test_get_credential_report():
result = conn.generate_credential_report()
result = conn.get_credential_report()
report = base64.b64decode(result['get_credential_report_response']['get_credential_report_result']['content'].encode('ascii')).decode('ascii')
report.should.match(r'.*my-user.*')
report.should.match(r'.*my-user.*')

View File

@ -0,0 +1,72 @@
from __future__ import unicode_literals
import boto
import sure # noqa
from nose.tools import assert_raises
from boto.exception import BotoServerError
from moto import mock_iam
@mock_iam()
def test_create_group():
conn = boto.connect_iam()
conn.create_group('my-group')
with assert_raises(BotoServerError):
conn.create_group('my-group')
@mock_iam()
def test_get_group():
conn = boto.connect_iam()
conn.create_group('my-group')
conn.get_group('my-group')
with assert_raises(BotoServerError):
conn.get_group('not-group')
@mock_iam()
def test_get_all_groups():
conn = boto.connect_iam()
conn.create_group('my-group1')
conn.create_group('my-group2')
groups = conn.get_all_groups()['list_groups_response']['list_groups_result']['groups']
groups.should.have.length_of(2)
@mock_iam()
def test_add_user_to_group():
conn = boto.connect_iam()
with assert_raises(BotoServerError):
conn.add_user_to_group('my-group', 'my-user')
conn.create_group('my-group')
with assert_raises(BotoServerError):
conn.add_user_to_group('my-group', 'my-user')
conn.create_user('my-user')
conn.add_user_to_group('my-group', 'my-user')
@mock_iam()
def test_remove_user_from_group():
conn = boto.connect_iam()
with assert_raises(BotoServerError):
conn.remove_user_from_group('my-group', 'my-user')
conn.create_group('my-group')
conn.create_user('my-user')
with assert_raises(BotoServerError):
conn.remove_user_from_group('my-group', 'my-user')
conn.add_user_to_group('my-group', 'my-user')
conn.remove_user_from_group('my-group', 'my-user')
@mock_iam()
def test_get_groups_for_user():
conn = boto.connect_iam()
conn.create_group('my-group1')
conn.create_group('my-group2')
conn.create_group('other-group')
conn.create_user('my-user')
conn.add_user_to_group('my-group1', 'my-user')
conn.add_user_to_group('my-group2', 'my-user')
groups = conn.get_groups_for_user('my-user')['list_groups_for_user_response']['list_groups_for_user_result']['groups']
groups.should.have.length_of(2)

View File

@ -166,3 +166,40 @@ def test_use_health_check_in_resource_record_set():
record_sets = conn.get_all_rrsets(zone_id)
record_sets[0].health_check.should.equal(check_id)
@mock_route53
def test_hosted_zone_comment_preserved():
conn = boto.connect_route53('the_key', 'the_secret')
firstzone = conn.create_hosted_zone("testdns.aws.com.", comment="test comment")
zone_id = firstzone["CreateHostedZoneResponse"]["HostedZone"]["Id"].split("/")[-1]
hosted_zone = conn.get_hosted_zone(zone_id)
hosted_zone["GetHostedZoneResponse"]["HostedZone"]["Config"]["Comment"].should.equal("test comment")
hosted_zones = conn.get_all_hosted_zones()
hosted_zones["ListHostedZonesResponse"]["HostedZones"][0]["Config"]["Comment"].should.equal("test comment")
zone = conn.get_zone("testdns.aws.com.")
zone.config["Comment"].should.equal("test comment")
@mock_route53
def test_deleting_weighted_route():
conn = boto.connect_route53()
conn.create_hosted_zone("testdns.aws.com.")
zone = conn.get_zone("testdns.aws.com.")
zone.add_cname("cname.testdns.aws.com", "example.com", identifier=('success-test-foo', '50'))
zone.add_cname("cname.testdns.aws.com", "example.com", identifier=('success-test-bar', '50'))
cnames = zone.get_cname('cname.testdns.aws.com.', all=True)
cnames.should.have.length_of(2)
foo_cname = [cname for cname in cnames if cname.identifier == 'success-test-foo'][0]
zone.delete_record(foo_cname)
cname = zone.get_cname('cname.testdns.aws.com.', all=True)
# When get_cname only had one result, it returns just that result instead of a list.
cname.identifier.should.equal('success-test-bar')

View File

@ -1,16 +1,245 @@
from __future__ import unicode_literals
import boto
import boto
from boto.exception import BotoServerError
from moto import mock_sns
import sure # noqa
@mock_sns
def test_create_platform_application():
conn = boto.connect_sns()
platform_application = conn.create_platform_application(
name="my-application",
platform="APNS",
attributes={
"PlatformCredential": "platform_credential",
"PlatformPrincipal": "platform_principal",
},
)
application_arn = platform_application['CreatePlatformApplicationResponse']['CreatePlatformApplicationResult']['PlatformApplicationArn']
application_arn.should.equal('arn:aws:sns:us-east-1:123456789012:app/APNS/my-application')
@mock_sns
def test_get_platform_application_attributes():
conn = boto.connect_sns()
platform_application = conn.create_platform_application(
name="my-application",
platform="APNS",
attributes={
"PlatformCredential": "platform_credential",
"PlatformPrincipal": "platform_principal",
},
)
arn = platform_application['CreatePlatformApplicationResponse']['CreatePlatformApplicationResult']['PlatformApplicationArn']
attributes = conn.get_platform_application_attributes(arn)['GetPlatformApplicationAttributesResponse']['GetPlatformApplicationAttributesResult']['Attributes']
attributes.should.equal({
"PlatformCredential": "platform_credential",
"PlatformPrincipal": "platform_principal",
})
@mock_sns
def test_get_missing_platform_application_attributes():
conn = boto.connect_sns()
conn.get_platform_application_attributes.when.called_with("a-fake-arn").should.throw(BotoServerError)
@mock_sns
def test_set_platform_application_attributes():
conn = boto.connect_sns()
platform_application = conn.create_platform_application(
name="my-application",
platform="APNS",
attributes={
"PlatformCredential": "platform_credential",
"PlatformPrincipal": "platform_principal",
},
)
arn = platform_application['CreatePlatformApplicationResponse']['CreatePlatformApplicationResult']['PlatformApplicationArn']
conn.set_platform_application_attributes(arn,
{"PlatformPrincipal": "other"}
)
attributes = conn.get_platform_application_attributes(arn)['GetPlatformApplicationAttributesResponse']['GetPlatformApplicationAttributesResult']['Attributes']
attributes.should.equal({
"PlatformCredential": "platform_credential",
"PlatformPrincipal": "other",
})
@mock_sns
def test_list_platform_applications():
conn = boto.connect_sns()
conn.create_platform_application(
name="application1",
platform="APNS",
)
conn.create_platform_application(
name="application2",
platform="APNS",
)
applications_repsonse = conn.list_platform_applications()
applications = applications_repsonse['ListPlatformApplicationsResponse']['ListPlatformApplicationsResult']['PlatformApplications']
applications.should.have.length_of(2)
@mock_sns
def test_delete_platform_application():
conn = boto.connect_sns()
conn.create_platform_application(
name="application1",
platform="APNS",
)
conn.create_platform_application(
name="application2",
platform="APNS",
)
applications_repsonse = conn.list_platform_applications()
applications = applications_repsonse['ListPlatformApplicationsResponse']['ListPlatformApplicationsResult']['PlatformApplications']
applications.should.have.length_of(2)
application_arn = applications[0]['PlatformApplicationArn']
conn.delete_platform_application(application_arn)
applications_repsonse = conn.list_platform_applications()
applications = applications_repsonse['ListPlatformApplicationsResponse']['ListPlatformApplicationsResult']['PlatformApplications']
applications.should.have.length_of(1)
@mock_sns
def test_create_platform_endpoint():
conn = boto.connect_sns()
platform_application = conn.create_platform_application(
name="my-application",
platform="APNS",
)
application_arn = platform_application['CreatePlatformApplicationResponse']['CreatePlatformApplicationResult']['PlatformApplicationArn']
endpoint = conn.create_platform_endpoint(
platform_application_arn=application_arn,
token="some_unique_id",
custom_user_data="some user data",
attributes={
"Enabled": False,
},
)
endpoint_arn = endpoint['CreatePlatformEndpointResponse']['CreatePlatformEndpointResult']['EndpointArn']
endpoint_arn.should.contain("arn:aws:sns:us-east-1:123456789012:endpoint/APNS/my-application/")
@mock_sns
def test_get_list_endpoints_by_platform_application():
conn = boto.connect_sns()
platform_application = conn.create_platform_application(
name="my-application",
platform="APNS",
)
application_arn = platform_application['CreatePlatformApplicationResponse']['CreatePlatformApplicationResult']['PlatformApplicationArn']
endpoint = conn.create_platform_endpoint(
platform_application_arn=application_arn,
token="some_unique_id",
custom_user_data="some user data",
attributes={
"CustomUserData": "some data",
},
)
endpoint_arn = endpoint['CreatePlatformEndpointResponse']['CreatePlatformEndpointResult']['EndpointArn']
endpoint_list = conn.list_endpoints_by_platform_application(
platform_application_arn='fake_arn'
platform_application_arn=application_arn
)['ListEndpointsByPlatformApplicationResponse']['ListEndpointsByPlatformApplicationResult']['Endpoints']
endpoint_list.should.have.length_of(1)
endpoint_list[0]['Attributes']['Enabled'].should.equal('true')
endpoint_list[0]['EndpointArn'].should.equal('FAKE_ARN_ENDPOINT')
endpoint_list[0]['Attributes']['CustomUserData'].should.equal('some data')
endpoint_list[0]['EndpointArn'].should.equal(endpoint_arn)
@mock_sns
def test_get_endpoint_attributes():
conn = boto.connect_sns()
platform_application = conn.create_platform_application(
name="my-application",
platform="APNS",
)
application_arn = platform_application['CreatePlatformApplicationResponse']['CreatePlatformApplicationResult']['PlatformApplicationArn']
endpoint = conn.create_platform_endpoint(
platform_application_arn=application_arn,
token="some_unique_id",
custom_user_data="some user data",
attributes={
"Enabled": False,
"CustomUserData": "some data",
},
)
endpoint_arn = endpoint['CreatePlatformEndpointResponse']['CreatePlatformEndpointResult']['EndpointArn']
attributes = conn.get_endpoint_attributes(endpoint_arn)['GetEndpointAttributesResponse']['GetEndpointAttributesResult']['Attributes']
attributes.should.equal({
"Enabled": 'False',
"CustomUserData": "some data",
})
@mock_sns
def test_get_missing_endpoint_attributes():
conn = boto.connect_sns()
conn.get_endpoint_attributes.when.called_with("a-fake-arn").should.throw(BotoServerError)
@mock_sns
def test_set_endpoint_attributes():
conn = boto.connect_sns()
platform_application = conn.create_platform_application(
name="my-application",
platform="APNS",
)
application_arn = platform_application['CreatePlatformApplicationResponse']['CreatePlatformApplicationResult']['PlatformApplicationArn']
endpoint = conn.create_platform_endpoint(
platform_application_arn=application_arn,
token="some_unique_id",
custom_user_data="some user data",
attributes={
"Enabled": False,
"CustomUserData": "some data",
},
)
endpoint_arn = endpoint['CreatePlatformEndpointResponse']['CreatePlatformEndpointResult']['EndpointArn']
conn.set_endpoint_attributes(endpoint_arn,
{"CustomUserData": "other data"}
)
attributes = conn.get_endpoint_attributes(endpoint_arn)['GetEndpointAttributesResponse']['GetEndpointAttributesResult']['Attributes']
attributes.should.equal({
"Enabled": 'False',
"CustomUserData": "other data",
})
@mock_sns
def test_publish_to_platform_endpoint():
conn = boto.connect_sns()
platform_application = conn.create_platform_application(
name="my-application",
platform="APNS",
)
application_arn = platform_application['CreatePlatformApplicationResponse']['CreatePlatformApplicationResult']['PlatformApplicationArn']
endpoint = conn.create_platform_endpoint(
platform_application_arn=application_arn,
token="some_unique_id",
custom_user_data="some user data",
attributes={
"Enabled": False,
},
)
endpoint_arn = endpoint['CreatePlatformEndpointResponse']['CreatePlatformEndpointResult']['EndpointArn']
conn.publish(message="some message", message_structure="json", target_arn=endpoint_arn)

View File

@ -4,6 +4,7 @@ import six
import sure # noqa
from boto.exception import BotoServerError
from moto import mock_sns
from moto.sns.models import DEFAULT_TOPIC_POLICY, DEFAULT_EFFECTIVE_DELIVERY_POLICY, DEFAULT_PAGE_SIZE
@ -27,6 +28,12 @@ def test_create_and_delete_topic():
topics.should.have.length_of(0)
@mock_sns
def test_get_missing_topic():
conn = boto.connect_sns()
conn.get_topic_attributes.when.called_with("a-fake-arn").should.throw(BotoServerError)
@mock_sns
def test_create_topic_in_multiple_regions():
west1_conn = boto.sns.connect_to_region("us-west-1")