From 463472c2b2c91461699d6018887b7b691451cc43 Mon Sep 17 00:00:00 2001 From: Brian Pandola Date: Wed, 31 Mar 2021 11:33:36 -0700 Subject: [PATCH] Make security rules consistent between direct (backend) and indirect (api) boundaries (#3817) * Make security rules consistent between direct (backend) and indirect (api) boundaries Security rules added directly via the backend were unable to be revoked via the API because the port values were being stored as strings but were always coerced back to integers by the botocore model. `"0" != 0`, so the rules would never match, raising an `InvalidPermissionNotFoundError`. This change ensures that the port values for a security group rule are always of type `Union[int, None]`. No tests needed to be modified as a result of this change. A new test was added that explicitly covers the behavior that had been failing. * Skip test in server mode --- moto/ec2/models.py | 5 ++- moto/ec2/responses/security_groups.py | 8 ++-- tests/test_ec2/test_security_groups.py | 54 +++++++++++++++++++++++++- 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/moto/ec2/models.py b/moto/ec2/models.py index 32b0ad053..555ebc6a2 100644 --- a/moto/ec2/models.py +++ b/moto/ec2/models.py @@ -1984,10 +1984,11 @@ class SecurityRule(object): self.ip_protocol = ip_protocol self.ip_ranges = ip_ranges or [] self.source_groups = source_groups + self.from_port = self.to_port = None if ip_protocol != "-1": - self.from_port = from_port - self.to_port = to_port + self.from_port = int(from_port) + self.to_port = int(to_port) def __eq__(self, other): if self.ip_protocol != other.ip_protocol: diff --git a/moto/ec2/responses/security_groups.py b/moto/ec2/responses/security_groups.py index 5c0d1c852..60645e165 100644 --- a/moto/ec2/responses/security_groups.py +++ b/moto/ec2/responses/security_groups.py @@ -196,10 +196,10 @@ DESCRIBE_SECURITY_GROUPS_RESPONSE = ( {% for rule in group.ingress_rules %} {{ rule.ip_protocol }} - {% if rule.from_port %} + {% if rule.from_port is not none %} {{ rule.from_port }} {% endif %} - {% if rule.to_port %} + {% if rule.to_port is not none %} {{ rule.to_port }} {% endif %} @@ -230,10 +230,10 @@ DESCRIBE_SECURITY_GROUPS_RESPONSE = ( {% for rule in group.egress_rules %} {{ rule.ip_protocol }} - {% if rule.from_port %} + {% if rule.from_port is not none %} {{ rule.from_port }} {% endif %} - {% if rule.to_port %} + {% if rule.to_port is not none %} {{ rule.to_port }} {% endif %} diff --git a/tests/test_ec2/test_security_groups.py b/tests/test_ec2/test_security_groups.py index ae4c97589..6e7ecf24a 100644 --- a/tests/test_ec2/test_security_groups.py +++ b/tests/test_ec2/test_security_groups.py @@ -2,6 +2,7 @@ from __future__ import unicode_literals import copy import json +import unittest import pytest @@ -11,7 +12,8 @@ from botocore.exceptions import ClientError from boto.exception import EC2ResponseError import sure # noqa -from moto import mock_ec2, mock_ec2_deprecated +from moto import mock_ec2, mock_ec2_deprecated, settings +from moto.ec2 import ec2_backend @mock_ec2_deprecated @@ -985,3 +987,53 @@ def test_non_existent_security_group_raises_error_on_authorize(): authorize_func(GroupId=non_existent_sg, IpPermissions=[{}]) ex.value.response["Error"]["Code"].should.equal("InvalidGroup.NotFound") ex.value.response["Error"]["Message"].should.equal(expected_error) + + +@mock_ec2 +def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api(): + if settings.TEST_SERVER_MODE: + raise unittest.SkipTest("Can't test backend directly in server mode.") + ec2_resource = boto3.resource("ec2", region_name="us-east-1") + ec2_client = boto3.client("ec2", region_name="us-east-1") + vpc = ec2_resource.create_vpc(CidrBlock="10.0.0.0/16") + group_name = "test-backend-authorize" + sg = ec2_resource.create_security_group( + GroupName=group_name, Description="test", VpcId=vpc.id + ) + # Add an ingress/egress rule using the EC2 backend directly. + rule_ingress = { + "group_name_or_id": sg.id, + "from_port": 0, + "ip_protocol": "udp", + "ip_ranges": [], + "to_port": 65535, + "source_group_ids": [sg.id], + } + ec2_backend.authorize_security_group_ingress(**rule_ingress) + rule_egress = { + "group_name_or_id": sg.id, + "from_port": 8443, + "ip_protocol": "tcp", + "ip_ranges": [], + "to_port": 8443, + "source_group_ids": [sg.id], + } + ec2_backend.authorize_security_group_egress(**rule_egress) + # Both rules (plus the default egress) should now be present. + sg = ec2_client.describe_security_groups( + Filters=[{"Name": "group-name", "Values": [group_name]}] + ).get("SecurityGroups")[0] + assert len(sg["IpPermissions"]) == 1 + assert len(sg["IpPermissionsEgress"]) == 2 + # Revoking via the API should work for all rules (even those we added directly). + ec2_client.revoke_security_group_egress( + GroupId=sg["GroupId"], IpPermissions=sg["IpPermissionsEgress"] + ) + ec2_client.revoke_security_group_ingress( + GroupId=sg["GroupId"], IpPermissions=sg["IpPermissions"] + ) + sg = ec2_client.describe_security_groups( + Filters=[{"Name": "group-name", "Values": [group_name]}] + ).get("SecurityGroups")[0] + assert len(sg["IpPermissions"]) == 0 + assert len(sg["IpPermissionsEgress"]) == 0