Merge branch 'master' of https://github.com/silveregg/moto into 0.4.1-threadsafe

* 'master' of https://github.com/silveregg/moto: (22 commits)
  filtering the items is needed because of defaultdict is not threadsafe and returns an empty dict which results in an exception here
  add tests for list_endpoints_by_platform_application
  add mock for list_endpoints_by_platform_application method
  [S3]Only add multipart part_id to partlist if it is not already in there. Closes #324.
  Fix etag for reduced min part size.
  Add test_multipart_duplicate_upload
  Fix reduced_min_part_size so that tests run
  Fix authors
  Add @mikegrima to authors
  Fixed how parameters are passed in following clarification on GitHub comments.
  Added in test for the boto IAM method: list_instance_profiles_for_role()
  Change SecurityGroupBackend.{authorize,revoke}_security_group_ingress() methods to receive group name or id, never both
  Add support to AWS::EC2::SecurityGroupIngress creation
  Add @aaltepet to authors.
  Add publish command.
  Add support to tag filtering to Security Groups
  slight change in formatting
  fix test for ec2 instance type filter
  Update minimum support boto version.
  support 'instance_type' filter
  ...
This commit is contained in:
Jeffrey Gelens 2015-05-29 11:34:23 +02:00
commit f5c4ac0b44
19 changed files with 552 additions and 33 deletions

View File

