From dc460a325839bc6797084a54afc297a2c9d87e63 Mon Sep 17 00:00:00 2001
From: Josh Kropf
Date: Thu, 6 Apr 2023 14:59:18 -0400
Subject: [PATCH] EC2: Fix egress rules used in ingress revoke method (#6180)

---
 moto/ec2/models/security_groups.py | 2 +-
 tests/test_ec2/test_security_groups.py | 49 ++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/moto/ec2/models/security_groups.py b/moto/ec2/models/security_groups.py
index 1f19ce8a7..1e458e751 100644
--- a/moto/ec2/models/security_groups.py
+++ b/moto/ec2/models/security_groups.py
@@ -733,7 +733,7 @@ class SecurityGroupBackend:
         if security_rule_ids:
             group.ingress_rules = [
-                rule for rule in group.egress_rules if rule.id not in security_rule_ids
+                rule for rule in group.ingress_rules if rule.id not in security_rule_ids
             ]
             return
diff --git a/tests/test_ec2/test_security_groups.py b/tests/test_ec2/test_security_groups.py
index c09a680de..c7cc6dc87 100644
--- a/tests/test_ec2/test_security_groups.py
+++ b/tests/test_ec2/test_security_groups.py
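Review bot: The corrected comprehension still consumes unknown ids silently: any value in `security_rule_ids` that matches no ingress rule of this group has no effect and the method returns without signalling it, whereas, as far as I know, the real API rejects an unknown rule id. Since this comprehension is now the only consumer of the ids, computing `unknown = set(security_rule_ids) - {rule.id for rule in group.ingress_rules}` before the rebuild and raising the backend's not-found error when it is non-empty would surface stale or mistyped ids to callers instead of reporting success. The enclosing method starts and ends outside the hunk, so I cannot tell whether an earlier branch already validates the ids.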
@@ -1820,3 +1820,52 @@ def test_filter_group_name():
     security_groups = list(security_groups)
     assert len(security_groups) == 1
     assert security_groups[0].group_name == sg1.group_name
+
+
+@mock_ec2
+def test_revoke_security_group_ingress():
+    ec2 = boto3.client("ec2", region_name="us-east-1")
+
+    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
+
+    sg = ec2.create_security_group(
+        Description="Test SG", GroupName=str(uuid4()), VpcId=vpc["Vpc"]["VpcId"]
+    )
+    sg_id = sg["GroupId"]
+
+    ec2.authorize_security_group_ingress(
+        GroupId=sg_id,
+        IpPermissions=[
+            {
+                "FromPort": 3000,
+                "ToPort": 3300,
+                "IpProtocol": "TCP",
+                "IpRanges": [{"CidrIp": "10.0.0.1/32"}],
+            },
+            {
+                "FromPort": 8080,
+                "ToPort": 8080,
+                "IpProtocol": "TCP",
+                "IpRanges": [{"CidrIp": "10.0.0.1/32"}],
+            },
+        ],
+    )
+
+    response = ec2.describe_security_group_rules(
+        Filters=[{"Name": "group-id", "Values": [sg_id]}]
+    )
+
+    ingress_rules = [r for r in response["SecurityGroupRules"] if not r["IsEgress"]]
+    assert len(ingress_rules) == 2
+
+    # revoke 1 of the 2 ingress rules
+    ec2.revoke_security_group_ingress(
+        GroupId=sg_id, SecurityGroupRuleIds=[ingress_rules[0]["SecurityGroupRuleId"]]
+    )
+
+    response = ec2.describe_security_group_rules(
+        Filters=[{"Name": "group-id", "Values": [sg_id]}]
+    )
+
+    ingress_rules = [r for r in response["SecurityGroupRules"] if not r["IsEgress"]]
+    assert len(ingress_rules) == 1
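Review bot: The new test pins only the ingress count after the revoke, not which rule survived, so it cannot tell "the requested rule was removed" apart from "some rule was removed". Saving `kept_id = ingress_rules[1]["SecurityGroupRuleId"]` before the revoke call and asserting `ingress_rules[0]["SecurityGroupRuleId"] == kept_id` after the second describe call would pin the behaviour the fix targets. Also asserting that the number of egress rules in `response` is the same before and after the revoke would guard directly against the list mix-up this commit fixes, assuming `IsEgress` is reported per rule rather than derived from the list the rule sits in.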