From ba784203142a4e8a4e1c3e34987a29c159e3c231 Mon Sep 17 00:00:00 2001 From: Uzair Ali <72073401+uzaxirr@users.noreply.github.com> Date: Wed, 18 Jan 2023 01:04:41 +0530 Subject: [PATCH] Feature `describe_security_group_rules` for `ec2` (#5841) --- moto/ec2/models/security_groups.py | 20 ++++++++++++ moto/ec2/responses/security_groups.py | 30 ++++++++++++++++++ tests/test_ec2/test_security_groups.py | 43 ++++++++++++++++++++++++++ 3 files changed, 93 insertions(+) diff --git a/moto/ec2/models/security_groups.py b/moto/ec2/models/security_groups.py index b662374ea..31bee8255 100644 --- a/moto/ec2/models/security_groups.py +++ b/moto/ec2/models/security_groups.py @@ -520,6 +520,26 @@ class SecurityGroupBackend: return matches + def describe_security_group_rules(self, group_ids=None, filters=None): + matches = itertools.chain(*[x.copy().values() for x in self.groups.values()]) + if group_ids: + matches = [grp for grp in matches if grp.id in group_ids] + if len(group_ids) > len(matches): + unknown_ids = set(group_ids) - set(matches) + raise InvalidSecurityGroupNotFoundError(unknown_ids) + if filters: + matches = [grp for grp in matches if grp.matches_filters(filters)] + if not matches: + raise InvalidSecurityGroupNotFoundError( + "No security groups found matching the filters provided." + ) + rules = [] + for group in matches: + rules.extend(group.ingress_rules) + rules.extend(group.egress_rules) + + return rules + def _delete_security_group(self, vpc_id, group_id): vpc_id = vpc_id or self.default_vpc.id if self.groups[vpc_id][group_id].enis: diff --git a/moto/ec2/responses/security_groups.py b/moto/ec2/responses/security_groups.py index c14beadfa..236e270ef 100644 --- a/moto/ec2/responses/security_groups.py +++ b/moto/ec2/responses/security_groups.py @@ -194,6 +194,14 @@ class SecurityGroups(EC2BaseResponse): template = self.response_template(DESCRIBE_SECURITY_GROUPS_RESPONSE) return template.render(groups=groups) + def describe_security_group_rules(self): + group_id = self._get_param("GroupId") + filters = self._get_param("Filter") + if self.is_not_dryrun("DescribeSecurityGroups"): + rules = self.ec2_backend.describe_security_group_rules(group_id, filters) + template = self.response_template(DESCRIBE_SECURITY_GROUP_RULES_RESPONSE) + return template.render(rules=rules) + def revoke_security_group_egress(self): if self.is_not_dryrun("RevokeSecurityGroupEgress"): for args in self._process_rules_from_querystring(): @@ -239,6 +247,28 @@ CREATE_SECURITY_GROUP_RESPONSE = """ + {{ request_id }} + + {% for rule in rules %} + + {% if rule.from_port is not none %} + {{ rule.from_port }} + {% endif %} + {% if rule.to_port is not none %} + {{ rule.to_port }} + {% endif %} + {{ rule.ip_ranges[0]['CidrIp'] }} + {{ rule.ip_protocol }} + {{ rule.owner_id }} + true + {{ rule.id }} + + {% endfor %} + +""" + DELETE_GROUP_RESPONSE = """ 59dbff89-35bd-4eac-99ed-be587EXAMPLE true diff --git a/tests/test_ec2/test_security_groups.py b/tests/test_ec2/test_security_groups.py index dba97645a..f88c1ae3e 100644 --- a/tests/test_ec2/test_security_groups.py +++ b/tests/test_ec2/test_security_groups.py @@ -563,6 +563,49 @@ def test_authorize_all_protocols_with_no_port_specification(): permission.shouldnt.have.key("ToPort") +@mock_ec2 +def test_create_and_describe_security_grp_rule(): + ip_protocol = "tcp" + from_port = 27017 + to_port = 27017 + cidr_ip_range = "1.2.3.4/32" + + ec2 = boto3.resource("ec2", "us-east-1") + client = boto3.client("ec2", "us-east-1") + vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") + sg_name = str(uuid4()) + sg = ec2.create_security_group( + Description="Test SG", GroupName=sg_name, VpcId=vpc.id + ) + + # Ingress rule + ip_permissions = [ + { + "IpProtocol": ip_protocol, + "FromPort": from_port, + "ToPort": to_port, + "IpRanges": [{"CidrIp": cidr_ip_range}], + } + ] + sgr = sg.authorize_ingress(IpPermissions=ip_permissions) + # Describing the ingress rule + sgr_id = sgr["SecurityGroupRules"][0]["SecurityGroupRuleId"] + response = client.describe_security_group_rules( + Filters=[{"Name": "ip-permission-id", "Values": [sgr_id]}] + ) + ingress_rule = response["SecurityGroupRules"] + rule_found = False + for rule in ingress_rule: + if rule["SecurityGroupRuleId"] == sgr_id: + assert rule["IpProtocol"] == ip_protocol + assert rule["FromPort"] == from_port + assert rule["ToPort"] == to_port + assert rule["CidrIpv4"] == cidr_ip_range + rule_found = True + break + assert rule_found, True + + @mock_ec2 @pytest.mark.parametrize("use_vpc", [True, False], ids=["Use VPC", "Without VPC"]) def test_sec_group_rule_limit(use_vpc):