Feature describe_security_group_rules for ec2 (#5841)

This commit is contained in:
Uzair Ali 2023-01-18 01:04:41 +05:30 committed by GitHub
parent 50da0d0667
commit ba78420314
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 0 deletions

View File

@ -520,6 +520,26 @@ class SecurityGroupBackend:
return matches
def describe_security_group_rules(self, group_ids=None, filters=None):
matches = itertools.chain(*[x.copy().values() for x in self.groups.values()])
if group_ids:
matches = [grp for grp in matches if grp.id in group_ids]
if len(group_ids) > len(matches):
unknown_ids = set(group_ids) - set(matches)
raise InvalidSecurityGroupNotFoundError(unknown_ids)
if filters:
matches = [grp for grp in matches if grp.matches_filters(filters)]
if not matches:
raise InvalidSecurityGroupNotFoundError(
"No security groups found matching the filters provided."
)
rules = []
for group in matches:
rules.extend(group.ingress_rules)
rules.extend(group.egress_rules)
return rules
def _delete_security_group(self, vpc_id, group_id):
vpc_id = vpc_id or self.default_vpc.id
if self.groups[vpc_id][group_id].enis:

View File

@ -194,6 +194,14 @@ class SecurityGroups(EC2BaseResponse):
template = self.response_template(DESCRIBE_SECURITY_GROUPS_RESPONSE)
return template.render(groups=groups)
def describe_security_group_rules(self):
group_id = self._get_param("GroupId")
filters = self._get_param("Filter")
if self.is_not_dryrun("DescribeSecurityGroups"):
rules = self.ec2_backend.describe_security_group_rules(group_id, filters)
template = self.response_template(DESCRIBE_SECURITY_GROUP_RULES_RESPONSE)
return template.render(rules=rules)
def revoke_security_group_egress(self):
if self.is_not_dryrun("RevokeSecurityGroupEgress"):
for args in self._process_rules_from_querystring():
@ -239,6 +247,28 @@ CREATE_SECURITY_GROUP_RESPONSE = """<CreateSecurityGroupResponse xmlns="http://e
</tagSet>
</CreateSecurityGroupResponse>"""
DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """
<DescribeSecurityGroupRulesResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>{{ request_id }}</requestId>
<securityGroupRuleSet>
{% for rule in rules %}
<item>
{% if rule.from_port is not none %}
<fromPort>{{ rule.from_port }}</fromPort>
{% endif %}
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<cidrIpv4>{{ rule.ip_ranges[0]['CidrIp'] }}</cidrIpv4>
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
<groupOwnerId>{{ rule.owner_id }}</groupOwnerId>
<isEgress>true</isEgress>
<securityGroupRuleId>{{ rule.id }}</securityGroupRuleId>
</item>
{% endfor %}
</securityGroupRuleSet>
</DescribeSecurityGroupRulesResponse>"""
DELETE_GROUP_RESPONSE = """<DeleteSecurityGroupResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>

View File

@ -563,6 +563,49 @@ def test_authorize_all_protocols_with_no_port_specification():
permission.shouldnt.have.key("ToPort")
@mock_ec2
def test_create_and_describe_security_grp_rule():
ip_protocol = "tcp"
from_port = 27017
to_port = 27017
cidr_ip_range = "1.2.3.4/32"
ec2 = boto3.resource("ec2", "us-east-1")
client = boto3.client("ec2", "us-east-1")
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
sg = ec2.create_security_group(
Description="Test SG", GroupName=sg_name, VpcId=vpc.id
)
# Ingress rule
ip_permissions = [
{
"IpProtocol": ip_protocol,
"FromPort": from_port,
"ToPort": to_port,
"IpRanges": [{"CidrIp": cidr_ip_range}],
}
]
sgr = sg.authorize_ingress(IpPermissions=ip_permissions)
# Describing the ingress rule
sgr_id = sgr["SecurityGroupRules"][0]["SecurityGroupRuleId"]
response = client.describe_security_group_rules(
Filters=[{"Name": "ip-permission-id", "Values": [sgr_id]}]
)
ingress_rule = response["SecurityGroupRules"]
rule_found = False
for rule in ingress_rule:
if rule["SecurityGroupRuleId"] == sgr_id:
assert rule["IpProtocol"] == ip_protocol
assert rule["FromPort"] == from_port
assert rule["ToPort"] == to_port
assert rule["CidrIpv4"] == cidr_ip_range
rule_found = True
break
assert rule_found, True
@mock_ec2
@pytest.mark.parametrize("use_vpc", [True, False], ids=["Use VPC", "Without VPC"])
def test_sec_group_rule_limit(use_vpc):