Raise InvalidGroup.NotFound in DescribeSecurityGroups

This commit is contained in:
Nuwan Goonasekera 2017-09-18 19:51:01 +05:30
parent 2937cf4c45
commit 298772ca92
2 changed files with 33 additions and 15 deletions

View File

@ -1360,22 +1360,25 @@ class SecurityGroupBackend(object):
return group
def describe_security_groups(self, group_ids=None, groupnames=None, filters=None):
all_groups = itertools.chain(*[x.values()
for x in self.groups.values()])
groups = []
matches = itertools.chain(*[x.values()
for x in self.groups.values()])
if group_ids:
matches = [grp for grp in matches
if grp.id in group_ids]
if len(group_ids) > len(matches):
unknown_ids = set(group_ids) - set(matches)
raise InvalidSecurityGroupNotFoundError(unknown_ids)
if groupnames:
matches = [grp for grp in matches
if grp.name in groupnames]
if len(groupnames) > len(matches):
unknown_names = set(groupnames) - set(matches)
raise InvalidSecurityGroupNotFoundError(unknown_names)
if filters:
matches = [grp for grp in matches
if grp.matches_filters(filters)]
if group_ids or groupnames or filters:
for group in all_groups:
if ((group_ids and group.id not in group_ids) or
(groupnames and group.name not in groupnames)):
continue
if filters and not group.matches_filters(filters):
continue
groups.append(group)
else:
groups = all_groups
return groups
return matches
def _delete_security_group(self, vpc_id, group_id):
if self.groups[vpc_id][group_id].enis:

View File

@ -348,6 +348,15 @@ def test_get_all_security_groups():
resp.should.have.length_of(1)
resp[0].id.should.equal(sg1.id)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_security_groups(groupnames=['does_not_exist'])
cm.exception.code.should.equal('InvalidGroup.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
resp.should.have.length_of(1)
resp[0].id.should.equal(sg1.id)
resp = conn.get_all_security_groups(filters={'vpc-id': ['vpc-mjm05d27']})
resp.should.have.length_of(1)
resp[0].id.should.equal(sg1.id)
@ -681,3 +690,9 @@ def test_get_all_security_groups_filter_with_same_vpc_id():
security_groups = conn.get_all_security_groups(
group_ids=[security_group.id], filters={'vpc-id': [vpc_id]})
security_groups.should.have.length_of(1)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_security_groups(group_ids=['does_not_exist'])
cm.exception.code.should.equal('InvalidGroup.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none