Fix security group filters (#4079)

This commit is contained in:
James Light 2021-09-30 11:28:13 -04:00 committed by GitHub
parent 33e60a2d16
commit 73b7fcce26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 615 additions and 27 deletions

View File

@ -422,3 +422,15 @@ def merge_dicts(dict1, dict2, remove_nulls=False):
dict1[key] = dict2[key]
if dict1[key] is None and remove_nulls:
dict1.pop(key)
def glob_matches(pattern, string):
"""AWS API-style globbing regexes"""
pattern, n = re.subn(r"[^\\]\*", r".*", pattern)
pattern, m = re.subn(r"[^\\]\?", r".?", pattern)
pattern = ".*" + pattern + ".*"
if re.match(pattern, str(string)):
return True
return False

View File

@ -33,6 +33,7 @@ from moto.core.models import Model, BaseModel, CloudFormationModel
from moto.core.utils import (
iso_8601_datetime_with_milliseconds,
camelcase_to_underscores,
glob_matches,
)
from moto.core import ACCOUNT_ID
from moto.kms import kms_backends
@ -2173,7 +2174,9 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
):
self.ec2_backend = ec2_backend
self.id = group_id
self.group_id = self.id
self.name = name
self.group_name = self.name
self.description = description
self.ingress_rules = []
self.egress_rules = []
@ -2195,6 +2198,34 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
SecurityRule("-1", None, None, [{"CidrIpv6": "::/0"}], [])
)
# each filter as a simple function in a mapping
self.filters = {
"description": self.filter_description,
"egress.ip-permission.cidr": self.filter_egress__ip_permission__cidr,
"egress.ip-permission.from-port": self.filter_egress__ip_permission__from_port,
"egress.ip-permission.group-id": self.filter_egress__ip_permission__group_id,
"egress.ip-permission.group-name": self.filter_egress__ip_permission__group_name,
"egress.ip-permission.ipv6-cidr": self.filter_egress__ip_permission__ipv6_cidr,
"egress.ip-permission.prefix-list-id": self.filter_egress__ip_permission__prefix_list_id,
"egress.ip-permission.protocol": self.filter_egress__ip_permission__protocol,
"egress.ip-permission.to-port": self.filter_egress__ip_permission__to_port,
"egress.ip-permission.user-id": self.filter_egress__ip_permission__user_id,
"group-id": self.filter_group_id,
"group-name": self.filter_group_name,
"ip-permission.cidr": self.filter_ip_permission__cidr,
"ip-permission.from-port": self.filter_ip_permission__from_port,
"ip-permission.group-id": self.filter_ip_permission__group_id,
"ip-permission.group-name": self.filter_ip_permission__group_name,
"ip-permission.ipv6-cidr": self.filter_ip_permission__ipv6_cidr,
"ip-permission.prefix-list-id": self.filter_ip_permission__prefix_list_id,
"ip-permission.protocol": self.filter_ip_permission__protocol,
"ip-permission.to-port": self.filter_ip_permission__to_port,
"ip-permission.user-id": self.filter_ip_permission__user_id,
"owner-id": self.filter_owner_id,
"vpc-id": self.filter_vpc_id,
}
@staticmethod
def cloudformation_name_type():
return "GroupName"
@ -2280,38 +2311,160 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
def physical_resource_id(self):
return self.id
def matches_filter(self, key, filter_value):
def to_attr(filter_name):
attr = None
def filter_description(self, values):
for value in values:
if glob_matches(value, self.description):
return True
return False
if filter_name == "group-name":
attr = "name"
elif filter_name == "group-id":
attr = "id"
elif filter_name == "vpc-id":
attr = "vpc_id"
else:
attr = filter_name.replace("-", "_")
def filter_egress__ip_permission__cidr(self, values):
for value in values:
for rule in self.egress_rules:
for cidr in rule.ip_ranges:
if glob_matches(value, cidr):
return True
return False
return attr
if key.startswith("ip-permission"):
match = re.search(r"ip-permission.(.*)", key)
ingress_attr = to_attr(match.groups()[0])
for ingress in self.ingress_rules:
if str(getattr(ingress, ingress_attr)) in filter_value:
def filter_egress__ip_permission__from_port(self, values):
for value in values:
for rule in self.egress_rules:
if rule.ip_protocol != -1 and glob_matches(value, str(rule.from_port)):
return True
elif is_tag_filter(key):
return False
def filter_egress__ip_permission__group_id(self, values):
for value in values:
for rule in self.egress_rules:
for sg in rule.source_groups:
if glob_matches(value, sg.get("GroupId", None)):
return True
return False
def filter_egress__ip_permission__group_name(self, values):
for value in values:
for rule in self.egress_rules:
for group in rule.source_groups:
if glob_matches(value, group.get("GroupName", None)):
return True
return False
def filter_egress__ip_permission__ipv6_cidr(self, values):
raise MotoNotImplementedError("egress.ip-permission.ipv6-cidr filter")
def filter_egress__ip_permission__prefix_list_id(self, values):
raise MotoNotImplementedError("egress.ip-permission.prefix-list-id filter")
def filter_egress__ip_permission__protocol(self, values):
for value in values:
for rule in self.egress_rules:
if glob_matches(value, rule.ip_protocol):
return True
return False
def filter_egress__ip_permission__to_port(self, values):
for value in values:
for rule in self.egress_rules:
if glob_matches(value, rule.to_port):
return True
return False
def filter_egress__ip_permission__user_id(self, values):
for value in values:
for rule in self.egress_rules:
if glob_matches(value, rule.owner_id):
return True
return False
def filter_group_id(self, values):
for value in values:
if glob_matches(value, self.id):
return True
return False
def filter_group_name(self, values):
for value in values:
if glob_matches(value, self.group_name):
return True
return False
def filter_ip_permission__cidr(self, values):
for value in values:
for rule in self.ingress_rules:
for cidr in rule.ip_ranges:
if glob_matches(value, cidr):
return True
return False
def filter_ip_permission__from_port(self, values):
for value in values:
for rule in self.ingress_rules:
if glob_matches(value, rule.from_port):
return True
return False
def filter_ip_permission__group_id(self, values):
for value in values:
for rule in self.ingress_rules:
for group in rule.source_groups:
if glob_matches(value, group.get("GroupId", None)):
return True
return False
def filter_ip_permission__group_name(self, values):
for value in values:
for rule in self.ingress_rules:
for group in rule.source_groups:
if glob_matches(value, group.get("GroupName", None)):
return True
return False
def filter_ip_permission__ipv6_cidr(self, values):
raise MotoNotImplementedError("ip-permission.ipv6 filter")
def filter_ip_permission__prefix_list_id(self, values):
raise MotoNotImplementedError("ip-permission.prefix-list-id filter")
def filter_ip_permission__protocol(self, values):
for value in values:
for rule in self.ingress_rules:
if glob_matches(value, rule.protocol):
return True
return False
def filter_ip_permission__to_port(self, values):
for value in values:
for rule in self.ingress_rules:
if glob_matches(rule.to_port):
return True
return False
def filter_ip_permission__user_id(self, values):
for value in values:
for rule in self.ingress_rules:
if glob_matches(value, rule.owner_id):
return True
return False
def filter_owner_id(self, values):
for value in values:
if glob_matches(value, self.owner_id):
return True
return False
def filter_vpc_id(self, values):
for value in values:
if glob_matches(value, self.vpc_id):
return True
return False
def matches_filter(self, key, filter_value):
if is_tag_filter(key):
tag_value = self.get_filter_value(key)
if isinstance(filter_value, list):
return tag_filter_matches(self, key, filter_value)
return tag_value in filter_value
else:
attr_name = to_attr(key)
return getattr(self, attr_name) in filter_value
return False
return self.filters[key](filter_value)
def matches_filters(self, filters):
for key, value in filters.items():

