Added support to get_all_security_groups endpoint to actually filter groups.

- Filters by groupnames, group_ids and a filters. However, the filters option doesn't
   support owner-id and tags since neither attribute was readily available via the SecurityGroup object.

 - Also included a basic test to confirm it works.
This commit is contained in:
Rory-Finnegan 2014-08-21 19:04:48 -05:00 committed by Rory-Finnegan
parent a3b02f3f8b
commit 665beda466
3 changed files with 97 additions and 3 deletions

View File

@ -544,6 +544,43 @@ class SecurityGroup(object):
def physical_resource_id(self):
return self.id
def matches_filters(self, filters):
result = True
def to_attr(filter_name):
attr = None
if attr == 'group-name':
attr = 'name'
elif attr == 'group-id':
attr = 'id'
else:
attr = filter_name.replace('-', '_')
return attr
for key, value in filters.iteritems():
ret = False
if key.startswith('ip-permission'):
match = re.search(r"ip-permission.(*)", key)
ingress_attr = to_attr(match.groups()[0])
for ingress in self.ingress_rules:
if getattr(ingress, ingress_attr) in filters[key]:
ret = True
break
else:
attr_name = to_attr(key)
ret = getattr(self, attr_name) in filters[key]
if not ret:
break
else:
result = False
return result
class SecurityGroupBackend(object):
@ -566,8 +603,20 @@ class SecurityGroupBackend(object):
self.groups[vpc_id][group_id] = group
return group
def describe_security_groups(self):
return itertools.chain(*[x.values() for x in self.groups.values()])
def describe_security_groups(self, group_ids=None, groupnames=None, filters=None):
all_groups = itertools.chain(*[x.values() for x in self.groups.values()])
groups = []
if group_ids or groupnames or filters:
for group in all_groups:
if ((group_ids and group.id in group_ids) or
(groupnames and group.name in groupnames) or
(filters and group.matches_filters(filters))):
groups.append(group)
else:
groups = all_groups
return groups
def delete_security_group(self, name=None, group_id=None):
if group_id:

View File

@ -3,6 +3,7 @@ from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backend
from moto.ec2.utils import filters_from_querystring
def process_rules_from_querystring(querystring):
@ -35,6 +36,22 @@ def process_rules_from_querystring(querystring):
return (name, group_id, ip_protocol, from_port, to_port, ip_ranges, source_groups, source_group_ids)
def process_group_ids_from_querystring(querystring):
group_ids = []
for key, value in querystring.iteritems():
if 'GroupId' in key:
group_ids.append(value[0])
return group_ids
def process_groupnames_from_querystring(querystring):
groupnames = []
for key, value in querystring.iteritems():
if 'GroupName' in key:
groupnames.append(value[0])
return groupnames
class SecurityGroups(BaseResponse):
def authorize_security_group_egress(self):
raise NotImplementedError('SecurityGroups.authorize_security_group_egress is not yet implemented')
@ -65,7 +82,16 @@ class SecurityGroups(BaseResponse):
return DELETE_GROUP_RESPONSE
def describe_security_groups(self):
groups = ec2_backend.describe_security_groups()
groupnames = process_groupnames_from_querystring(self.querystring)
group_ids = process_group_ids_from_querystring(self.querystring)
filters = filters_from_querystring(self.querystring)
groups = ec2_backend.describe_security_groups(
group_ids=group_ids,
groupnames=groupnames,
filters=filters
)
template = Template(DESCRIBE_SECURITY_GROUPS_RESPONSE)
return template.render(groups=groups)

View File

@ -195,3 +195,22 @@ def test_authorize_group_in_vpc():
# And check that it gets revoked
security_group = [group for group in conn.get_all_security_groups() if group.name == 'test1'][0]
security_group.rules.should.have.length_of(0)
@mock_ec2
def test_get_all_security_groups():
conn = boto.connect_ec2()
conn.create_security_group(name='test1', description='test1', vpc_id='vpc-mjm05d27')
conn.create_security_group(name='test2', description='test2')
resp = conn.get_all_security_groups(groupnames=['test1'])
resp.should.have.length_of(1)
resp = conn.get_all_security_groups(filters={'vpc_id': ['vpc-mjm05d27']})
resp.should.have.length_of(1)
resp = conn.get_all_security_groups(filters={'description': ['test1']})
resp.should.have.length_of(1)
resp = conn.get_all_security_groups()
resp.should.have.length_of(2)