@ -35,3 +35,5 @@ Moto is written by Steve Pulec with contributions from:
* [Gary Dalton](https://github.com/gary-dalton)
* [Chris Henry](https://github.com/chrishenry)
* [Mike Fuller](https://github.com/mfulleratlassian)
* [Andy](https://github.com/aaltepet)
* [Mike Grima](https://github.com/mikegrima)

View File

@ -8,3 +8,5 @@ test:
rm -f .coverage
@nosetests -sv --with-coverage ./tests/
publish:
python setup.py sdist bdist_wheel upload

View File

@ -26,6 +26,7 @@ MODEL_MAP = {
"AWS::EC2::Route": ec2_models.Route,
"AWS::EC2::RouteTable": ec2_models.RouteTable,
"AWS::EC2::SecurityGroup": ec2_models.SecurityGroup,
"AWS::EC2::SecurityGroupIngress": ec2_models.SecurityGroupIngress,
"AWS::EC2::Subnet": ec2_models.Subnet,
"AWS::EC2::SubnetRouteTableAssociation": ec2_models.SubnetRouteTableAssociation,
"AWS::EC2::Volume": ec2_models.Volume,

View File

@ -220,7 +220,7 @@ class Table(object):
results = []
last_page = True # Once pagination is implemented, change this
possible_results = [item for item in list(self.all_items()) if item.hash_key == hash_key]
possible_results = [item for item in list(self.all_items()) if isinstance(item, Item) and item.hash_key == hash_key]
if range_comparison:
for result in possible_results:
if result.range_key.compare(range_comparison, range_objs):

View File

@ -92,7 +92,9 @@ from .utils import (
filter_reservations,
random_network_acl_id,
random_network_acl_subnet_association_id,
random_vpn_gateway_id)
random_vpn_gateway_id,
is_tag_filter,
)
def validate_resource_ids(resource_ids):
@ -119,6 +121,9 @@ class TaggedEC2Resource(object):
tags = self.ec2_backend.describe_tags(filters={'resource-id': [self.id]})
return tags
def add_tag(self, key, value):
self.ec2_backend.create_tags([self.id], {key: value})
def get_filter_value(self, filter_name):
tags = self.get_tags()
@ -1071,12 +1076,16 @@ class SecurityGroup(TaggedEC2Resource):
vpc_id=vpc_id,
)
for tag in properties.get("Tags", []):
tag_key = tag["Key"]
tag_value = tag["Value"]
security_group.add_tag(tag_key, tag_value)
for ingress_rule in properties.get('SecurityGroupIngress', []):
source_group_id = ingress_rule.get('SourceSecurityGroupId')
ec2_backend.authorize_security_group_ingress(
group_name=security_group.name,
group_id=security_group.id,
group_name_or_id=security_group.id,
ip_protocol=ingress_rule['IpProtocol'],
from_port=ingress_rule['FromPort'],
to_port=ingress_rule['ToPort'],
@ -1113,6 +1122,9 @@ class SecurityGroup(TaggedEC2Resource):
for ingress in self.ingress_rules:
if getattr(ingress, ingress_attr) in filter_value:
return True
elif is_tag_filter(key):
tag_value = self.get_filter_value(key)
return tag_value in filter_value
else:
attr_name = to_attr(key)
return getattr(self, attr_name) in filter_value
@ -1205,9 +1217,15 @@ class SecurityGroupBackend(object):
default_group = self.create_security_group("default", "The default security group", vpc_id=vpc_id, force=True)
return default_group
def get_security_group_by_name_or_id(self, group_name_or_id, vpc_id):
# try searching by id, fallbacks to name search
group = self.get_security_group_from_id(group_name_or_id)
if group is None:
group = self.get_security_group_from_name(group_name_or_id, vpc_id)
return group
def authorize_security_group_ingress(self,
group_name,
group_id,
group_name_or_id,
ip_protocol,
from_port,
to_port,
@ -1215,12 +1233,7 @@ class SecurityGroupBackend(object):
source_group_names=None,
source_group_ids=None,
vpc_id=None):
# to auth a group in a VPC you need the group_id the name isn't enough
if group_name:
group = self.get_security_group_from_name(group_name, vpc_id)
elif group_id:
group = self.get_security_group_from_id(group_id)
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if ip_ranges and not isinstance(ip_ranges, list):
ip_ranges = [ip_ranges]
@ -1248,8 +1261,7 @@ class SecurityGroupBackend(object):
group.ingress_rules.append(security_rule)
def revoke_security_group_ingress(self,
group_name,
group_id,
group_name_or_id,
ip_protocol,
from_port,
to_port,
@ -1258,10 +1270,7 @@ class SecurityGroupBackend(object):
source_group_ids=None,
vpc_id=None):
if group_name:
group = self.get_security_group_from_name(group_name, vpc_id)
elif group_id:
group = self.get_security_group_from_id(group_id)
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
source_groups = []
for source_group_name in source_group_names:
@ -1282,6 +1291,63 @@ class SecurityGroupBackend(object):
raise InvalidPermissionNotFoundError()
class SecurityGroupIngress(object):
def __init__(self, security_group, properties):
self.security_group = security_group
self.properties = properties
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
ec2_backend = ec2_backends[region_name]
group_name = properties.get('GroupName')
group_id = properties.get('GroupId')
ip_protocol = properties.get("IpProtocol")
cidr_ip = properties.get("CidrIp")
from_port = properties.get("FromPort")
source_security_group_id = properties.get("SourceSecurityGroupId")
source_security_group_name = properties.get("SourceSecurityGroupName")
source_security_owner_id = properties.get("SourceSecurityGroupOwnerId") # IGNORED AT THE MOMENT
to_port = properties.get("ToPort")
assert group_id or group_name
assert source_security_group_name or cidr_ip or source_security_group_id
assert ip_protocol
if source_security_group_id:
source_security_group_ids = [source_security_group_id]
else:
source_security_group_ids = None
if source_security_group_name:
source_security_group_names = [source_security_group_name]
else:
source_security_group_names = None
if cidr_ip:
ip_ranges = [cidr_ip]
else:
ip_ranges = []
if group_id:
security_group = ec2_backend.describe_security_groups(group_ids=[group_id])[0]
else:
security_group = ec2_backend.describe_security_groups(groupnames=[group_name])[0]
ec2_backend.authorize_security_group_ingress(
group_name_or_id=security_group.id,
ip_protocol=ip_protocol,
from_port=from_port,
to_port=to_port,
ip_ranges=ip_ranges,
source_group_ids=source_security_group_ids,
source_group_names=source_security_group_names,
)
return cls(security_group, properties)
class VolumeAttachment(object):
def __init__(self, volume, instance, device):
self.volume = volume

View File

@ -4,14 +4,10 @@ from moto.ec2.utils import filters_from_querystring
def process_rules_from_querystring(querystring):
name = None
group_id = None
try:
name = querystring.get('GroupName')[0]
group_name_or_id = querystring.get('GroupName')[0]
except:
group_id = querystring.get('GroupId')[0]
group_name_or_id = querystring.get('GroupId')[0]
ip_protocol = querystring.get('IpPermissions.1.IpProtocol')[0]
from_port = querystring.get('IpPermissions.1.FromPort')[0]
@ -30,7 +26,7 @@ def process_rules_from_querystring(querystring):
elif 'IpPermissions.1.Groups' in key:
source_groups.append(value[0])
return (name, group_id, ip_protocol, from_port, to_port, ip_ranges, source_groups, source_group_ids)
return (group_name_or_id, ip_protocol, from_port, to_port, ip_ranges, source_groups, source_group_ids)
class SecurityGroups(BaseResponse):

View File

@ -310,7 +310,9 @@ def get_object_value(obj, attr):
def is_tag_filter(filter_name):
return filter_name.startswith('tag:')
return (filter_name.startswith('tag:') or
filter_name.startswith('tag-value') or
filter_name.startswith('tag-key'))
def get_obj_tag(obj, filter_name):
@ -318,10 +320,24 @@ def get_obj_tag(obj, filter_name):
tags = dict((tag['key'], tag['value']) for tag in obj.get_tags())
return tags.get(tag_name)
def get_obj_tag_names(obj):
tags = set((tag['key'] for tag in obj.get_tags()))
return tags
def get_obj_tag_values(obj):
tags = set((tag['value'] for tag in obj.get_tags()))
return tags
def tag_filter_matches(obj, filter_name, filter_values):
tag_value = get_obj_tag(obj, filter_name)
return tag_value in filter_values
if filter_name == 'tag-key':
tag_names = get_obj_tag_names(obj)
return len(set(filter_values).intersection(tag_names)) > 0
elif filter_name == 'tag-value':
tag_values = get_obj_tag_values(obj)
return len(set(filter_values).intersection(tag_values)) > 0
else:
tag_value = get_obj_tag(obj, filter_name)
return tag_value in filter_values
filter_dict_attribute_mapping = {
@ -331,7 +347,8 @@ filter_dict_attribute_mapping = {
'source-dest-check': 'source_dest_check',
'vpc-id': 'vpc_id',
'group-id': 'security_groups',
'instance.group-id': 'security_groups'
'instance.group-id': 'security_groups',
'instance-type': 'instance_type'
}

View File

@ -291,6 +291,16 @@ class IAMBackend(BaseBackend):
def get_instance_profiles(self):
return self.instance_profiles.values()
def get_instance_profiles_for_role(self, role_name):
found_profiles = []
for profile in self.get_instance_profiles():
if len(profile.roles) > 0:
if profile.roles[0].name == role_name:
found_profiles.append(profile)
return found_profiles
def add_role_to_instance_profile(self, profile_name, role_name):
profile = self.get_instance_profile(profile_name)
role = self.get_role(role_name)

View File

@ -87,6 +87,13 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_INSTANCE_PROFILES_TEMPLATE)
return template.render(instance_profiles=profiles)
def list_instance_profiles_for_role(self):
role_name = self._get_param('RoleName')
profiles = iam_backend.get_instance_profiles_for_role(role_name=role_name)
template = self.response_template(LIST_INSTANCE_PROFILES_FOR_ROLE_TEMPLATE)
return template.render(instance_profiles=profiles)
def upload_server_certificate(self):
cert_name = self._get_param('ServerCertificateName')
cert_body = self._get_param('CertificateBody')
@ -601,4 +608,36 @@ CREDENTIAL_REPORT = """<GetCredentialReportResponse>
<ResponseMetadata>
<RequestId>fa788a82-aa8a-11e4-a278-1786c418872b"</RequestId>
</ResponseMetadata>
</GetCredentialReportResponse>"""
</GetCredentialReportResponse>"""
LIST_INSTANCE_PROFILES_FOR_ROLE_TEMPLATE = """<ListInstanceProfilesForRoleResponse>
<ListInstanceProfilesForRoleResult>
<IsTruncated>false</IsTruncated>
<InstanceProfiles>
{% for profile in instance_profiles %}
<member>
<Id>{{ profile.id }}</Id>
<Roles>
{% for role in profile.roles %}
<member>
<Path>{{ role.path }}</Path>
<Arn>arn:aws:iam::123456789012:role{{ role.path }}S3Access</Arn>
<RoleName>{{ role.name }}</RoleName>
<AssumeRolePolicyDocument>{{ role.assume_policy_document }}</AssumeRolePolicyDocument>
<CreateDate>2012-05-09T15:45:35Z</CreateDate>
<RoleId>{{ role.id }}</RoleId>
</member>
{% endfor %}
</Roles>
<InstanceProfileName>{{ profile.name }}</InstanceProfileName>
<Path>{{ profile.path }}</Path>
<Arn>arn:aws:iam::123456789012:instance-profile{{ profile.path }}Webserver</Arn>
<CreateDate>2012-05-09T16:27:11Z</CreateDate>
</member>
{% endfor %}
</InstanceProfiles>
</ListInstanceProfilesForRoleResult>
<ResponseMetadata>
<RequestId>6a8c3992-99f4-11e1-a4c3-27EXAMPLE804</RequestId>
</ResponseMetadata>
</ListInstanceProfilesForRoleResponse>"""

View File

@ -152,7 +152,8 @@ class FakeMultipart(object):
key = FakeKey(part_id, value)
self.parts[part_id] = key
insort(self.partlist, part_id)
if part_id not in self.partlist:
insort(self.partlist, part_id)
return key
def list_parts(self):

View File

@ -184,3 +184,25 @@ class SNSResponse(BaseResponse):
}
}
})
def list_endpoints_by_platform_application(self):
return json.dumps({
"ListEndpointsByPlatformApplicationResponse": {
"ListEndpointsByPlatformApplicationResult": {
"Endpoints": [
{
"Attributes": {
"Token": "TOKEN",
"Enabled": "true",
"CustomUserData": ""
},
"EndpointArn": "FAKE_ARN_ENDPOINT"
}
],
"NextToken": None
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})

2
setup.cfg Normal file
View File

@ -0,0 +1,2 @@
[bdist_wheel]
universal=1

View File

@ -4,7 +4,7 @@ from setuptools import setup, find_packages
install_requires = [
"Jinja2",
"boto",
"boto>=2.20.0",
"flask",
"httpretty>=0.6.1",
"requests",

View File

@ -1014,3 +1014,183 @@ def test_vpc_peering_creation():
peering_connections = vpc_conn.get_all_vpc_peering_connections()
peering_connections.should.have.length_of(1)
@mock_cloudformation
@mock_ec2
def test_security_group_ingress_separate_from_security_group_by_id():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"test-security-group1": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "test security group",
"Tags": [
{
"Key": "sg-name",
"Value": "sg1"
}
]
},
},
"test-security-group2": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "test security group",
"Tags": [
{
"Key": "sg-name",
"Value": "sg2"
}
]
},
},
"test-sg-ingress": {
"Type": "AWS::EC2::SecurityGroupIngress",
"Properties": {
"GroupId": {"Ref": "test-security-group1"},
"IpProtocol": "tcp",
"FromPort": "80",
"ToPort": "8080",
"SourceSecurityGroupId": {"Ref": "test-security-group2"},
}
}
}
}
template_json = json.dumps(template)
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
security_group1 = ec2_conn.get_all_security_groups(filters={"tag:sg-name": "sg1"})[0]
security_group2 = ec2_conn.get_all_security_groups(filters={"tag:sg-name": "sg2"})[0]
security_group1.rules.should.have.length_of(1)
security_group1.rules[0].grants.should.have.length_of(1)
security_group1.rules[0].grants[0].group_id.should.equal(security_group2.id)
security_group1.rules[0].ip_protocol.should.equal('tcp')
security_group1.rules[0].from_port.should.equal('80')
security_group1.rules[0].to_port.should.equal('8080')
@mock_cloudformation
@mock_ec2
def test_security_group_ingress_separate_from_security_group_by_id():
ec2_conn = boto.ec2.connect_to_region("us-west-1")
ec2_conn.create_security_group("test-security-group1", "test security group")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"test-security-group2": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "test security group",
"Tags": [
{
"Key": "sg-name",
"Value": "sg2"
}
]
},
},
"test-sg-ingress": {
"Type": "AWS::EC2::SecurityGroupIngress",
"Properties": {
"GroupName": "test-security-group1",
"IpProtocol": "tcp",
"FromPort": "80",
"ToPort": "8080",
"SourceSecurityGroupId": {"Ref": "test-security-group2"},
}
}
}
}
template_json = json.dumps(template)
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
security_group1 = ec2_conn.get_all_security_groups(groupnames=["test-security-group1"])[0]
security_group2 = ec2_conn.get_all_security_groups(filters={"tag:sg-name": "sg2"})[0]
security_group1.rules.should.have.length_of(1)
security_group1.rules[0].grants.should.have.length_of(1)
security_group1.rules[0].grants[0].group_id.should.equal(security_group2.id)
security_group1.rules[0].ip_protocol.should.equal('tcp')
security_group1.rules[0].from_port.should.equal('80')
security_group1.rules[0].to_port.should.equal('8080')
@mock_cloudformation
@mock_ec2
def test_security_group_ingress_separate_from_security_group_by_id_using_vpc():
vpc_conn = boto.vpc.connect_to_region("us-west-1")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"test-security-group1": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "test security group",
"VpcId": vpc.id,
"Tags": [
{
"Key": "sg-name",
"Value": "sg1"
}
]
},
},
"test-security-group2": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "test security group",
"VpcId": vpc.id,
"Tags": [
{
"Key": "sg-name",
"Value": "sg2"
}
]
},
},
"test-sg-ingress": {
"Type": "AWS::EC2::SecurityGroupIngress",
"Properties": {
"GroupId": {"Ref": "test-security-group1"},
"VpcId": vpc.id,
"IpProtocol": "tcp",
"FromPort": "80",
"ToPort": "8080",
"SourceSecurityGroupId": {"Ref": "test-security-group2"},
}
}
}
}
template_json = json.dumps(template)
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
security_group1 = vpc_conn.get_all_security_groups(filters={"tag:sg-name": "sg1"})[0]
security_group2 = vpc_conn.get_all_security_groups(filters={"tag:sg-name": "sg2"})[0]
security_group1.rules.should.have.length_of(1)
security_group1.rules[0].grants.should.have.length_of(1)
security_group1.rules[0].grants[0].group_id.should.equal(security_group2.id)
security_group1.rules[0].ip_protocol.should.equal('tcp')
security_group1.rules[0].from_port.should.equal('80')
security_group1.rules[0].to_port.should.equal('8080')

View File

@ -139,6 +139,48 @@ def test_get_instances_filtering_by_instance_id():
reservations = conn.get_all_instances(filters={'instance-id': 'non-existing-id'})
reservations.should.have.length_of(0)
@mock_ec2
def test_get_instances_filtering_by_instance_type():
conn = boto.connect_ec2()
reservation1 = conn.run_instances('ami-1234abcd', instance_type='m1.small')
instance1 = reservation1.instances[0]
reservation2 = conn.run_instances('ami-1234abcd', instance_type='m1.small')
instance2 = reservation2.instances[0]
reservation3 = conn.run_instances('ami-1234abcd', instance_type='t1.micro')
instance3 = reservation3.instances[0]
reservations = conn.get_all_instances(filters={'instance-type': 'm1.small'})
# get_all_instances should return instance1,2
reservations.should.have.length_of(2)
reservations[0].instances.should.have.length_of(1)
reservations[1].instances.should.have.length_of(1)
instance_ids = [ reservations[0].instances[0].id,
reservations[1].instances[0].id ]
set(instance_ids).should.equal(set([instance1.id, instance2.id]))
reservations = conn.get_all_instances(filters={'instance-type': 't1.micro'})
# get_all_instances should return one
reservations.should.have.length_of(1)
reservations[0].instances.should.have.length_of(1)
reservations[0].instances[0].id.should.equal(instance3.id)
reservations = conn.get_all_instances(filters={'instance-type': ['t1.micro', 'm1.small']})
reservations.should.have.length_of(3)
reservations[0].instances.should.have.length_of(1)
reservations[1].instances.should.have.length_of(1)
reservations[2].instances.should.have.length_of(1)
instance_ids = [
reservations[0].instances[0].id,
reservations[1].instances[0].id,
reservations[2].instances[0].id,
]
set(instance_ids).should.equal(set([instance1.id, instance2.id, instance3.id]))
reservations = conn.get_all_instances(filters={'instance-type': 'bogus'})
#bogus instance-type should return none
reservations.should.have.length_of(0)
@mock_ec2
def test_get_instances_filtering_by_reason_code():
conn = boto.connect_ec2()
@ -240,6 +282,73 @@ def test_get_instances_filtering_by_tag():
reservations[0].instances[0].id.should.equal(instance1.id)
reservations[0].instances[1].id.should.equal(instance3.id)
@mock_ec2
def test_get_instances_filtering_by_tag_value():
conn = boto.connect_ec2()
reservation = conn.run_instances('ami-1234abcd', min_count=3)
instance1, instance2, instance3 = reservation.instances
instance1.add_tag('tag1', 'value1')
instance1.add_tag('tag2', 'value2')
instance2.add_tag('tag1', 'value1')
instance2.add_tag('tag2', 'wrong value')
instance3.add_tag('tag2', 'value2')
reservations = conn.get_all_instances(filters={'tag-value' : 'value0'})
# get_all_instances should return no instances
reservations.should.have.length_of(0)
reservations = conn.get_all_instances(filters={'tag-value' : 'value1'})
# get_all_instances should return both instances with this tag value
reservations.should.have.length_of(1)
reservations[0].instances.should.have.length_of(2)
reservations[0].instances[0].id.should.equal(instance1.id)
reservations[0].instances[1].id.should.equal(instance2.id)
reservations = conn.get_all_instances(filters={'tag-value' : ['value2', 'value1']})
# get_all_instances should return both instances with one of the acceptable tag values
reservations.should.have.length_of(1)
reservations[0].instances.should.have.length_of(3)
reservations[0].instances[0].id.should.equal(instance1.id)
reservations[0].instances[1].id.should.equal(instance2.id)
reservations[0].instances[2].id.should.equal(instance3.id)
reservations = conn.get_all_instances(filters={'tag-value' : ['value2', 'bogus']})
# get_all_instances should return both instances with one of the acceptable tag values
reservations.should.have.length_of(1)
reservations[0].instances.should.have.length_of(2)
reservations[0].instances[0].id.should.equal(instance1.id)
reservations[0].instances[1].id.should.equal(instance3.id)
@mock_ec2
def test_get_instances_filtering_by_tag_name():
conn = boto.connect_ec2()
reservation = conn.run_instances('ami-1234abcd', min_count=3)
instance1, instance2, instance3 = reservation.instances
instance1.add_tag('tag1')
instance1.add_tag('tag2')
instance2.add_tag('tag1')
instance2.add_tag('tag2X')
instance3.add_tag('tag3')
reservations = conn.get_all_instances(filters={'tag-key' : 'tagX'})
# get_all_instances should return no instances
reservations.should.have.length_of(0)
reservations = conn.get_all_instances(filters={'tag-key' : 'tag1'})
# get_all_instances should return both instances with this tag value
reservations.should.have.length_of(1)
reservations[0].instances.should.have.length_of(2)
reservations[0].instances[0].id.should.equal(instance1.id)
reservations[0].instances[1].id.should.equal(instance2.id)
reservations = conn.get_all_instances(filters={'tag-key' : ['tag1', 'tag3']})
# get_all_instances should return both instances with one of the acceptable tag values
reservations.should.have.length_of(1)
reservations[0].instances.should.have.length_of(3)
reservations[0].instances[0].id.should.equal(instance1.id)
reservations[0].instances[1].id.should.equal(instance2.id)
reservations[0].instances[2].id.should.equal(instance3.id)
@mock_ec2
def test_instance_start_and_stop():
conn = boto.connect_ec2('the_key', 'the_secret')

View File

@ -248,3 +248,13 @@ def test_security_group_tagging():
group = conn.get_all_security_groups("test-sg")[0]
group.tags.should.have.length_of(1)
group.tags["Test"].should.equal("Tag")
@mock_ec2
def test_security_group_tag_filtering():
conn = boto.connect_ec2()
sg = conn.create_security_group("test-sg", "Test SG")
sg.add_tag("test-tag", "test-value")
groups = conn.get_all_security_groups(filters={"tag:test-tag": "test-value"})
groups.should.have.length_of(1)

View File

@ -62,6 +62,34 @@ def test_create_role_and_instance_profile():
conn.list_roles().roles[0].role_name.should.equal('my-role')
conn.list_instance_profiles().instance_profiles[0].instance_profile_name.should.equal("my-profile")
@mock_iam()
def test_list_instance_profiles_for_role():
conn = boto.connect_iam()
conn.create_role(role_name="my-role", assume_role_policy_document="some policy", path="my-path")
conn.create_role(role_name="my-role2", assume_role_policy_document="some policy2", path="my-path2")
profile_name_list = ['my-profile', 'my-profile2']
profile_path_list = ['my-path', 'my-path2']
for profile_count in range(0,2):
conn.create_instance_profile(profile_name_list[profile_count], path=profile_path_list[profile_count])
for profile_count in range(0,2):
conn.add_role_to_instance_profile(profile_name_list[profile_count], "my-role")
profile_dump = conn.list_instance_profiles_for_role(role_name="my-role")
profile_list = profile_dump['list_instance_profiles_for_role_response']['list_instance_profiles_for_role_result']['instance_profiles']
for profile_count in range(0,len(profile_list)):
profile_name_list.remove(profile_list[profile_count]["instance_profile_name"])
profile_path_list.remove(profile_list[profile_count]["path"])
profile_list[profile_count]["roles"]["member"]["role_name"].should.equal("my-role")
len(profile_name_list).should.equal(0)
len(profile_path_list).should.equal(0)
profile_dump2 = conn.list_instance_profiles_for_role(role_name="my-role2")
profile_list = profile_dump2['list_instance_profiles_for_role_response']['list_instance_profiles_for_role_result']['instance_profiles']
len(profile_list).should.equal(0)
@mock_iam()
def test_list_role_policies():

View File

@ -3,6 +3,7 @@
from __future__ import unicode_literals
from six.moves.urllib.request import urlopen
from six.moves.urllib.error import HTTPError
from functools import wraps
from io import BytesIO
import boto
@ -29,6 +30,7 @@ def reduced_min_part_size(f):
import moto.s3.models as s3model
orig_size = s3model.UPLOAD_PART_MIN_SIZE
@wraps(f)
def wrapped(*args, **kwargs):
try:
s3model.UPLOAD_PART_MIN_SIZE = REDUCED_PART_SIZE
@ -186,7 +188,7 @@ def test_multipart_etag():
multipart.complete_upload()
# we should get both parts as the key contents
bucket.get_key("the-key").etag.should.equal(
'"140f92a6df9f9e415f74a1463bcee9bb-2"')
'"66d1a1a2ed08fd05c137f316af4ff255-2"')
@mock_s3
@ -208,7 +210,23 @@ def test_multipart_invalid_order():
bucket.complete_multipart_upload.when.called_with(
multipart.key_name, multipart.id, xml).should.throw(S3ResponseError)
@mock_s3
@reduced_min_part_size
def test_multipart_duplicate_upload():
conn = boto.connect_s3('the_key', 'the_secret')
bucket = conn.create_bucket("foobar")
multipart = bucket.initiate_multipart_upload("the-key")
part1 = b'0' * REDUCED_PART_SIZE
multipart.upload_part_from_file(BytesIO(part1), 1)
# same part again
multipart.upload_part_from_file(BytesIO(part1), 1)
part2 = b'1' * 1024
multipart.upload_part_from_file(BytesIO(part2), 2)
multipart.complete_upload()
# We should get only one copy of part 1.
bucket.get_key("the-key").get_contents_as_string().should.equal(part1 + part2)
@mock_s3
def test_list_multiparts():
# Create Bucket so that test can run

View File

@ -0,0 +1,16 @@
from __future__ import unicode_literals
import boto
from moto import mock_sns
@mock_sns
def test_get_list_endpoints_by_platform_application():
conn = boto.connect_sns()
endpoint_list = conn.list_endpoints_by_platform_application(
platform_application_arn='fake_arn'
)['ListEndpointsByPlatformApplicationResponse']['ListEndpointsByPlatformApplicationResult']['Endpoints']
endpoint_list.should.have.length_of(1)
endpoint_list[0]['Attributes']['Enabled'].should.equal('true')
endpoint_list[0]['EndpointArn'].should.equal('FAKE_ARN_ENDPOINT')