View File

@ -152,7 +152,7 @@ def test_create_and_describe_vpc_security_group():
cm.value.status.should.equal(400)
cm.value.request_id.should_not.be.none
all_groups = conn.get_all_security_groups(filters={"vpc_id": [vpc_id]})
all_groups = conn.get_all_security_groups(filters={"vpc-id": [vpc_id]})
all_groups[0].vpc_id.should.equal(vpc_id)
@ -191,7 +191,7 @@ def test_create_and_describe_vpc_security_group_boto3():
all_groups.should.have.length_of(3) # 1 default, 1 vpc, 1 no-vpc
all_groups = client.describe_security_groups(
Filters=[{"Name": "vpc_id", "Values": [vpc_id]}]
Filters=[{"Name": "vpc-id", "Values": [vpc_id]}]
)["SecurityGroups"]
all_groups.should.have.length_of(1)
@ -856,7 +856,7 @@ def test_get_all_security_groups():
resp.should.have.length_of(1)
resp[0].id.should.equal(sg1.id)
resp = conn.get_all_security_groups(filters={"vpc_id": ["vpc-mjm05d27"]})
resp = conn.get_all_security_groups(filters={"vpc-id": ["vpc-mjm05d27"]})
resp.should.have.length_of(1)
resp[0].id.should.equal(sg1.id)
@ -1852,3 +1852,426 @@ def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api()
).get("SecurityGroups")[0]
assert len(sg["IpPermissions"]) == 0
assert len(sg["IpPermissionsEgress"]) == 0
@mock_ec2
def test_filter_description():
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
sg1 = vpc.create_security_group(
Description="An Excellent Description", GroupName="test-1"
)
sg2 = vpc.create_security_group(
Description="Another Description That Awes The Human Mind", GroupName="test-2"
)
filter_to_match_group_1_description = {
"Name": "description",
"Values": ["Excellent"],
}
security_groups = ec2r.security_groups.filter(
Filters=[filter_to_match_group_1_description]
)
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id
@mock_ec2
def test_filter_egress__ip_permission__cidr():
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
Description="A Described Description Descriptor", GroupName="test-1"
)
sg2 = vpc.create_security_group(
Description="Another Description That Awes The Human Mind", GroupName="test-2"
)
sg1.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7357,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "10.250.0.0/16"}, {"CidrIp": "10.251.0.0/16"}],
}
]
)
sg2.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7357,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "172.16.0.0/16"}, {"CidrIp": "172.17.0.0/16"}],
}
]
)
filter_to_match_group_1 = {
"Name": "egress.ip-permission.cidr",
"Values": ["10.250.0.0/16"],
}
security_groups = ec2r.security_groups.filter(Filters=[filter_to_match_group_1])
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id
@mock_ec2
def test_filter_egress__ip_permission__from_port():
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
Description="A Described Description Descriptor", GroupName="test-1"
)
sg2 = vpc.create_security_group(
Description="Another Description That Awes The Human Mind", GroupName="test-2"
)
sg1.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7359,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "10.250.0.0/16"}, {"CidrIp": "10.251.0.0/16"}],
}
]
)
sg2.authorize_egress(
IpPermissions=[
{
"FromPort": 8000,
"ToPort": 8020,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "172.16.0.0/16"}, {"CidrIp": "172.17.0.0/16"}],
}
]
)
filter_to_match_group_1 = {
"Name": "egress.ip-permission.from-port",
"Values": ["7357"],
}
security_groups = ec2r.security_groups.filter(Filters=[filter_to_match_group_1])
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id
@mock_ec2
def test_filter_egress__ip_permission__group_id():
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
Description="A Described Description Descriptor", GroupName="test-1"
)
sg2 = vpc.create_security_group(
Description="Another Description That Awes The Human Mind", GroupName="test-2"
)
sg3 = vpc.create_security_group(
Description="Yet Another Descriptive Description", GroupName="test-3"
)
sg4 = vpc.create_security_group(
Description="Such Description Much Described", GroupName="test-4"
)
sg1.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7359,
"IpProtocol": "tcp",
"UserIdGroupPairs": [{"GroupId": sg3.group_id}],
}
]
)
sg2.authorize_egress(
IpPermissions=[
{
"FromPort": 8000,
"ToPort": 8020,
"IpProtocol": "tcp",
"UserIdGroupPairs": [{"GroupId": sg4.group_id}],
}
]
)
filter_to_match_group_1 = {
"Name": "egress.ip-permission.group-id",
"Values": [sg3.group_id],
}
security_groups = ec2r.security_groups.filter(Filters=[filter_to_match_group_1])
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id
@mock_ec2
def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_name():
"""
this fails to find the group in the AWS API, so we should also fail to find it
"""
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
Description="A Described Description Descriptor", GroupName="test-1"
)
sg2 = vpc.create_security_group(
Description="Another Description That Awes The Human Mind", GroupName="test-2"
)
sg3 = vpc.create_security_group(
Description="Yet Another Descriptive Description", GroupName="test-3"
)
sg4 = vpc.create_security_group(
Description="Such Description Much Described", GroupName="test-4"
)
sg1.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7359,
"IpProtocol": "tcp",
"UserIdGroupPairs": [{"GroupId": sg3.group_id}],
}
]
)
sg2.authorize_egress(
IpPermissions=[
{
"FromPort": 8000,
"ToPort": 8020,
"IpProtocol": "tcp",
"UserIdGroupPairs": [{"GroupId": sg4.group_id}],
}
]
)
filter_to_match_group_1 = {
"Name": "egress.ip-permission.group-name",
"Values": [sg3.group_name],
}
sg1.load()
sg2.load()
security_groups = ec2r.security_groups.filter(Filters=[filter_to_match_group_1])
security_groups = list(security_groups)
assert len(security_groups) == 0
@mock_ec2
def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_id():
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
Description="A Described Description Descriptor", GroupName="test-1"
)
sg2 = vpc.create_security_group(
Description="Another Description That Awes The Human Mind", GroupName="test-2"
)
sg3 = vpc.create_security_group(
Description="Yet Another Descriptive Description", GroupName="test-3"
)
sg4 = vpc.create_security_group(
Description="Such Description Much Described", GroupName="test-4"
)
sg1.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7359,
"IpProtocol": "tcp",
"UserIdGroupPairs": [{"GroupId": sg3.group_id}],
}
]
)
sg2.authorize_egress(
IpPermissions=[
{
"FromPort": 8000,
"ToPort": 8020,
"IpProtocol": "tcp",
"UserIdGroupPairs": [{"GroupId": sg4.group_id}],
}
]
)
filter_to_match_group_1 = {
"Name": "egress.ip-permission.group-id",
"Values": [sg3.group_id],
}
sg1.load()
sg2.load()
security_groups = ec2r.security_groups.filter(Filters=[filter_to_match_group_1])
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id
@mock_ec2
def test_filter_egress__ip_permission__protocol():
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
Description="A Described Description Descriptor", GroupName="test-1"
)
sg2 = vpc.create_security_group(
Description="Another Description That Awes The Human Mind", GroupName="test-2"
)
sg1.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7359,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "10.250.0.0/16"}, {"CidrIp": "10.251.0.0/16"}],
}
]
)
sg2.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7359,
"IpProtocol": "udp",
"IpRanges": [{"CidrIp": "10.250.0.0/16"}, {"CidrIp": "10.251.0.0/16"}],
}
]
)
filter_to_match_group_1 = {
"Name": "egress.ip-permission.protocol",
"Values": ["tcp"],
}
security_groups = ec2r.security_groups.filter(Filters=[filter_to_match_group_1])
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id
@mock_ec2
def test_filter_egress__ip_permission__to_port():
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
Description="A Described Description Descriptor", GroupName="test-1"
)
sg2 = vpc.create_security_group(
Description="Another Description That Awes The Human Mind", GroupName="test-2"
)
sg1.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7359,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "10.250.0.0/16"}, {"CidrIp": "10.251.0.0/16"}],
}
]
)
sg2.authorize_egress(
IpPermissions=[
{
"FromPort": 7357,
"ToPort": 7360,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "172.16.0.0/16"}, {"CidrIp": "172.17.0.0/16"}],
}
]
)
filter_to_match_group_1 = {
"Name": "egress.ip-permission.to-port",
"Values": ["7359"],
}
security_groups = ec2r.security_groups.filter(Filters=[filter_to_match_group_1])
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id
@mock_ec2
def test_get_groups_by_ippermissions_group_id_filter():
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
sg1 = vpc.create_security_group(Description="test", GroupName="test-1")
sg2 = vpc.create_security_group(Description="test", GroupName="test-2")
sg1_allows_sg2_ingress_rule = {
"IpProtocol": "tcp",
"FromPort": 31337,
"ToPort": 31337,
"UserIdGroupPairs": [{"GroupId": sg2.group_id, "VpcId": sg2.vpc_id}],
}
sg1.authorize_ingress(IpPermissions=[sg1_allows_sg2_ingress_rule])
# we should be able to describe security groups and filter for all the ones that contain
# a reference to another group ID
match_only_groups_whose_ingress_rules_refer_to_group_2 = {
"Name": "ip-permission.group-id",
"Values": [sg2.group_id],
}
security_groups = ec2r.security_groups.filter(
Filters=[match_only_groups_whose_ingress_rules_refer_to_group_2]
)
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id
@mock_ec2
def test_get_groups_by_ippermissions_group_id_filter_across_vpcs():
# setup 2 VPCs, each with a single Security Group
# where one security group authorizes the other sg (in another vpc) via GroupId
ec2r = boto3.resource("ec2", region_name="us-west-1")
vpc1 = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
vpc2 = ec2r.create_vpc(CidrBlock="10.251.0.0/16")
sg1 = vpc1.create_security_group(Description="test", GroupName="test-1")
sg2 = vpc2.create_security_group(Description="test", GroupName="test-2")
sg1_allows_sg2_ingress_rule = {
"IpProtocol": "tcp",
"FromPort": 31337,
"ToPort": 31337,
"UserIdGroupPairs": [{"GroupId": sg2.group_id, "VpcId": sg2.vpc_id}],
}
sg1.authorize_ingress(IpPermissions=[sg1_allows_sg2_ingress_rule])
# we should be able to describe security groups and filter for all the ones that contain
# a reference to another group ID
match_only_groups_whose_ingress_rules_refer_to_group_2 = {
"Name": "ip-permission.group-id",
"Values": [sg2.group_id],
}
security_groups = ec2r.security_groups.filter(
Filters=[match_only_groups_whose_ingress_rules_refer_to_group_2]
)
security_groups = list(security_groups)
assert len(security_groups) == 1
assert security_groups[0].group_id == sg1.group_id