Merge pull request #109 from StartTheShift/fix_vpc_grant_permission_issue
Fix vpc grant permission to groups issue
This commit is contained in:
		
						commit
						a9b48ed836
					
				@ -373,6 +373,16 @@ class SecurityGroupBackend(object):
 | 
			
		||||
            if group:
 | 
			
		||||
                return self.groups[None].pop(group.id)
 | 
			
		||||
 | 
			
		||||
    def get_security_group_from_id(self, group_id):
 | 
			
		||||
        # 2 levels of chaining necessary since it's a complex structure
 | 
			
		||||
        all_groups = itertools.chain.from_iterable([x.values() for x in self.groups.values()])
 | 
			
		||||
 | 
			
		||||
        for group in all_groups:
 | 
			
		||||
            if group.id == group_id:
 | 
			
		||||
                return group
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def get_security_group_from_name(self, name, vpc_id):
 | 
			
		||||
        for group_id, group in self.groups[vpc_id].iteritems():
 | 
			
		||||
            if group.name == name:
 | 
			
		||||
@ -383,25 +393,66 @@ class SecurityGroupBackend(object):
 | 
			
		||||
            default_group = ec2_backend.create_security_group("default", "The default security group", force=True)
 | 
			
		||||
            return default_group
 | 
			
		||||
 | 
			
		||||
    def authorize_security_group_ingress(self, group_name, ip_protocol, from_port, to_port, ip_ranges=None, source_group_names=None, vpc_id=None):
 | 
			
		||||
    def authorize_security_group_ingress(self,
 | 
			
		||||
                                         group_name,
 | 
			
		||||
                                         group_id,
 | 
			
		||||
                                         ip_protocol,
 | 
			
		||||
                                         from_port,
 | 
			
		||||
                                         to_port,
 | 
			
		||||
                                         ip_ranges=None,
 | 
			
		||||
                                         source_group_names=None,
 | 
			
		||||
                                         source_group_ids=None,
 | 
			
		||||
                                         vpc_id=None):
 | 
			
		||||
        # to auth a group in a VPC you need the group_id the name isn't enough
 | 
			
		||||
 | 
			
		||||
        if group_name:
 | 
			
		||||
            group = self.get_security_group_from_name(group_name, vpc_id)
 | 
			
		||||
        elif group_id:
 | 
			
		||||
            group = self.get_security_group_from_id(group_id)
 | 
			
		||||
 | 
			
		||||
        source_groups = []
 | 
			
		||||
        for source_group_name in source_group_names:
 | 
			
		||||
            source_group = self.get_security_group_from_name(source_group_name, vpc_id)
 | 
			
		||||
            if source_group:
 | 
			
		||||
                source_groups.append(source_group)
 | 
			
		||||
 | 
			
		||||
        # for VPCs
 | 
			
		||||
        for source_group_id in source_group_ids:
 | 
			
		||||
            source_group = self.get_security_group_from_id(source_group_id)
 | 
			
		||||
            if source_group:
 | 
			
		||||
                source_groups.append(source_group)
 | 
			
		||||
 | 
			
		||||
        security_rule = SecurityRule(ip_protocol, from_port, to_port, ip_ranges, source_groups)
 | 
			
		||||
        group.ingress_rules.append(security_rule)
 | 
			
		||||
 | 
			
		||||
    def revoke_security_group_ingress(self, group_name, ip_protocol, from_port, to_port, ip_ranges=None, source_group_names=None, vpc_id=None):
 | 
			
		||||
    def revoke_security_group_ingress(self,
 | 
			
		||||
                                      group_name,
 | 
			
		||||
                                      group_id,
 | 
			
		||||
                                      ip_protocol,
 | 
			
		||||
                                      from_port,
 | 
			
		||||
                                      to_port,
 | 
			
		||||
                                      ip_ranges=None,
 | 
			
		||||
                                      source_group_names=None,
 | 
			
		||||
                                      source_group_ids=None,
 | 
			
		||||
                                      vpc_id=None):
 | 
			
		||||
 | 
			
		||||
        if group_name:
 | 
			
		||||
            group = self.get_security_group_from_name(group_name, vpc_id)
 | 
			
		||||
        elif group_id:
 | 
			
		||||
            group = self.get_security_group_from_id(group_id)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        source_groups = []
 | 
			
		||||
        for source_group_name in source_group_names:
 | 
			
		||||
            source_group = self.get_security_group_from_name(source_group_name, vpc_id)
 | 
			
		||||
            if source_group:
 | 
			
		||||
                source_groups.append(source_group)
 | 
			
		||||
 | 
			
		||||
        for source_group_id in source_group_ids:
 | 
			
		||||
            source_group = self.get_security_group_from_id(source_group_id)
 | 
			
		||||
            if source_group:
 | 
			
		||||
                source_groups.append(source_group)
 | 
			
		||||
 | 
			
		||||
        security_rule = SecurityRule(ip_protocol, from_port, to_port, ip_ranges, source_groups)
 | 
			
		||||
        if security_rule in group.ingress_rules:
 | 
			
		||||
            group.ingress_rules.remove(security_rule)
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,15 @@ from moto.ec2.models import ec2_backend
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_rules_from_querystring(querystring):
 | 
			
		||||
 | 
			
		||||
    name = None
 | 
			
		||||
    group_id = None
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        name = querystring.get('GroupName')[0]
 | 
			
		||||
    except:
 | 
			
		||||
        group_id = querystring.get('GroupId')[0]
 | 
			
		||||
 | 
			
		||||
    ip_protocol = querystring.get('IpPermissions.1.IpProtocol')[0]
 | 
			
		||||
    from_port = querystring.get('IpPermissions.1.FromPort')[0]
 | 
			
		||||
    to_port = querystring.get('IpPermissions.1.ToPort')[0]
 | 
			
		||||
