Merge branch 'joekiller-master'

This commit is contained in:
Steve Pulec 2014-10-05 21:55:31 -04:00
commit ec5f5dc2a2
7 changed files with 530 additions and 78 deletions

View File

@ -64,9 +64,9 @@ class BaseResponse(object):
querystring[key] = [value, ]
if not querystring:
querystring.update(parse_qs(urlparse(full_url).query))
querystring.update(parse_qs(urlparse(full_url).query, keep_blank_values=True))
if not querystring:
querystring.update(parse_qs(self.body))
querystring.update(parse_qs(self.body, keep_blank_values=True))
if not querystring:
querystring.update(headers)

View File

@ -238,6 +238,13 @@ class InvalidParameterValueError(EC2ClientError):
.format(parameter_value))
class InvalidParameterValueErrorTagNull(EC2ClientError):
def __init__(self):
super(InvalidParameterValueErrorTagNull, self).__init__(
"InvalidParameterValue",
"Tag value cannot be null. Use empty string instead.")
class InvalidInternetGatewayIdError(EC2ClientError):
def __init__(self, internet_gateway_id):
super(InvalidInternetGatewayIdError, self).__init__(
@ -262,6 +269,21 @@ class ResourceAlreadyAssociatedError(EC2ClientError):
.format(resource_id))
class TagLimitExceeded(EC2ClientError):
def __init__(self):
super(TagLimitExceeded, self).__init__(
"TagLimitExceeded",
"The maximum number of Tags for a resource has been reached.")
class InvalidID(EC2ClientError):
def __init__(self, resource_id):
super(InvalidID, self).__init__(
"InvalidID",
"The ID '{0}' is not valid"
.format(resource_id))
ERROR_RESPONSE = u"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Errors>

View File

