Make security rules consistent between direct (backend) and indirect (api) boundaries (#3817)

* Make security rules consistent between direct (backend) and indirect (api) boundaries

Security rules added directly via the backend were unable to be revoked via the API
because the port values were being stored as strings but were always coerced back
to integers by the botocore model.  `"0" != 0`, so the rules would never match,
raising an `InvalidPermissionNotFoundError`.

This change ensures that the port values for a security group rule are always of type
`Union[int, None]`.

No tests needed to be modified as a result of this change.  A new test was added that
explicitly covers the behavior that had been failing.

* Skip test in server mode
This commit is contained in:
Brian Pandola 2021-03-31 11:33:36 -07:00 committed by GitHub
parent ec168cf24c
commit 463472c2b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 7 deletions

View File

@ -1984,10 +1984,11 @@ class SecurityRule(object):
self.ip_protocol = ip_protocol
self.ip_ranges = ip_ranges or []
self.source_groups = source_groups
self.from_port = self.to_port = None
if ip_protocol != "-1":
self.from_port = from_port
self.to_port = to_port
self.from_port = int(from_port)
self.to_port = int(to_port)
def __eq__(self, other):
if self.ip_protocol != other.ip_protocol:

View File

@ -196,10 +196,10 @@ DESCRIBE_SECURITY_GROUPS_RESPONSE = (
{% for rule in group.ingress_rules %}
<item>
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
{% if rule.from_port %}
{% if rule.from_port is not none %}
<fromPort>{{ rule.from_port }}</fromPort>
{% endif %}
{% if rule.to_port %}
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<groups>
@ -230,10 +230,10 @@ DESCRIBE_SECURITY_GROUPS_RESPONSE = (
{% for rule in group.egress_rules %}
<item>
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
{% if rule.from_port %}
{% if rule.from_port is not none %}
<fromPort>{{ rule.from_port }}</fromPort>
{% endif %}
{% if rule.to_port %}
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<groups>

View File

@ -2,6 +2,7 @@ from __future__ import unicode_literals
import copy
import json
import unittest
import pytest
@ -11,7 +12,8 @@ from botocore.exceptions import ClientError
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2, mock_ec2_deprecated
from moto import mock_ec2, mock_ec2_deprecated, settings
from moto.ec2 import ec2_backend
@mock_ec2_deprecated
@ -985,3 +987,53 @@ def test_non_existent_security_group_raises_error_on_authorize():
authorize_func(GroupId=non_existent_sg, IpPermissions=[{}])
ex.value.response["Error"]["Code"].should.equal("InvalidGroup.NotFound")
ex.value.response["Error"]["Message"].should.equal(expected_error)
@mock_ec2
def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api():
if settings.TEST_SERVER_MODE:
raise unittest.SkipTest("Can't test backend directly in server mode.")
ec2_resource = boto3.resource("ec2", region_name="us-east-1")
ec2_client = boto3.client("ec2", region_name="us-east-1")
vpc = ec2_resource.create_vpc(CidrBlock="10.0.0.0/16")
group_name = "test-backend-authorize"
sg = ec2_resource.create_security_group(
GroupName=group_name, Description="test", VpcId=vpc.id
)
# Add an ingress/egress rule using the EC2 backend directly.
rule_ingress = {
"group_name_or_id": sg.id,
"from_port": 0,
"ip_protocol": "udp",
"ip_ranges": [],
"to_port": 65535,
"source_group_ids": [sg.id],
}
ec2_backend.authorize_security_group_ingress(**rule_ingress)
rule_egress = {
"group_name_or_id": sg.id,
"from_port": 8443,
"ip_protocol": "tcp",
"ip_ranges": [],
"to_port": 8443,
"source_group_ids": [sg.id],
}
ec2_backend.authorize_security_group_egress(**rule_egress)
# Both rules (plus the default egress) should now be present.
sg = ec2_client.describe_security_groups(
Filters=[{"Name": "group-name", "Values": [group_name]}]
).get("SecurityGroups")[0]
assert len(sg["IpPermissions"]) == 1
assert len(sg["IpPermissionsEgress"]) == 2
# Revoking via the API should work for all rules (even those we added directly).
ec2_client.revoke_security_group_egress(
GroupId=sg["GroupId"], IpPermissions=sg["IpPermissionsEgress"]
)
ec2_client.revoke_security_group_ingress(
GroupId=sg["GroupId"], IpPermissions=sg["IpPermissions"]
)
sg = ec2_client.describe_security_groups(
Filters=[{"Name": "group-name", "Values": [group_name]}]
).get("SecurityGroups")[0]
assert len(sg["IpPermissions"]) == 0
assert len(sg["IpPermissionsEgress"]) == 0