Merge pull request #2596 from gruebel/fix-ec2-revoke-security-group-egress

Fix ec2.revoke_security_group_egress for IpProtocol -1
This commit is contained in:
Mike Grima 2019-12-09 14:06:09 -08:00 committed by GitHub
commit b2264feac5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 13 deletions

View File

@ -1644,23 +1644,27 @@ class RegionsAndZonesBackend(object):
class SecurityRule(object): class SecurityRule(object):
def __init__(self, ip_protocol, from_port, to_port, ip_ranges, source_groups): def __init__(self, ip_protocol, from_port, to_port, ip_ranges, source_groups):
self.ip_protocol = ip_protocol self.ip_protocol = ip_protocol
self.from_port = from_port
self.to_port = to_port
self.ip_ranges = ip_ranges or [] self.ip_ranges = ip_ranges or []
self.source_groups = source_groups self.source_groups = source_groups
@property if ip_protocol != "-1":
def unique_representation(self): self.from_port = from_port
return "{0}-{1}-{2}-{3}-{4}".format( self.to_port = to_port
self.ip_protocol,
self.from_port,
self.to_port,
self.ip_ranges,
self.source_groups,
)
def __eq__(self, other): def __eq__(self, other):
return self.unique_representation == other.unique_representation if self.ip_protocol != other.ip_protocol:
return False
if self.ip_ranges != other.ip_ranges:
return False
if self.source_groups != other.source_groups:
return False
if self.ip_protocol != "-1":
if self.from_port != other.from_port:
return False
if self.to_port != other.to_port:
return False
return True
class SecurityGroup(TaggedEC2Resource): class SecurityGroup(TaggedEC2Resource):
@ -1670,7 +1674,7 @@ class SecurityGroup(TaggedEC2Resource):
self.name = name self.name = name
self.description = description self.description = description
self.ingress_rules = [] self.ingress_rules = []
self.egress_rules = [SecurityRule(-1, None, None, ["0.0.0.0/0"], [])] self.egress_rules = [SecurityRule("-1", None, None, ["0.0.0.0/0"], [])]
self.enis = {} self.enis = {}
self.vpc_id = vpc_id self.vpc_id = vpc_id
self.owner_id = OWNER_ID self.owner_id = OWNER_ID

View File

@ -833,3 +833,33 @@ def test_get_all_security_groups_filter_with_same_vpc_id():
cm.exception.code.should.equal("InvalidGroup.NotFound") cm.exception.code.should.equal("InvalidGroup.NotFound")
cm.exception.status.should.equal(400) cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none cm.exception.request_id.should_not.be.none
@mock_ec2
def test_revoke_security_group_egress():
ec2 = boto3.resource("ec2", "us-east-1")
sg = ec2.create_security_group(Description="Test SG", GroupName="test-sg")
sg.ip_permissions_egress.should.equal(
[
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"UserIdGroupPairs": [],
}
]
)
sg.revoke_egress(
IpPermissions=[
{
"FromPort": 0,
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"},],
"ToPort": 123,
},
]
)
sg.reload()
sg.ip_permissions_egress.should.have.length_of(0)