@ -17,6 +17,7 @@ from .exceptions import (
DependencyViolationError,
MissingParameterError,
InvalidParameterValueError,
InvalidParameterValueErrorTagNull,
InvalidDHCPOptionsIdError,
MalformedDHCPOptionsIdError,
InvalidKeyPairNameError,
@ -45,9 +46,13 @@ from .exceptions import (
InvalidAllocationIdError,
InvalidAssociationIdError,
InvalidVPCPeeringConnectionIdError,
InvalidVPCPeeringConnectionStateTransitionError
InvalidVPCPeeringConnectionStateTransitionError,
TagLimitExceeded,
InvalidID
)
from .utils import (
EC2_RESOURCE_TO_PREFIX,
EC2_PREFIX_TO_RESOURCE,
random_ami_id,
random_dhcp_option_id,
random_eip_allocation_id,
@ -70,7 +75,17 @@ from .utils import (
random_volume_id,
random_vpc_id,
random_vpc_peering_connection_id,
generic_filter)
generic_filter,
is_valid_resource_id,
get_prefix,
simple_aws_filter_to_re)
def validate_resource_ids(resource_ids):
for resource_id in resource_ids:
if not is_valid_resource_id(resource_id):
raise InvalidID(resource_id=resource_id)
return True
class InstanceState(object):
@ -79,9 +94,9 @@ class InstanceState(object):
self.code = code
class TaggedEC2Instance(object):
class TaggedEC2Resource(object):
def get_tags(self, *args, **kwargs):
tags = ec2_backend.describe_tags(self.id)
tags = ec2_backend.describe_tags(filters={'resource-id': [self.id]})
return tags
def get_filter_value(self, filter_name):
@ -235,7 +250,7 @@ class NetworkInterfaceBackend(object):
eni._group_set = [group]
class Instance(BotoInstance, TaggedEC2Instance):
class Instance(BotoInstance, TaggedEC2Resource):
def __init__(self, image_id, user_data, security_groups, **kwargs):
super(Instance, self).__init__()
self.id = random_instance_id()
@ -319,7 +334,7 @@ class Instance(BotoInstance, TaggedEC2Instance):
self._state.code = 16
def get_tags(self):
tags = ec2_backend.describe_tags(self.id)
tags = ec2_backend.describe_tags(filters={'resource-id': [self.id]})
return tags
@property
@ -563,36 +578,130 @@ class KeyPairBackend(object):
class TagBackend(object):
VALID_TAG_FILTERS = ['key',
'resource-id',
'resource-type',
'value']
VALID_TAG_RESOURCE_FILTER_TYPES = ['customer-gateway',
'dhcp-options',
'image',
'instance',
'internet-gateway',
'network-acl',
'network-interface',
'reserved-instances',
'route-table',
'security-group',
'snapshot',
'spot-instances-request',
'subnet',
'volume',
'vpc',
'vpc-peering-connection'
'vpn-connection',
'vpn-gateway']
def __init__(self):
self.tags = defaultdict(dict)
super(TagBackend, self).__init__()
def create_tag(self, resource_id, key, value):
self.tags[resource_id][key] = value
return value
def create_tags(self, resource_ids, tags):
if None in set([tags[tag] for tag in tags]):
raise InvalidParameterValueErrorTagNull()
for resource_id in resource_ids:
if resource_id in self.tags:
if len(self.tags[resource_id]) + len(tags) > 10:
raise TagLimitExceeded()
elif len(tags) > 10:
raise TagLimitExceeded()
for resource_id in resource_ids:
for tag in tags:
self.tags[resource_id][tag] = tags[tag]
return True
def delete_tag(self, resource_id, key):
return self.tags[resource_id].pop(key)
def delete_tags(self, resource_ids, tags):
for resource_id in resource_ids:
for tag in tags:
if tag in self.tags[resource_id]:
if tags[tag] is None:
self.tags[resource_id].pop(tag)
elif tags[tag] == self.tags[resource_id][tag]:
self.tags[resource_id].pop(tag)
return True
def describe_tags(self, filter_resource_ids=None):
def describe_tags(self, filters=None):
import re
results = []
key_filters = []
resource_id_filters = []
resource_type_filters = []
value_filters = []
if not filters is None:
for tag_filter in filters:
if tag_filter in self.VALID_TAG_FILTERS:
if tag_filter == 'key':
for value in filters[tag_filter]:
key_filters.append(re.compile(simple_aws_filter_to_re(value)))
if tag_filter == 'resource-id':
for value in filters[tag_filter]:
resource_id_filters.append(re.compile(simple_aws_filter_to_re(value)))
if tag_filter == 'resource-type':
for value in filters[tag_filter]:
if value in self.VALID_TAG_RESOURCE_FILTER_TYPES:
resource_type_filters.append(value)
if tag_filter == 'value':
for value in filters[tag_filter]:
value_filters.append(re.compile(simple_aws_filter_to_re(value)))
for resource_id, tags in self.tags.items():
ami = 'ami' in resource_id
for key, value in tags.items():
if not filter_resource_ids or resource_id in filter_resource_ids:
# If we're not filtering, or we are filtering and this
# resource id is in the filter list, add this tag
add_result = False
if filters is None:
add_result = True
else:
key_pass = False
id_pass = False
type_pass = False
value_pass = False
if key_filters:
for pattern in key_filters:
if pattern.match(key) is not None:
key_pass = True
else:
key_pass = True
if resource_id_filters:
for pattern in resource_id_filters:
if pattern.match(resource_id) is not None:
id_pass = True
else:
id_pass = True
if resource_type_filters:
for resource_type in resource_type_filters:
if EC2_PREFIX_TO_RESOURCE[get_prefix(resource_id)] == resource_type:
type_pass = True
else:
type_pass = True
if value_filters:
for pattern in value_filters:
if pattern.match(value) is not None:
value_pass = True
else:
value_pass = True
if key_pass and id_pass and type_pass and value_pass:
add_result = True
# If we're not filtering, or we are filtering and this
if add_result:
result = {
'resource_id': resource_id,
'key': key,
'value': value,
'resource_type': 'image' if ami else 'instance',
}
'resource_type': EC2_PREFIX_TO_RESOURCE[get_prefix(resource_id)],
}
results.append(result)
return results
class Ami(TaggedEC2Instance):
class Ami(TaggedEC2Resource):
def __init__(self, ami_id, instance=None, source_ami=None, name=None, description=None):
self.id = ami_id
self.state = "available"
@ -1158,7 +1267,7 @@ class EBSBackend(object):
return True
class VPC(TaggedEC2Instance):
class VPC(TaggedEC2Resource):
def __init__(self, vpc_id, cidr_block):
self.id = vpc_id
self.cidr_block = cidr_block
@ -1276,7 +1385,7 @@ class VPCPeeringConnectionStatus(object):
self.message = 'Inactive'
class VPCPeeringConnection(TaggedEC2Instance):
class VPCPeeringConnection(TaggedEC2Resource):
def __init__(self, vpc_pcx_id, vpc, peer_vpc):
self.id = vpc_pcx_id
self.vpc = vpc
@ -1340,7 +1449,7 @@ class VPCPeeringConnectionBackend(object):
return vpc_pcx
class Subnet(TaggedEC2Instance):
class Subnet(TaggedEC2Resource):
def __init__(self, subnet_id, vpc_id, cidr_block):
self.id = subnet_id
self.vpc_id = vpc_id
@ -1437,7 +1546,7 @@ class SubnetRouteTableAssociationBackend(object):
return subnet_association
class RouteTable(TaggedEC2Instance):
class RouteTable(TaggedEC2Resource):
def __init__(self, route_table_id, vpc_id, main=False):
self.id = route_table_id
self.vpc_id = vpc_id
@ -1604,7 +1713,7 @@ class RouteBackend(object):
return deleted
class InternetGateway(TaggedEC2Instance):
class InternetGateway(TaggedEC2Resource):
def __init__(self):
self.id = random_internet_gateway_id()
self.vpc = None
@ -1697,7 +1806,7 @@ class VPCGatewayAttachmentBackend(object):
return attachment
class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Instance):
class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
def __init__(self, spot_request_id, price, image_id, type, valid_from,
valid_until, launch_group, availability_zone_group, key_name,
security_groups, user_data, instance_type, placement, kernel_id,
@ -1736,7 +1845,8 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Instance):
def get_filter_value(self, filter_name):
if filter_name == 'state':
return self.state
if filter_name == 'spot-instance-request-id':
return self.id
filter_value = super(SpotInstanceRequest, self).get_filter_value(filter_name)
if filter_value is None:
@ -1914,7 +2024,7 @@ class ElasticAddressBackend(object):
return True
class DHCPOptionsSet(TaggedEC2Instance):
class DHCPOptionsSet(TaggedEC2Resource):
def __init__(self, domain_name_servers=None, domain_name=None,
ntp_servers=None, netbios_name_servers=None,
netbios_node_type=None):
@ -2004,6 +2114,46 @@ class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
" https://github.com/spulec/moto/issues".format(blurb)
raise NotImplementedError(msg)
def do_resources_exist(self, resource_ids):
for resource_id in resource_ids:
resource_prefix = get_prefix(resource_id)
if resource_prefix == EC2_RESOURCE_TO_PREFIX['customer-gateway']:
self.raise_not_implemented_error('DescribeCustomerGateways')
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['dhcp-options']:
self.describe_dhcp_options(options_ids=[resource_id])
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['image']:
self.describe_images(ami_ids=[resource_id])
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['instance']:
self.get_instance_by_id(instance_id=resource_id)
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['internet-gateway']:
self.describe_internet_gateways(internet_gateway_ids=[resource_id])
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['network-acl']:
self.raise_not_implemented_error('DescribeNetworkAcls')
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['network-interface']:
self.describe_network_interfaces(filters={'network-interface-id': resource_id})
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['reserved-instance']:
self.raise_not_implemented_error('DescribeReservedInstances')
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['route-table']:
self.raise_not_implemented_error('DescribeRouteTables')
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['security-group']:
self.describe_security_groups(group_ids=[resource_id])
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['snapshot']:
self.get_snapshot(snapshot_id=resource_id)
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['spot-instance-request']:
self.describe_spot_instance_requests(filters={'spot-instance-request-id': resource_id})
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['subnet']:
self.get_subnet(subnet_id=resource_id)
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['volume']:
self.get_volume(volume_id=resource_id)
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['vpc']:
self.get_vpc(vpc_id=resource_id)
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['vpc-peering-connection']:
self.get_vpc_peering_connection(vpc_pcx_id=resource_id)
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['vpn-connection']:
self.raise_not_implemented_error('DescribeVpnConnections')
elif resource_prefix == EC2_RESOURCE_TO_PREFIX['vpn-gateway']:
self.raise_not_implemented_error('DescribeVpnGateways')
return True
ec2_backends = {}
for region in boto.ec2.regions():

