Security group egress ip permissions fix (#3250)

* Add support for Description in egress rule response

* Update SecurityGroup default egress rule ip range

* Remove extra commas

* Remove extra commas

* Lower docker package in Travis

* Add more lambda vars per PR 3247

* Remove code added in 3247

* Add tests for egress rules with Descriptions

* Reformat based on black

Co-authored-by: spillin <jmbollard@me.com>
This commit is contained in:
jmbollard 2020-08-26 08:27:45 -05:00 committed by GitHub
parent f744356da7
commit 2a27e457bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 13 deletions

View File

@ -1866,7 +1866,9 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
self.name = name
self.description = description
self.ingress_rules = []
self.egress_rules = [SecurityRule("-1", None, None, ["0.0.0.0/0"], [])]
self.egress_rules = [
SecurityRule("-1", None, None, [{"CidrIp": "0.0.0.0/0"}], [])
]
self.enis = {}
self.vpc_id = vpc_id
self.owner_id = OWNER_ID
@ -2266,13 +2268,16 @@ class SecurityGroupBackend(object):
if source_group:
source_groups.append(source_group)
for ip in ip_ranges:
ip_ranges = [ip.get("CidrIp") if ip.get("CidrIp") == "0.0.0.0/0" else ip]
# I don't believe this is required after changing the default egress rule
# to be {'CidrIp': '0.0.0.0/0'} instead of just '0.0.0.0/0'
# Not sure why this would return only the IP if it was 0.0.0.0/0 instead of
# the ip_range?
# for ip in ip_ranges:
# ip_ranges = [ip.get("CidrIp") if ip.get("CidrIp") == "0.0.0.0/0" else ip]
security_rule = SecurityRule(
ip_protocol, from_port, to_port, ip_ranges, source_groups
)
if security_rule in group.egress_rules:
group.egress_rules.remove(security_rule)
return security_rule

View File

@ -250,7 +250,10 @@ DESCRIBE_SECURITY_GROUPS_RESPONSE = (
<ipRanges>
{% for ip_range in rule.ip_ranges %}
<item>
<cidrIp>{{ ip_range }}</cidrIp>
<cidrIp>{{ ip_range['CidrIp'] }}</cidrIp>
{% if ip_range['Description'] %}
<description>{{ ip_range['Description'] }}</description>
{% endif %}
</item>
{% endfor %}
</ipRanges>

View File

@ -275,8 +275,9 @@ def test_authorize_ip_range_and_revoke():
int(egress_security_group.rules_egress[1].to_port).should.equal(2222)
actual_cidr = egress_security_group.rules_egress[1].grants[0].cidr_ip
# Deal with Python2 dict->unicode, instead of dict->string
actual_cidr = json.loads(actual_cidr.replace("u'", "'").replace("'", '"'))
actual_cidr.should.equal({"CidrIp": "123.123.123.123/32"})
if type(actual_cidr) == "unicode":
actual_cidr = json.loads(actual_cidr.replace("u'", "'").replace("'", '"'))
actual_cidr.should.equal("123.123.123.123/32")
# Wrong Cidr should throw error
egress_security_group.revoke.when.called_with(
@ -810,7 +811,9 @@ def test_authorize_and_revoke_in_bulk():
sg03 = ec2.create_security_group(
GroupName="sg03", Description="Test security group sg03"
)
sg04 = ec2.create_security_group(
GroupName="sg04", Description="Test security group sg04"
)
ip_permissions = [
{
"IpProtocol": "tcp",
@ -835,13 +838,31 @@ def test_authorize_and_revoke_in_bulk():
"UserIdGroupPairs": [{"GroupName": "sg03", "UserId": sg03.owner_id}],
"IpRanges": [],
},
{
"IpProtocol": "tcp",
"FromPort": 27015,
"ToPort": 27015,
"UserIdGroupPairs": [{"GroupName": "sg04", "UserId": sg04.owner_id}],
"IpRanges": [
{"CidrIp": "10.10.10.0/24", "Description": "Some Description"}
],
},
{
"IpProtocol": "tcp",
"FromPort": 27016,
"ToPort": 27016,
"UserIdGroupPairs": [{"GroupId": sg04.id, "UserId": sg04.owner_id}],
"IpRanges": [{"CidrIp": "10.10.10.0/24"}],
},
]
expected_ip_permissions = copy.deepcopy(ip_permissions)
expected_ip_permissions[1]["UserIdGroupPairs"][0]["GroupName"] = "sg02"
expected_ip_permissions[2]["UserIdGroupPairs"][0]["GroupId"] = sg03.id
expected_ip_permissions[3]["UserIdGroupPairs"][0]["GroupId"] = sg04.id
expected_ip_permissions[4]["UserIdGroupPairs"][0]["GroupName"] = "sg04"
sg01.authorize_ingress(IpPermissions=ip_permissions)
sg01.ip_permissions.should.have.length_of(3)
sg01.ip_permissions.should.have.length_of(5)
for ip_permission in expected_ip_permissions:
sg01.ip_permissions.should.contain(ip_permission)
@ -851,7 +872,7 @@ def test_authorize_and_revoke_in_bulk():
sg01.ip_permissions.shouldnt.contain(ip_permission)
sg01.authorize_egress(IpPermissions=ip_permissions)
sg01.ip_permissions_egress.should.have.length_of(4)
sg01.ip_permissions_egress.should.have.length_of(6)
for ip_permission in expected_ip_permissions:
sg01.ip_permissions_egress.should.contain(ip_permission)
@ -930,11 +951,10 @@ def test_revoke_security_group_egress():
sg.revoke_egress(
IpPermissions=[
{
"FromPort": 0,
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"ToPort": 123,
},
"UserIdGroupPairs": [],
}
]
)