@ -14,11 +22,17 @@ def process_rules_from_querystring(querystring):
 | 
			
		||||
        if 'IpPermissions.1.IpRanges' in key:
 | 
			
		||||
            ip_ranges.append(value[0])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    source_groups = []
 | 
			
		||||
    source_group_ids = []
 | 
			
		||||
 | 
			
		||||
    for key, value in querystring.iteritems():
 | 
			
		||||
        if 'IpPermissions.1.Groups' in key:
 | 
			
		||||
        if 'IpPermissions.1.Groups.1.GroupId' in key:
 | 
			
		||||
            source_group_ids.append(value[0])
 | 
			
		||||
        elif 'IpPermissions.1.Groups' in key:
 | 
			
		||||
            source_groups.append(value[0])
 | 
			
		||||
    return (name, ip_protocol, from_port, to_port, ip_ranges, source_groups)
 | 
			
		||||
 | 
			
		||||
    return (name, group_id, ip_protocol, from_port, to_port, ip_ranges, source_groups, source_group_ids)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SecurityGroups(BaseResponse):
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,7 @@ def test_create_and_describe_security_group():
 | 
			
		||||
    all_groups.should.have.length_of(1)
 | 
			
		||||
    all_groups[0].name.should.equal('test security group')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@mock_ec2
 | 
			
		||||
def test_create_and_describe_vpc_security_group():
 | 
			
		||||
    conn = boto.connect_ec2('the_key', 'the_secret')
 | 
			
		||||
@ -130,3 +131,19 @@ def test_authorize_other_group_and_revoke():
 | 
			
		||||
 | 
			
		||||
    security_group = [group for group in conn.get_all_security_groups() if group.name == 'test'][0]
 | 
			
		||||
    security_group.rules.should.have.length_of(0)
 | 
			
		||||
 | 
			
		||||
@mock_ec2
 | 
			
		||||
def test_authorize_group_in_vpc():
 | 
			
		||||
    conn = boto.connect_ec2('the_key', 'the_secret')
 | 
			
		||||
    vpc_id = "vpc-12345"
 | 
			
		||||
 | 
			
		||||
    # create 2 groups in a vpc
 | 
			
		||||
    security_group1 = conn.create_security_group('test1', 'test1', vpc_id)
 | 
			
		||||
    security_group2 = conn.create_security_group('test2', 'test2', vpc_id)
 | 
			
		||||
 | 
			
		||||
    success = security_group1.authorize(ip_protocol="tcp", from_port="22", to_port="2222", src_group=security_group2)
 | 
			
		||||
    success.should.be.true
 | 
			
		||||
    success = security_group1.revoke(ip_protocol="tcp", from_port="22", to_port="2222", src_group=security_group2)
 | 
			
		||||
    success.should.be.true
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user