View File

@ -3,7 +3,8 @@ from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from moto.ec2.utils import instance_ids_from_querystring, filters_from_querystring, filter_reservations, dict_from_querystring
from moto.ec2.utils import instance_ids_from_querystring, filters_from_querystring, filter_reservations, \
dict_from_querystring
class InstanceResponse(BaseResponse):

View File

@ -2,27 +2,30 @@ from __future__ import unicode_literals
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring
from moto.ec2.models import ec2_backend, validate_resource_ids
from moto.ec2.utils import sequence_from_querystring, tags_from_query_string, filters_from_querystring
class TagResponse(BaseResponse):
def create_tags(self):
resource_ids = resource_ids_from_querystring(self.querystring)
for resource_id, tag in resource_ids.items():
ec2_backend.create_tag(resource_id, tag[0], tag[1])
resource_ids = sequence_from_querystring('ResourceId', self.querystring)
validate_resource_ids(resource_ids)
self.ec2_backend.do_resources_exist(resource_ids)
tags = tags_from_query_string(self.querystring)
self.ec2_backend.create_tags(resource_ids, tags)
return CREATE_RESPONSE
def delete_tags(self):
resource_ids = resource_ids_from_querystring(self.querystring)
for resource_id, tag in resource_ids.items():
ec2_backend.delete_tag(resource_id, tag[0])
template = Template(DELETE_RESPONSE)
return template.render(reservations=ec2_backend.all_reservations())
resource_ids = sequence_from_querystring('ResourceId', self.querystring)
validate_resource_ids(resource_ids)
tags = tags_from_query_string(self.querystring)
self.ec2_backend.delete_tags(resource_ids, tags)
return DELETE_RESPONSE
def describe_tags(self):
tags = ec2_backend.describe_tags()
filters = filters_from_querystring(querystring_dict=self.querystring)
tags = ec2_backend.describe_tags(filters=filters)
template = Template(DESCRIBE_RESPONSE)
return template.render(tags=tags)

