Fix egress rules management to autorize or revoke a security group

This commit is contained in:
Yann Lambret 2016-04-20 23:01:09 +02:00
parent de68c94a0a
commit f9267cff6c
2 changed files with 52 additions and 9 deletions

View File

@ -1324,7 +1324,6 @@ class SecurityGroupBackend(object):
if security_rule in group.ingress_rules:
group.ingress_rules.remove(security_rule)
return security_rule
raise InvalidPermissionNotFoundError()
def authorize_security_group_egress(self,
@ -1333,22 +1332,33 @@ class SecurityGroupBackend(object):
from_port,
to_port,
ip_ranges,
src_group_id=None,
cidr_ip=None,
source_group_names=None,
source_group_ids=None,
vpc_id=None):
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if ip_ranges and not isinstance(ip_ranges, list):
ip_ranges = [ip_ranges]
if ip_ranges:
for cidr in ip_ranges:
if not is_valid_cidr(cidr):
raise InvalidCIDRSubnetError(cidr=cidr)
# for VPCs
source_group_names = source_group_names if source_group_names else []
source_group_ids = source_group_ids if source_group_ids else []
source_groups = []
source_group = self.get_security_group_from_id(src_group_id)
if source_group:
source_groups.append(source_group)
for source_group_name in source_group_names:
source_group = self.get_security_group_from_name(source_group_name, vpc_id)
if source_group:
source_groups.append(source_group)
# for VPCs
for source_group_id in source_group_ids:
source_group = self.get_security_group_from_id(source_group_id)
if source_group:
source_groups.append(source_group)
security_rule = SecurityRule(ip_protocol, from_port, to_port, ip_ranges, source_groups)
group.egress_rules.append(security_rule)
@ -1370,6 +1380,11 @@ class SecurityGroupBackend(object):
if source_group:
source_groups.append(source_group)
for source_group_id in source_group_ids:
source_group = self.get_security_group_from_id(source_group_id)
if source_group:
source_groups.append(source_group)
security_rule = SecurityRule(ip_protocol, from_port, to_port, ip_ranges, source_groups)
if security_rule in group.egress_rules:
group.egress_rules.remove(security_rule)

View File

@ -3,6 +3,7 @@ from __future__ import unicode_literals
import tests.backport_assert_raises # noqa
from nose.tools import assert_raises
import boto3
import boto
from boto.exception import EC2ResponseError
import sure # noqa
@ -157,6 +158,8 @@ def test_authorize_ip_range_and_revoke():
success = conn.authorize_security_group_egress(egress_security_group.id, "tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32")
assert success.should.be.true
egress_security_group = conn.get_all_security_groups(groupnames='testegress')[0]
# There are two egress rules associated with the security group:
# the default outbound rule and the new one
int(egress_security_group.rules_egress[1].to_port).should.equal(2222)
egress_security_group.rules_egress[1].grants[0].cidr_ip.should.equal("123.123.123.123/32")
@ -167,6 +170,7 @@ def test_authorize_ip_range_and_revoke():
conn.revoke_security_group_egress(egress_security_group.id, "tcp", from_port="22", to_port="2222", cidr_ip="123.123.123.123/32")
egress_security_group = conn.get_all_security_groups()[0]
# There is still the default outbound rule
egress_security_group.rules_egress.should.have.length_of(1)
@ -198,6 +202,30 @@ def test_authorize_other_group_and_revoke():
security_group.rules.should.have.length_of(0)
@mock_ec2
def test_authorize_other_group_egress_and_revoke():
ec2 = boto3.resource('ec2', region_name='us-west-1')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
sg01 = ec2.create_security_group(GroupName='sg01', Description='Test security group sg01', VpcId=vpc.id)
sg02 = ec2.create_security_group(GroupName='sg02', Description='Test security group sg02', VpcId=vpc.id)
ip_permission = {
'IpProtocol': u'tcp',
'FromPort': 27017,
'ToPort': 27017,
'UserIdGroupPairs': [{'GroupId': sg02.id, 'GroupName': 'sg02', 'UserId': sg02.owner_id}],
'IpRanges': []
}
sg01.authorize_egress(IpPermissions=[ip_permission])
sg01.ip_permissions_egress.should.have.length_of(2)
sg01.ip_permissions_egress.should.contain(ip_permission)
sg01.revoke_egress(IpPermissions=[ip_permission])
sg01.ip_permissions_egress.should.have.length_of(1)
@mock_ec2
def test_authorize_group_in_vpc():
conn = boto.connect_ec2('the_key', 'the_secret')
@ -215,7 +243,7 @@ def test_authorize_group_in_vpc():
int(security_group.rules[0].to_port).should.equal(2222)
security_group.rules[0].grants[0].group_id.should.equal(other_security_group.id)
# Now revome the rule
# Now remove the rule
success = security_group.revoke(ip_protocol="tcp", from_port="22", to_port="2222", src_group=other_security_group)
success.should.be.true