View File

@ -3,81 +3,108 @@ import random
import re
import six
EC2_RESOURCE_TO_PREFIX = {
'customer-gateway': 'cgw',
'dhcp-options': 'dopt',
'image': 'ami',
'instance': 'i',
'internet-gateway': 'igw',
'network-acl': 'acl',
'network-interface': 'eni',
'network-interface-attachment': 'eni-attach',
'reserved-instance': 'uuid4',
'route-table': 'rtb',
'security-group': 'sg',
'snapshot': 'snap',
'spot-instance-request': 'sir',
'subnet': 'subnet',
'reservation': 'r',
'volume': 'vol',
'vpc': 'vpc',
'vpc-elastic-ip': 'eipalloc',
'vpc-elastic-ip-association': 'eipassoc',
'vpc-peering-connection': 'pcx',
'vpn-connection': 'vpn',
'vpn-gateway': 'vgw'}
EC2_PREFIX_TO_RESOURCE = dict((v, k) for (k, v) in EC2_RESOURCE_TO_PREFIX.items())
def random_id(prefix=''):
size = 8
chars = list(range(10)) + ['a', 'b', 'c', 'd', 'e', 'f']
instance_tag = ''.join(six.text_type(random.choice(chars)) for x in range(size))
return '{0}-{1}'.format(prefix, instance_tag)
resource_id = ''.join(six.text_type(random.choice(chars)) for x in range(size))
return '{0}-{1}'.format(prefix, resource_id)
def random_ami_id():
return random_id(prefix='ami')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['image'])
def random_instance_id():
return random_id(prefix='i')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['instance'])
def random_reservation_id():
return random_id(prefix='r')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['reservation'])
def random_security_group_id():
return random_id(prefix='sg')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['security-group'])
def random_snapshot_id():
return random_id(prefix='snap')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['snapshot'])
def random_spot_request_id():
return random_id(prefix='sir')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['spot-instance-request'])
def random_subnet_id():
return random_id(prefix='subnet')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['subnet'])
def random_volume_id():
return random_id(prefix='vol')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['volume'])
def random_vpc_id():
return random_id(prefix='vpc')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['vpc'])
def random_vpc_peering_connection_id():
return random_id(prefix='pcx')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['vpc-peering-connection'])
def random_eip_association_id():
return random_id(prefix='eipassoc')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['vpc-elastic-ip-association'])
def random_internet_gateway_id():
return random_id(prefix='igw')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['internet-gateway'])
def random_route_table_id():
return random_id(prefix='rtb')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['route-table'])
def random_eip_allocation_id():
return random_id(prefix='eipalloc')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['vpc-elastic-ip'])
def random_dhcp_option_id():
return random_id(prefix='dopt')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['dhcp-options'])
def random_eni_id():
return random_id(prefix='eni')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['network-interface'])
def random_eni_attach_id():
return random_id(prefix='eni-attach')
return random_id(prefix=EC2_RESOURCE_TO_PREFIX['network-interface-attachment'])
def random_public_ip():
@ -142,21 +169,19 @@ def sequence_from_querystring(parameter, querystring_dict):
return parameter_values
def resource_ids_from_querystring(querystring_dict):
prefix = 'ResourceId'
def tags_from_query_string(querystring_dict):
prefix = 'Tag'
suffix = 'Key'
response_values = {}
for key, value in querystring_dict.items():
if key.startswith(prefix):
resource_index = key.replace(prefix + ".", "")
tag_key = querystring_dict.get("Tag.{0}.Key".format(resource_index))[0]
tag_value_key = "Tag.{0}.Value".format(resource_index)
if key.startswith(prefix) and key.endswith(suffix):
tag_index = key.replace(prefix + ".", "").replace("." + suffix, "")
tag_key = querystring_dict.get("Tag.{0}.Key".format(tag_index))[0]
tag_value_key = "Tag.{0}.Value".format(tag_index)
if tag_value_key in querystring_dict:
tag_value = querystring_dict.get(tag_value_key)[0]
response_values[tag_key] = querystring_dict.get(tag_value_key)[0]
else:
tag_value = None
response_values[value[0]] = (tag_key, tag_value)
response_values[tag_key] = None
return response_values
@ -299,6 +324,14 @@ def generic_filter(filters, objects):
return objects
def simple_aws_filter_to_re(filter_string):
import fnmatch
tmp_filter = filter_string.replace('\?','[?]')
tmp_filter = tmp_filter.replace('\*','[*]')
tmp_filter = fnmatch.translate(tmp_filter)
return tmp_filter
# not really random ( http://xkcd.com/221/ )
def random_key_pair():
return {
@ -321,3 +354,29 @@ FFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTb
NYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE
-----END RSA PRIVATE KEY-----"""
}
def get_prefix(resource_id):
resource_id_prefix, separator, after = resource_id.partition('-')
if resource_id_prefix == EC2_RESOURCE_TO_PREFIX['network-interface']:
if after.startswith('attach'):
resource_id_prefix = EC2_RESOURCE_TO_PREFIX['network-interface-attachment']
if not resource_id_prefix in EC2_RESOURCE_TO_PREFIX.values():
import re
uuid4hex = re.compile('[0-9a-f]{12}4[0-9a-f]{3}[89ab][0-9a-f]{15}\Z', re.I)
if uuid4hex.match(resource_id) is not None:
resource_id_prefix = EC2_RESOURCE_TO_PREFIX['reserved-instance']
else:
return None
return resource_id_prefix
def is_valid_resource_id(resource_id):
import re
valid_prefixes = EC2_RESOURCE_TO_PREFIX.values()
resource_id_prefix = get_prefix(resource_id)
if not resource_id_prefix in valid_prefixes:
return False
resource_id_pattern = resource_id_prefix + '-[0-9a-f]{8}'
resource_pattern_re = re.compile(resource_id_pattern)
return resource_pattern_re.match(resource_id) is not None

View File

@ -2,13 +2,29 @@ from __future__ import unicode_literals
import itertools
import boto
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2
from nose.tools import assert_raises
@mock_ec2
def test_instance_launch_and_terminate():
def test_add_tag():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some value")
chain = itertools.chain.from_iterable
existing_instances = list(chain([res.instances for res in conn.get_all_instances()]))
existing_instances.should.have.length_of(1)
existing_instance = existing_instances[0]
existing_instance.tags["a key"].should.equal("some value")
@mock_ec2
def test_remove_tag():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
@ -23,16 +39,217 @@ def test_instance_launch_and_terminate():
instance.remove_tag("a key")
conn.get_all_tags().should.have.length_of(0)
instance.add_tag("a key", "some value")
conn.get_all_tags().should.have.length_of(1)
instance.remove_tag("a key", "some value")
@mock_ec2
def test_instance_launch_and_retrieve_all_instances():
def test_get_all_tags():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some value")
chain = itertools.chain.from_iterable
existing_instances = list(chain([res.instances for res in conn.get_all_instances()]))
existing_instances.should.have.length_of(1)
existing_instance = existing_instances[0]
existing_instance.tags["a key"].should.equal("some value")
tags = conn.get_all_tags()
tag = tags[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
@mock_ec2
def test_create_tags():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
tag_dict = {'a key': 'some value',
'another key': 'some other value',
'blank key': ''}
conn.create_tags(instance.id, tag_dict)
tags = conn.get_all_tags()
set([key for key in tag_dict]).should.equal(set([tag.name for tag in tags]))
set([tag_dict[key] for key in tag_dict]).should.equal(set([tag.value for tag in tags]))
@mock_ec2
def test_tag_limit_exceeded():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
tag_dict = {'01': '',
'02': '',
'03': '',
'04': '',
'05': '',
'06': '',
'07': '',
'08': '',
'09': '',
'10': '',
'11': ''}
with assert_raises(EC2ResponseError) as cm:
conn.create_tags(instance.id, tag_dict)
cm.exception.code.should.equal('TagLimitExceeded')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
instance.add_tag("a key", "a value")
with assert_raises(EC2ResponseError) as cm:
conn.create_tags(instance.id, tag_dict)
cm.exception.code.should.equal('TagLimitExceeded')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
tags = conn.get_all_tags()
tag = tags[0]
tags.should.have.length_of(1)
tag.name.should.equal("a key")
tag.value.should.equal("a value")
@mock_ec2
def test_invalid_parameter_tag_null():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(EC2ResponseError) as cm:
instance.add_tag("a key", None)
cm.exception.code.should.equal('InvalidParameterValue')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_invalid_id():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.create_tags('ami-blah', {'key': 'tag'})
cm.exception.code.should.equal('InvalidID')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
with assert_raises(EC2ResponseError) as cm:
conn.create_tags('blah-blah', {'key': 'tag'})
cm.exception.code.should.equal('InvalidID')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_get_all_tags_resource_id_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'resource-id': instance.id})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
tags = conn.get_all_tags(filters={'resource-id': image_id})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(image_id)
tag.res_type.should.equal('image')
tag.name.should.equal("an image key")
tag.value.should.equal("some value")
@mock_ec2
def test_get_all_tags_resource_type_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'resource-type': 'instance'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
tags = conn.get_all_tags(filters={'resource-type': 'image'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(image_id)
tag.res_type.should.equal('image')
tag.name.should.equal("an image key")
tag.value.should.equal("some value")
@mock_ec2
def test_get_all_tags_key_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'key': 'an instance key'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
@mock_ec2
def test_get_all_tags_value_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
reservation_b = conn.run_instances('ami-1234abcd')
instance_b = reservation_b.instances[0]
instance_b.add_tag("an instance key", "some other value")
reservation_c = conn.run_instances('ami-1234abcd')
instance_c = reservation_c.instances[0]
instance_c.add_tag("an instance key", "other value*")
reservation_d = conn.run_instances('ami-1234abcd')
instance_d = reservation_d.instances[0]
instance_d.add_tag("an instance key", "other value**")
reservation_e = conn.run_instances('ami-1234abcd')
instance_e = reservation_e.instances[0]
instance_e.add_tag("an instance key", "other value*?")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'value': 'some value'})
tags.should.have.length_of(2)
tags = conn.get_all_tags(filters={'value': 'some*value'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*some*value'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*some*value*'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*value\*'})
tags.should.have.length_of(1)
tags = conn.get_all_tags(filters={'value': '*value\*\*'})
tags.should.have.length_of(1)
tags = conn.get_all_tags(filters={'value': '*value\*\?'})
tags.should.have.length_of(1)