moto/tests/test_ec2/test_security_groups.py

1891 lines
64 KiB
Python

import copy
import json
import unittest
from random import randint
from unittest import SkipTest
from uuid import uuid4
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_ec2, settings
from moto.core import DEFAULT_ACCOUNT_ID
from moto.ec2 import ec2_backends
@mock_ec2
def test_create_and_describe_security_group():
    """Create a group, reject a duplicate name, and find it next to "default"."""
    region = "us-west-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    # A DryRun request is expected to fail with DryRunOperation.
    with pytest.raises(ClientError) as dry_run:
        client.create_security_group(GroupName="test", Description="test", DryRun=True)
    dry_run_response = dry_run.value.response
    assert dry_run_response["ResponseMetadata"]["HTTPStatusCode"] == 412
    assert dry_run_response["Error"]["Code"] == "DryRunOperation"
    assert (
        dry_run_response["Error"]["Message"]
        == "An error occurred (DryRunOperation) when calling the CreateSecurityGroup operation: Request would have succeeded, but DryRun flag is set"
    )

    group_name = str(uuid4())
    created = ec2.create_security_group(GroupName=group_name, Description="test")
    assert created.group_name == group_name
    assert created.description == "test"

    # Re-using the name for a second group should be refused.
    with pytest.raises(ClientError) as duplicate:
        client.create_security_group(GroupName=group_name, Description="n/a")
    duplicate_response = duplicate.value.response
    assert duplicate_response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in duplicate_response["ResponseMetadata"]
    assert duplicate_response["Error"]["Code"] == "InvalidGroup.Duplicate"

    listed = retrieve_all_sgs(client)
    assert created.id in [entry["GroupId"] for entry in listed]
    # The default group is expected to be present without having been created here.
    listed_names = {entry["GroupName"] for entry in listed}
    assert "default" in listed_names
    assert group_name in listed_names
@mock_ec2
def test_create_security_group_without_description_raises_error():
    """An empty Description is expected to be rejected with MissingParameter."""
    resource = boto3.resource("ec2", "us-west-1")

    with pytest.raises(ClientError) as raised:
        resource.create_security_group(GroupName="test security group", Description="")

    response = raised.value.response
    metadata = response["ResponseMetadata"]
    assert metadata["HTTPStatusCode"] == 400
    assert "RequestId" in metadata
    assert response["Error"]["Code"] == "MissingParameter"
@mock_ec2
def test_default_security_group():
    """A group named "default" is listed even though this test creates none."""
    client = boto3.client("ec2", "us-west-1")
    names = [entry["GroupName"] for entry in retrieve_all_sgs(client)]
    assert "default" in names
@mock_ec2
def test_create_and_describe_vpc_security_group():
    """Group names must be unique per VPC but may be reused outside that VPC."""
    region = "us-west-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    group_name = str(uuid4())
    vpc_id = f"vpc-{str(uuid4())[0:6]}"
    vpc_group = ec2.create_security_group(
        GroupName=group_name, Description="test", VpcId=vpc_id
    )
    assert vpc_group.vpc_id == vpc_id
    assert vpc_group.group_name == group_name
    assert vpc_group.description == "test"

    # Same name, same VPC: expected to be refused as a duplicate.
    with pytest.raises(ClientError) as duplicate:
        ec2.create_security_group(
            GroupName=group_name, Description="n/a", VpcId=vpc_id
        )
    duplicate_response = duplicate.value.response
    assert duplicate_response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in duplicate_response["ResponseMetadata"]
    assert duplicate_response["Error"]["Code"] == "InvalidGroup.Duplicate"

    # Same name without a VPC: expected to succeed.
    plain_group = ec2.create_security_group(
        GroupName=group_name, Description="non-vpc-group"
    )

    listed = retrieve_all_sgs(client)
    assert vpc_group.id in [entry["GroupId"] for entry in listed]
    assert plain_group.id in [entry["GroupId"] for entry in listed]

    # Filtering on the VPC id should return only the group created in it.
    vpc_filter = [{"Name": "vpc-id", "Values": [vpc_id]}]
    in_vpc = client.describe_security_groups(Filters=vpc_filter)["SecurityGroups"]
    assert len(in_vpc) == 1
    only_match = in_vpc[0]
    assert only_match["VpcId"] == vpc_id
    assert only_match["GroupName"] == group_name
@mock_ec2
def test_create_two_security_groups_with_same_name_in_different_vpc():
    """The same group name can be used once in each of two different VPC ids."""
    region = "us-east-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    shared_name = str(uuid4())
    first_vpc = "vpc-5300000c"
    second_vpc = "vpc-5300000d"
    first = ec2.create_security_group(
        GroupName=shared_name, Description="n/a 1", VpcId=first_vpc
    )
    second = ec2.create_security_group(
        GroupName=shared_name, Description="n/a 2", VpcId=second_vpc
    )

    listed = retrieve_all_sgs(client)
    listed_ids = [entry["GroupId"] for entry in listed]
    assert first.id in listed_ids
    assert second.id in listed_ids

    listed_names = [entry["GroupName"] for entry in listed]
    assert shared_name in listed_names
@mock_ec2
def test_create_two_security_groups_in_vpc_with_ipv6_enabled():
    """A group in a VPC with an Amazon-provided IPv6 block starts with two egress rules."""
    ec2 = boto3.resource("ec2", region_name="us-west-1")
    ipv6_vpc = ec2.create_vpc(
        CidrBlock="10.0.0.0/16", AmazonProvidedIpv6CidrBlock=True
    )
    group = ec2.create_security_group(
        GroupName=str(uuid4()),
        Description="Test security group sg01",
        VpcId=ipv6_vpc.id,
    )

    # The security group must have two default egress rules
    # (one for IPv4 and another for IPv6).
    egress_rules = group.ip_permissions_egress
    assert len(egress_rules) == 2
@mock_ec2
def test_deleting_security_groups():
    """Delete groups by name and by id, covering the unknown-name and DryRun errors."""
    region = "us-west-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    def listed_ids():
        # Ids of every group currently returned by the describe call.
        return [entry["GroupId"] for entry in retrieve_all_sgs(client)]

    first_name = str(uuid4())
    second_name = str(uuid4())
    first = ec2.create_security_group(GroupName=first_name, Description="test desc 1")
    second = ec2.create_security_group(
        GroupName=second_name, Description="test desc 2"
    )

    ids_after_create = listed_ids()
    assert first.id in ids_after_create
    assert second.id in ids_after_create

    # An unknown group name should be reported as not found.
    with pytest.raises(ClientError) as missing:
        client.delete_security_group(GroupName="foobar")
    missing_response = missing.value.response
    assert missing_response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in missing_response["ResponseMetadata"]
    assert missing_response["Error"]["Code"] == "InvalidGroup.NotFound"

    # Delete by name: first as a DryRun, then for real.
    with pytest.raises(ClientError) as dry_run:
        client.delete_security_group(GroupName=second_name, DryRun=True)
    dry_run_response = dry_run.value.response
    assert dry_run_response["ResponseMetadata"]["HTTPStatusCode"] == 412
    assert dry_run_response["Error"]["Code"] == "DryRunOperation"
    assert (
        dry_run_response["Error"]["Message"]
        == "An error occurred (DryRunOperation) when calling the DeleteSecurityGroup operation: Request would have succeeded, but DryRun flag is set"
    )

    client.delete_security_group(GroupName=second_name)
    ids_after_name_delete = listed_ids()
    assert first.id in ids_after_name_delete
    assert second.id not in ids_after_name_delete

    # Delete by group id.
    client.delete_security_group(GroupId=first.id)
    assert first.id not in listed_ids()
@mock_ec2
def test_delete_security_group_in_vpc():
    """A group created with a VpcId can be deleted by its group id."""
    region = "us-west-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    vpc_group = ec2.create_security_group(
        GroupName=str(uuid4()), Description="test1", VpcId="vpc-12345"
    )
    ids_before = [entry["GroupId"] for entry in retrieve_all_sgs(client)]
    assert vpc_group.id in ids_before

    # The delete call itself is expected to succeed without raising.
    client.delete_security_group(GroupId=vpc_group.id)

    ids_after = [entry["GroupId"] for entry in retrieve_all_sgs(client)]
    assert vpc_group.id not in ids_after
@mock_ec2
def test_authorize_ip_range_and_revoke():
    """Authorize and revoke a CIDR-based rule, first for ingress, then for egress.

    For each direction the test checks the DryRun rejection, the rule reported
    back by the resource, the error raised when revoking a permission that was
    never authorized, and the state of the group after the real revoke.
    """
    ec2 = boto3.resource("ec2", "us-west-1")
    client = boto3.client("ec2", "us-west-1")
    security_group = ec2.create_security_group(
        GroupName=str(uuid4()), Description="test"
    )

    # --- Ingress ---------------------------------------------------------
    with pytest.raises(ClientError) as ex:
        security_group.authorize_ingress(
            IpProtocol="tcp",
            FromPort=22,
            ToPort=2222,
            CidrIp="123.123.123.123/32",
            DryRun=True,
        )
    assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 412
    assert ex.value.response["Error"]["Code"] == "DryRunOperation"
    assert (
        ex.value.response["Error"]["Message"]
        == "An error occurred (DryRunOperation) when calling the AuthorizeSecurityGroupIngress operation: Request would have succeeded, but DryRun flag is set"
    )

    ingress_permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 2222,
            "IpRanges": [{"CidrIp": "123.123.123.123/32"}],
        }
    ]
    security_group.authorize_ingress(IpPermissions=ingress_permissions)
    assert len(security_group.ip_permissions) == 1
    assert security_group.ip_permissions[0]["ToPort"] == 2222
    assert security_group.ip_permissions[0]["IpProtocol"] == "tcp"
    assert security_group.ip_permissions[0]["IpRanges"] == [
        {"CidrIp": "123.123.123.123/32"}
    ]

    # Wrong Cidr should throw error
    with pytest.raises(ClientError) as ex:
        wrong_permissions = copy.deepcopy(ingress_permissions)
        wrong_permissions[0]["IpRanges"][0]["CidrIp"] = "123.123.123.122/32"
        security_group.revoke_ingress(IpPermissions=wrong_permissions)
    assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in ex.value.response["ResponseMetadata"]
    assert ex.value.response["Error"]["Code"] == "InvalidPermission.NotFound"

    # Actually revoke
    with pytest.raises(ClientError) as ex:
        security_group.revoke_ingress(IpPermissions=ingress_permissions, DryRun=True)
    assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 412
    assert ex.value.response["Error"]["Code"] == "DryRunOperation"
    assert (
        ex.value.response["Error"]["Message"]
        == "An error occurred (DryRunOperation) when calling the RevokeSecurityGroupIngress operation: Request would have succeeded, but DryRun flag is set"
    )

    security_group.revoke_ingress(IpPermissions=ingress_permissions)
    assert len(security_group.ip_permissions) == 0

    # --- Egress ----------------------------------------------------------
    egress_security_group = ec2.create_security_group(
        GroupName=str(uuid4()), Description="desc", VpcId="vpc-3432589"
    )
    egress_permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 2222,
            "IpRanges": [{"CidrIp": "123.123.123.123/32"}],
        }
    ]

    with pytest.raises(ClientError) as ex:
        egress_security_group.authorize_egress(
            IpPermissions=egress_permissions, DryRun=True
        )
    assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 412
    assert ex.value.response["Error"]["Code"] == "DryRunOperation"
    assert (
        ex.value.response["Error"]["Message"]
        == "An error occurred (DryRunOperation) when calling the AuthorizeSecurityGroupEgress operation: Request would have succeeded, but DryRun flag is set"
    )

    egress_security_group.authorize_egress(IpPermissions=egress_permissions)
    assert egress_security_group.ip_permissions_egress[0]["FromPort"] == 22
    assert egress_security_group.ip_permissions_egress[0]["IpProtocol"] == "tcp"
    assert egress_security_group.ip_permissions_egress[0]["ToPort"] == 2222
    assert egress_security_group.ip_permissions_egress[0]["IpRanges"] == [
        {"CidrIp": "123.123.123.123/32"}
    ]

    # Wrong Cidr should throw error
    # NOTE(review): this revokes on `security_group`, which never received this
    # egress rule, rather than on `egress_security_group` - confirm whether the
    # egress group was the intended target.
    with pytest.raises(ClientError) as ex:
        wrong_permissions = copy.deepcopy(egress_permissions)
        wrong_permissions[0]["IpRanges"][0]["CidrIp"] = "123.123.123.122/32"
        security_group.revoke_egress(IpPermissions=wrong_permissions)
    assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in ex.value.response["ResponseMetadata"]
    assert ex.value.response["Error"]["Code"] == "InvalidPermission.NotFound"

    # Actually revoke
    with pytest.raises(ClientError) as ex:
        egress_security_group.revoke_egress(
            IpPermissions=egress_permissions, DryRun=True
        )
    assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 412
    assert ex.value.response["Error"]["Code"] == "DryRunOperation"
    assert (
        ex.value.response["Error"]["Message"]
        == "An error occurred (DryRunOperation) when calling the RevokeSecurityGroupEgress operation: Request would have succeeded, but DryRun flag is set"
    )

    egress_security_group.revoke_egress(IpPermissions=egress_permissions)

    # Look the group up by id: an unfiltered describe call returns every group
    # in the region, so its first entry need not be the group revoked above.
    described_group = client.describe_security_groups(
        GroupIds=[egress_security_group.id]
    )["SecurityGroups"][0]
    # There is still the default outbound rule
    assert len(described_group["IpPermissionsEgress"]) == 1
@mock_ec2
def test_authorize_other_group_and_revoke():
    """Authorize ingress from another security group, then revoke it again."""
    region = "us-west-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    target_name = str(uuid4())
    target = ec2.create_security_group(GroupName=target_name, Description="test desc")
    source = ec2.create_security_group(GroupName=str(uuid4()), Description="other")
    ec2.create_security_group(GroupName=str(uuid4()), Description="wrong")

    def describe_target():
        # Fresh view of the target group, looked up by name.
        found = client.describe_security_groups(GroupNames=[target_name])
        return found["SecurityGroups"][0]

    # Note: Should be easier to use the SourceSecurityGroupNames-parameter,
    # but that's not supported atm
    source_pair = {
        "GroupId": source.id,
        "GroupName": source.group_name,
        "UserId": source.owner_id,
    }
    permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 2222,
            "UserIdGroupPairs": [source_pair],
        }
    ]
    target.authorize_ingress(IpPermissions=permissions)

    authorized_rule = describe_target()["IpPermissions"][0]
    assert authorized_rule["ToPort"] == 2222
    assert authorized_rule["UserIdGroupPairs"][0]["GroupId"] == source.id

    # Revoking with an unknown source group id should be refused.
    wrong_permissions = copy.deepcopy(permissions)
    wrong_permissions[0]["UserIdGroupPairs"][0]["GroupId"] = "unknown"
    with pytest.raises(ClientError) as raised:
        target.revoke_ingress(IpPermissions=wrong_permissions)
    error_response = raised.value.response
    assert error_response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in error_response["ResponseMetadata"]
    assert error_response["Error"]["Code"] == "InvalidGroup.NotFound"

    # Actually revoke
    target.revoke_ingress(IpPermissions=permissions)
    assert len(describe_target()["IpPermissions"]) == 0
@mock_ec2
def test_authorize_other_group_egress_and_revoke():
    """Authorize egress towards another group in the same VPC, then revoke it."""
    ec2 = boto3.resource("ec2", region_name="us-west-1")
    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
    sg01 = ec2.create_security_group(
        GroupName=str(uuid4()), Description="Test security group sg01", VpcId=vpc.id
    )
    sg02 = ec2.create_security_group(
        GroupName=str(uuid4()), Description="Test security group sg02", VpcId=vpc.id
    )

    group_pair = {"GroupId": sg02.id, "GroupName": "sg02", "UserId": sg02.owner_id}
    ip_permission = {
        "IpProtocol": "tcp",
        "FromPort": 27017,
        "ToPort": 27017,
        "UserIdGroupPairs": [group_pair],
        "IpRanges": [],
        "Ipv6Ranges": [],
        "PrefixListIds": [],
    }
    # Shallow copy: both dicts share the same nested pair dict, so removing
    # "GroupName" below removes it from the copy that is sent as well.
    org_ip_permission = dict(ip_permission)
    del group_pair["GroupName"]

    sg01.authorize_egress(IpPermissions=[org_ip_permission])
    # The new rule is present in addition to one pre-existing egress rule.
    assert len(sg01.ip_permissions_egress) == 2
    assert ip_permission in sg01.ip_permissions_egress

    sg01.revoke_egress(IpPermissions=[org_ip_permission])
    assert len(sg01.ip_permissions_egress) == 1
@mock_ec2
def test_authorize_group_in_vpc():
    """Authorize and revoke ingress between two groups that share a VPC id."""
    ec2 = boto3.resource("ec2", "ap-south-1")
    client = boto3.client("ec2", region_name="ap-south-1")
    vpc_id = "vpc-12345"

    # Create two groups in the same VPC.
    target_name = str(uuid4())
    source_name = str(uuid4())
    target = ec2.create_security_group(
        GroupName=target_name, Description="test desc 1", VpcId=vpc_id
    )
    source = ec2.create_security_group(
        GroupName=source_name, Description="test desc 2", VpcId=vpc_id
    )

    def describe_target():
        # Fresh view of the target group, looked up by name.
        found = client.describe_security_groups(GroupNames=[target_name])
        return found["SecurityGroups"][0]

    source_pair = {
        "GroupId": source.id,
        "GroupName": source.group_name,
        "UserId": source.owner_id,
    }
    permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 2222,
            "UserIdGroupPairs": [source_pair],
        }
    ]
    target.authorize_ingress(IpPermissions=permissions)

    # Check that the rule is accurate.
    authorized_rule = describe_target()["IpPermissions"][0]
    assert authorized_rule["ToPort"] == 2222
    assert authorized_rule["UserIdGroupPairs"][0]["GroupId"] == source.id

    # Now remove the rule and check that it is gone.
    target.revoke_ingress(IpPermissions=permissions)
    assert len(describe_target()["IpPermissions"]) == 0
@mock_ec2
def test_describe_security_groups():
    """Describe groups by name, by vpc-id filter, by description filter, and unfiltered."""
    region = "us-west-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    vpc_id = f"vpc-{str(uuid4())[0:6]}"
    vpc_group_name = str(uuid4())
    vpc_group_desc = str(uuid4())
    vpc_group = ec2.create_security_group(
        GroupName=vpc_group_name, Description=vpc_group_desc, VpcId=vpc_id
    )
    plain_group = ec2.create_security_group(
        GroupName=str(uuid4()), Description="test desc 2"
    )

    by_name = client.describe_security_groups(GroupNames=[vpc_group_name])
    matches = by_name["SecurityGroups"]
    assert len(matches) == 1
    assert matches[0]["GroupId"] == vpc_group.id

    with pytest.raises(ClientError) as raised:
        client.describe_security_groups(GroupNames=["does_not_exist"])
    error_response = raised.value.response
    assert error_response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in error_response["ResponseMetadata"]
    assert error_response["Error"]["Code"] == "InvalidGroup.NotFound"

    by_vpc = client.describe_security_groups(
        Filters=[{"Name": "vpc-id", "Values": [vpc_id]}]
    )
    matches = by_vpc["SecurityGroups"]
    assert len(matches) == 1
    assert matches[0]["GroupId"] == vpc_group.id

    by_description = client.describe_security_groups(
        Filters=[{"Name": "description", "Values": [vpc_group_desc]}]
    )
    matches = by_description["SecurityGroups"]
    assert len(matches) == 1
    assert matches[0]["GroupId"] == vpc_group.id

    every_id = [entry["GroupId"] for entry in retrieve_all_sgs(client)]
    assert vpc_group.id in every_id
    assert plain_group.id in every_id
@mock_ec2
def test_authorize_bad_cidr_throws_invalid_parameter_value():
    """A CidrIp without a prefix length is expected to be rejected."""
    ec2 = boto3.resource("ec2", "us-west-1")
    group = ec2.create_security_group(GroupName=str(uuid4()), Description="test")

    # "123.123.123.123" lacks the "/<prefix>" part of a CIDR block.
    bad_range = {"CidrIp": "123.123.123.123"}
    permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 2222,
            "IpRanges": [bad_range],
        }
    ]
    with pytest.raises(ClientError) as raised:
        group.authorize_ingress(IpPermissions=permissions)

    error_response = raised.value.response
    assert error_response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in error_response["ResponseMetadata"]
    assert error_response["Error"]["Code"] == "InvalidParameterValue"
@mock_ec2
def test_security_group_tag_filtering():
    """A "tag:<key>" filter matches on the tag value and excludes other values."""
    region = "us-east-1"
    ec2 = boto3.resource("ec2", region_name=region)
    client = boto3.client("ec2", region_name=region)

    group = ec2.create_security_group(GroupName=str(uuid4()), Description="Test SG")
    tag_name = str(uuid4())[0:6]
    tag_val = str(uuid4())
    group.create_tags(Tags=[{"Key": tag_name, "Value": tag_val}])

    def groups_tagged_with(value):
        # Groups whose tag `tag_name` carries the given value.
        tag_filter = [{"Name": f"tag:{tag_name}", "Values": [value]}]
        return client.describe_security_groups(Filters=tag_filter)["SecurityGroups"]

    assert len(groups_tagged_with(tag_val)) == 1
    assert len(groups_tagged_with("unknown")) == 0
@mock_ec2
def test_authorize_all_protocols_with_no_port_specification():
    """An all-protocols ("-1") rule is reported back without FromPort or ToPort."""
    region = "us-east-1"
    ec2 = boto3.resource("ec2", region_name=region)
    client = boto3.client("ec2", region_name=region)

    group_name = str(uuid4())
    group = ec2.create_security_group(GroupName=group_name, Description="test desc")
    open_rule = {"IpProtocol": "-1", "IpRanges": [{"CidrIp": "0.0.0.0/0"}]}
    group.authorize_ingress(IpPermissions=[open_rule])

    described = client.describe_security_groups(GroupNames=[group_name])
    reported_rule = described["SecurityGroups"][0]["IpPermissions"][0]
    assert reported_rule["IpProtocol"] == "-1"
    assert reported_rule["IpRanges"] == [{"CidrIp": "0.0.0.0/0"}]
    for port_key in ("FromPort", "ToPort"):
        assert port_key not in reported_rule
@mock_ec2
def test_security_group_rule_tagging():
    """Tags created on a security group rule id show up when the rule is described."""
    region = "us-east-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
    group_name = str(uuid4())
    group = client.create_security_group(
        Description="Test SG", GroupName=group_name, VpcId=vpc.id
    )

    def first_rule_of_group():
        # First rule returned when filtering on the new group's id.
        described = client.describe_security_group_rules(
            Filters=[{"Name": "group-id", "Values": [group["GroupId"]]}]
        )
        return described["SecurityGroupRules"][0]

    rule_id = first_rule_of_group()["SecurityGroupRuleId"]
    tag_name = str(uuid4())[0:6]
    tag_val = str(uuid4())
    client.create_tags(
        Resources=[rule_id], Tags=[{"Key": tag_name, "Value": tag_val}]
    )

    tagged_rule = first_rule_of_group()
    assert "Tags" in tagged_rule
    first_tag = tagged_rule["Tags"][0]
    assert first_tag["Key"] == tag_name
    assert first_tag["Value"] == tag_val
@mock_ec2
def test_create_and_describe_security_grp_rule():
    """A freshly created VPC group exposes exactly one rule: the default egress rule."""
    region = "us-east-1"
    ec2 = boto3.resource("ec2", region)
    client = boto3.client("ec2", region)

    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
    group_name = str(uuid4())
    group = client.create_security_group(
        Description="Test SG", GroupName=group_name, VpcId=vpc.id
    )

    group_filter = [{"Name": "group-id", "Values": [group["GroupId"]]}]
    described = client.describe_security_group_rules(Filters=group_filter)
    rules = described["SecurityGroupRules"]

    # Only the default rule is present.
    assert len(rules) == 1

    # Content of the default egress rule.
    default_rule = rules[0]
    assert default_rule["IsEgress"] is True
    assert default_rule["IpProtocol"] == "-1"
    assert default_rule["CidrIpv4"] == "0.0.0.0/0"
    assert "GroupId" in default_rule
@mock_ec2
@pytest.mark.parametrize("use_vpc", [True, False], ids=["Use VPC", "Without VPC"])
def test_sec_group_rule_limit(use_vpc):
    """Check that ingress and egress rules are capped per security group.

    The scenario runs once with groups created inside a VPC and once with
    groups created without one. Rules that reference another security group
    are expected to count towards the cap as well.
    """
    region = "us-west-1"
    ec2 = boto3.resource("ec2", region_name=region)
    client = boto3.client("ec2", region_name=region)
    limit = 60

    def all_protocol_cidrs(first_octets):
        # A single "-1" (all protocols) permission with one CIDR per octet.
        ranges = [{"CidrIp": f"{i}.0.0.0/0"} for i in first_octets]
        return [{"IpProtocol": "-1", "IpRanges": ranges}]

    def assert_limit_exceeded(raised):
        error_code = raised.value.response["Error"]["Code"]
        assert error_code == "RulesPerSecurityGroupLimitExceeded"

    vpc_kwargs = {}
    if use_vpc:
        vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
        vpc_kwargs["VpcId"] = vpc.id
    sg = ec2.create_security_group(
        GroupName=str(uuid4()), Description="test", **vpc_kwargs
    )
    other_sg = ec2.create_security_group(
        GroupName=str(uuid4()), Description="test_other", **vpc_kwargs
    )

    # INGRESS
    # A single request carrying more ranges than the limit is refused outright
    # and leaves the group without ingress rules.
    with pytest.raises(ClientError) as raised:
        client.authorize_security_group_ingress(
            GroupId=sg.id, IpPermissions=all_protocol_cidrs(range(110))
        )
    assert_limit_exceeded(raised)

    sg.reload()
    assert sg.ip_permissions == []

    # Authorize a rule targeting a different security group (this counts too).
    other_permissions = [
        {
            "IpProtocol": "-1",
            "UserIdGroupPairs": [
                {
                    "GroupId": other_sg.id,
                    "GroupName": other_sg.group_name,
                    "UserId": other_sg.owner_id,
                }
            ],
        }
    ]
    client.authorize_security_group_ingress(
        GroupId=sg.id, IpPermissions=other_permissions
    )

    # Fill the rules up to the limit: 1 group rule + (limit - 1) CIDR rules.
    client.authorize_security_group_ingress(
        GroupId=sg.id, IpPermissions=all_protocol_cidrs(range(limit - 1))
    )

    # One more CIDR rule must not fit.
    with pytest.raises(ClientError) as raised:
        client.authorize_security_group_ingress(
            GroupId=sg.id, IpPermissions=all_protocol_cidrs([100])
        )
    assert_limit_exceeded(raised)

    # Neither must another rule targeting a different security group.
    with pytest.raises(ClientError) as raised:
        client.authorize_security_group_ingress(
            GroupId=sg.id, IpPermissions=other_permissions
        )
    assert_limit_exceeded(raised)

    # EGRESS
    # Authorize a rule targeting a different security group (this counts too).
    client.authorize_security_group_egress(
        GroupId=sg.id, IpPermissions=other_permissions
    )

    # Fill the rules up to the limit. A new group already holds one default
    # egress rule, so: 1 default + 1 group rule + (limit - 2) CIDR rules.
    client.authorize_security_group_egress(
        GroupId=sg.id, IpPermissions=all_protocol_cidrs(range(1, limit - 1))
    )

    # One more CIDR rule must not fit.
    with pytest.raises(ClientError) as raised:
        client.authorize_security_group_egress(
            GroupId=sg.id, IpPermissions=all_protocol_cidrs([101])
        )
    assert_limit_exceeded(raised)

    # Neither must another rule targeting a different security group.
    with pytest.raises(ClientError) as raised:
        client.authorize_security_group_egress(
            GroupId=sg.id, IpPermissions=other_permissions
        )
    assert_limit_exceeded(raised)
@mock_ec2
def test_add_same_rule_twice_throws_error():
    """Authorizing a rule that already exists fails with InvalidPermission.Duplicate."""
    ec2 = boto3.resource("ec2", region_name="us-west-1")
    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
    group = ec2.create_security_group(
        GroupName="sg1", Description="Test security group sg1", VpcId=vpc.id
    )

    # Ingress: the second, identical request is the one expected to fail.
    ingress_rule = {
        "IpProtocol": "tcp",
        "FromPort": 27017,
        "ToPort": 27017,
        "IpRanges": [{"CidrIp": "1.2.3.4/32"}],
    }
    group.authorize_ingress(IpPermissions=[ingress_rule])
    with pytest.raises(ClientError) as raised:
        group.authorize_ingress(IpPermissions=[ingress_rule])
    assert raised.value.response["Error"]["Code"] == "InvalidPermission.Duplicate"

    # Egress: this all-protocols rule to 0.0.0.0/0 is rejected on the very
    # first attempt, i.e. the group is expected to contain it already.
    egress_rule = {
        "IpProtocol": "-1",
        "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
        "Ipv6Ranges": [],
        "PrefixListIds": [],
        "UserIdGroupPairs": [],
    }
    with pytest.raises(ClientError) as raised:
        group.authorize_egress(IpPermissions=[egress_rule])
    assert raised.value.response["Error"]["Code"] == "InvalidPermission.Duplicate"
@mock_ec2
def test_description_in_ip_permissions():
    """An IpRanges "Description" is returned when set and absent when not set.

    The resource and the client use the same region so that the security
    groups are created in the region where their VPC actually exists.
    """
    region = "us-east-1"
    ec2 = boto3.resource("ec2", region_name=region)
    conn = boto3.client("ec2", region_name=region)
    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")

    # First group: the CIDR range carries a description.
    sg = conn.create_security_group(
        GroupName="sg1", Description="Test security group sg1", VpcId=vpc.id
    )
    ip_permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": 27017,
            "ToPort": 27017,
            "IpRanges": [{"CidrIp": "1.2.3.4/32", "Description": "testDescription"}],
        }
    ]
    conn.authorize_security_group_ingress(
        GroupId=sg["GroupId"], IpPermissions=ip_permissions
    )
    result = conn.describe_security_groups(GroupIds=[sg["GroupId"]])
    ip_range = result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0]
    assert ip_range["Description"] == "testDescription"
    assert ip_range["CidrIp"] == "1.2.3.4/32"

    # Second group: the same rule without a description.
    sg = conn.create_security_group(
        GroupName="sg2", Description="Test security group sg1", VpcId=vpc.id
    )
    ip_permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": 27017,
            "ToPort": 27017,
            "IpRanges": [{"CidrIp": "1.2.3.4/32"}],
        }
    ]
    conn.authorize_security_group_ingress(
        GroupId=sg["GroupId"], IpPermissions=ip_permissions
    )
    result = conn.describe_security_groups(GroupIds=[sg["GroupId"]])
    ip_range = result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0]
    assert ip_range.get("Description") is None
    assert ip_range["CidrIp"] == "1.2.3.4/32"
@mock_ec2
def test_security_group_tagging():
    """Tag a security group (DryRun first) and find it again via a tag-value filter."""
    client = boto3.client("ec2", region_name="us-east-1")
    group = client.create_security_group(
        GroupName=str(uuid4()), Description="Test SG"
    )
    group_id = group["GroupId"]

    with pytest.raises(ClientError) as dry_run:
        client.create_tags(
            Resources=[group_id],
            Tags=[{"Key": "Test", "Value": "Tag"}],
            DryRun=True,
        )
    dry_run_response = dry_run.value.response
    assert dry_run_response["Error"]["Code"] == "DryRunOperation"
    assert dry_run_response["ResponseMetadata"]["HTTPStatusCode"] == 412
    assert (
        dry_run_response["Error"]["Message"]
        == "An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set"
    )

    tag_val = str(uuid4())
    client.create_tags(Resources=[group_id], Tags=[{"Key": "Test", "Value": tag_val}])

    value_filter = [{"Name": "tag-value", "Values": [tag_val]}]
    described = client.describe_security_groups(Filters=value_filter)
    first_tag = described["SecurityGroups"][0]["Tags"][0]
    assert first_tag["Value"] == tag_val
    assert first_tag["Key"] == "Test"
@mock_ec2
def test_security_group_wildcard_tag_filter():
    """A tag-value filter with "*" wildcards matches a substring of the tag value."""
    client = boto3.client("ec2", region_name="us-east-1")
    group = client.create_security_group(
        GroupName=str(uuid4()), Description="Test SG"
    )

    rand_name = str(uuid4())[0:6]
    tag_val = f"random {rand_name} things"
    client.create_tags(
        Resources=[group["GroupId"]], Tags=[{"Key": "Test", "Value": tag_val}]
    )

    wildcard_filter = [{"Name": "tag-value", "Values": [f"*{rand_name}*"]}]
    described = client.describe_security_groups(Filters=wildcard_filter)
    first_tag = described["SecurityGroups"][0]["Tags"][0]
    assert first_tag["Value"] == tag_val
    assert first_tag["Key"] == "Test"
@mock_ec2
def test_security_group_filter_ip_permission():
    """Filtering on "ip-permission.from-port" returns the group that holds the rule."""
    region = "us-east-1"
    ec2 = boto3.resource("ec2", region_name=region)
    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
    conn = boto3.client("ec2", region_name=region)

    sg_name = str(uuid4())[0:6]
    group = ec2.create_security_group(
        GroupName=sg_name, Description="Test SG", VpcId=vpc.id
    )

    # Both ports are drawn at random for each run.
    from_port = randint(0, 65535)
    to_port = randint(0, 65535)
    tcp_rule = {
        "IpProtocol": "tcp",
        "FromPort": from_port,
        "ToPort": to_port,
        "IpRanges": [],
    }
    group.authorize_ingress(IpPermissions=[tcp_rule])

    port_filter = [{"Name": "ip-permission.from-port", "Values": [f"{from_port}"]}]
    matches = retrieve_all_sgs(conn, port_filter)
    assert len(matches) == 1
    assert matches[0]["GroupName"] == sg_name
def retrieve_all_sgs(conn, filters=None):
    """Return every security group visible to ``conn``, following pagination.

    Args:
        conn: An EC2 client exposing ``describe_security_groups``.
        filters: Optional list of filter dicts passed through as ``Filters``.
            ``None`` (the default) means no filters.

    Returns:
        A new list holding the ``SecurityGroups`` entries of all pages, in the
        order the pages were returned.
    """
    # Avoid a shared mutable default; an absent filter list is sent as [].
    filters = [] if filters is None else filters

    res = conn.describe_security_groups(Filters=filters)
    # Copy so that extending the result does not mutate the response payload.
    all_groups = list(res["SecurityGroups"])
    next_token = res.get("NextToken")
    while next_token:
        # Pass the token along; without it every call would return the first
        # page again and the loop would never terminate.
        res = conn.describe_security_groups(Filters=filters, NextToken=next_token)
        all_groups.extend(res["SecurityGroups"])
        next_token = res.get("NextToken")
    return all_groups
@mock_ec2
def test_authorize_and_revoke_in_bulk():
    """Authorize and revoke several rules in a single call, for ingress and egress.

    The request contains two permissions with the same protocol and port range
    (27017 towards sg02 and towards sg03). The test merges those locally so the
    expected rule count matches what the security group reports back.
    """
    ec2 = boto3.resource("ec2", region_name="us-west-1")
    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
    sg01 = ec2.create_security_group(
        GroupName=str(uuid4()), Description="Test sg01", VpcId=vpc.id
    )
    sg02 = ec2.create_security_group(
        GroupName=str(uuid4()), Description="Test sg02", VpcId=vpc.id
    )
    sg03 = ec2.create_security_group(GroupName=str(uuid4()), Description="Test sg03")
    sg04 = ec2.create_security_group(GroupName=str(uuid4()), Description="Test sg04")
    ip_permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": 27017,
            "ToPort": 27017,
            "UserIdGroupPairs": [{"GroupId": sg02.id, "UserId": sg02.owner_id}],
            "IpRanges": [],
            "Ipv6Ranges": [],
            "PrefixListIds": [],
        },
        {
            "IpProtocol": "tcp",
            "FromPort": 27018,
            "ToPort": 27018,
            "UserIdGroupPairs": [{"GroupId": sg02.id, "UserId": sg02.owner_id}],
            "IpRanges": [],
            "Ipv6Ranges": [],
            "PrefixListIds": [],
        },
        {
            "IpProtocol": "tcp",
            "FromPort": 27017,
            "ToPort": 27017,
            "UserIdGroupPairs": [{"GroupId": sg03.id, "UserId": sg03.owner_id}],
            "IpRanges": [],
            "Ipv6Ranges": [],
            "PrefixListIds": [],
        },
        {
            "IpProtocol": "tcp",
            "FromPort": 27015,
            "ToPort": 27015,
            "UserIdGroupPairs": [{"GroupId": sg04.id, "UserId": sg04.owner_id}],
            "IpRanges": [
                {"CidrIp": "10.10.10.0/24", "Description": "Some Description"}
            ],
            "Ipv6Ranges": [],
            "PrefixListIds": [],
        },
        {
            "IpProtocol": "tcp",
            "FromPort": 27016,
            "ToPort": 27016,
            "UserIdGroupPairs": [{"GroupId": sg04.id, "UserId": sg04.owner_id}],
            "IpRanges": [{"CidrIp": "10.10.10.0/24"}],
            "Ipv6Ranges": [],
            "PrefixListIds": [],
        },
    ]

    # Keep the unmerged request payload; the loop below mutates ip_permissions.
    org_ip_permissions = copy.deepcopy(ip_permissions)

    # Merge permissions that share protocol and port range into one entry.
    # Iteration runs over snapshots because ip_permissions is modified inside.
    for rule in ip_permissions.copy():
        for other_rule in ip_permissions.copy():
            if (
                rule is not other_rule
                and rule.get("IpProtocol") == other_rule.get("IpProtocol")
                and rule.get("FromPort") == other_rule.get("FromPort")
                and rule.get("ToPort") == other_rule.get("ToPort")
            ):
                # Drop both originals, fold other_rule's targets into rule,
                # then re-append a normalized copy of the merged rule.
                if rule in ip_permissions:
                    ip_permissions.remove(rule)
                if other_rule in ip_permissions:
                    ip_permissions.remove(other_rule)
                rule["UserIdGroupPairs"].extend(
                    [
                        item
                        for item in other_rule["UserIdGroupPairs"]
                        if item not in rule["UserIdGroupPairs"]
                    ]
                )
                rule["IpRanges"].extend(
                    [
                        item
                        for item in other_rule["IpRanges"]
                        if item not in rule["IpRanges"]
                    ]
                )
                rule["Ipv6Ranges"].extend(
                    [
                        item
                        for item in other_rule["Ipv6Ranges"]
                        if item not in rule["Ipv6Ranges"]
                    ]
                )
                rule["PrefixListIds"].extend(
                    [
                        item
                        for item in other_rule["PrefixListIds"]
                        if item not in rule["PrefixListIds"]
                    ]
                )
                if rule not in ip_permissions:
                    ip_permissions.append(json.loads(json.dumps(rule, sort_keys=True)))

    expected_ip_permissions = copy.deepcopy(ip_permissions)
    expected_ip_permissions[1]["UserIdGroupPairs"][0]["GroupId"] = sg04.id
    expected_ip_permissions[3]["UserIdGroupPairs"][0]["GroupId"] = sg03.id
    # NOTE(review): from here on expected_ip_permissions is a JSON *string*, so
    # the `for ip_permission in expected_ip_permissions` loops below iterate
    # over single characters rather than over rule dicts - confirm intent.
    expected_ip_permissions = json.dumps(expected_ip_permissions, sort_keys=True)

    sg01.authorize_ingress(IpPermissions=org_ip_permissions)
    # Due to drift property of the Security Group,
    # rules with same Ip protocol, FromPort and ToPort will be merged together
    assert len(sg01.ip_permissions) == 4
    sorted_sg01_ip_permissions = json.dumps(sg01.ip_permissions, sort_keys=True)
    for ip_permission in expected_ip_permissions:
        assert ip_permission in sorted_sg01_ip_permissions

    sg01.revoke_ingress(IpPermissions=ip_permissions)
    assert sg01.ip_permissions == []
    # NOTE(review): this compares characters against a list of rules.
    for ip_permission in expected_ip_permissions:
        assert ip_permission not in sg01.ip_permissions

    sg01.authorize_egress(IpPermissions=org_ip_permissions)
    # Due to drift property of the Security Group,
    # rules with same Ip protocol, FromPort and ToPort will be merged together
    # (the 4 merged rules plus the 1 rule left over after the final revoke below)
    assert len(sg01.ip_permissions_egress) == 5
    sorted_sg01_ip_permissions_egress = json.dumps(
        sg01.ip_permissions_egress, sort_keys=True
    )
    for ip_permission in expected_ip_permissions:
        assert ip_permission in sorted_sg01_ip_permissions_egress

    sg01.revoke_egress(IpPermissions=ip_permissions)
    assert len(sg01.ip_permissions_egress) == 1
    # NOTE(review): this compares characters against a list of rules.
    for ip_permission in expected_ip_permissions:
        assert ip_permission not in sg01.ip_permissions_egress
@mock_ec2
def test_security_group_ingress_without_multirule():
    """Authorize one ingress rule with flat arguments and look it up by rule id."""
    region = "ca-central-1"
    resource = boto3.resource("ec2", region)
    api = boto3.client("ec2", region)
    group = resource.create_security_group(
        Description="Test SG", GroupName=str(uuid4())
    )
    assert len(group.ip_permissions) == 0

    # Single-rule form: the rule is described by top-level keyword arguments
    # rather than by an IpPermissions list.
    authorized = group.authorize_ingress(
        IpProtocol="tcp", FromPort=22, ToPort=22, CidrIp="192.168.0.1/32"
    )
    new_rule_id = authorized["SecurityGroupRules"][0]["SecurityGroupRuleId"]
    assert len(group.ip_permissions) == 1

    described = api.describe_security_group_rules(SecurityGroupRuleIds=[new_rule_id])
    matching = [
        candidate
        for candidate in described["SecurityGroupRules"]
        if candidate["SecurityGroupRuleId"] == new_rule_id
    ]
    assert len(matching) == 1
    assert matching[0]["IsEgress"] is False
@mock_ec2
def test_security_group_ingress_without_multirule_after_reload():
    """A second resource handle for the same group sees the newly added rule."""
    resource = boto3.resource("ec2", "ca-central-1")
    group = resource.create_security_group(
        Description="Test SG", GroupName=str(uuid4())
    )
    assert len(group.ip_permissions) == 0

    group.authorize_ingress(
        IpProtocol="tcp", FromPort=22, ToPort=22, CidrIp="192.168.0.1/32"
    )

    # Build a brand-new SecurityGroup object from the id and check the rule
    # through it instead of through the handle that performed the authorize.
    fresh_handle = resource.SecurityGroup(group.id)
    assert len(fresh_handle.ip_permissions) == 1
@mock_ec2
def test_get_all_security_groups_filter_with_same_vpc_id():
    """Combine GroupIds with a vpc-id filter, then look up an unknown group id."""
    region = "us-east-1"
    resource = boto3.resource("ec2", region_name=region)
    api = boto3.client("ec2", region_name=region)
    shared_vpc_id = "vpc-5300000c"

    # Two groups in the same VPC; only the first is requested by id below.
    groups = [
        resource.create_security_group(
            GroupName=str(uuid4()), Description=text, VpcId=shared_vpc_id
        )
        for text in ("test1", "test2")
    ]
    for group in groups:
        assert group.vpc_id == shared_vpc_id

    described = api.describe_security_groups(
        GroupIds=[groups[0].id],
        Filters=[{"Name": "vpc-id", "Values": [shared_vpc_id]}],
    )
    assert len(described["SecurityGroups"]) == 1

    with pytest.raises(ClientError) as ex:
        api.describe_security_groups(GroupIds=["does_not_exist"])
    failure = ex.value.response
    assert failure["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert "RequestId" in failure["ResponseMetadata"]
    assert failure["Error"]["Code"] == "InvalidGroup.NotFound"
@mock_ec2
def test_revoke_security_group_egress():
    """Revoking the initial allow-all egress rule leaves no egress rules."""

    def allow_all_rule():
        # A fresh dict per call, so the assertion and the request share no state.
        return {
            "IpProtocol": "-1",
            "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
            "UserIdGroupPairs": [],
            "Ipv6Ranges": [],
            "PrefixListIds": [],
        }

    resource = boto3.resource("ec2", "us-east-1")
    network = resource.create_vpc(CidrBlock="10.0.0.0/16")
    group = resource.create_security_group(
        Description="Test SG", GroupName=str(uuid4()), VpcId=network.id
    )
    # The new group is expected to start with exactly this one egress rule.
    assert group.ip_permissions_egress == [allow_all_rule()]

    group.revoke_egress(IpPermissions=[allow_all_rule()])
    group.reload()
    assert len(group.ip_permissions_egress) == 0
@mock_ec2
def test_revoke_security_group_egress__without_ipprotocol():
    """Revoke the first rule of a VPC's default group by id; none should remain."""
    region = "eu-west-2"
    resource = boto3.resource("ec2", region)
    api = boto3.client("ec2", region_name=region)
    network = resource.create_vpc(CidrBlock="10.0.0.0/16")

    def rules_for(group_id):
        # All security group rules that belong to the given group.
        listing = api.describe_security_group_rules(
            Filters=[{"Name": "group-id", "Values": [group_id]}]
        )
        return listing["SecurityGroupRules"]

    default_groups = api.describe_security_groups(
        Filters=[
            {"Name": "group-name", "Values": ["default"]},
            {"Name": "vpc-id", "Values": [network.id]},
        ]
    )
    default_group_id = default_groups["SecurityGroups"][0]["GroupId"]
    first_rule_id = rules_for(default_group_id)[0]["SecurityGroupRuleId"]

    # Only the rule id is supplied: no IpPermissions and no IpProtocol.
    api.revoke_security_group_egress(
        GroupId=default_group_id, SecurityGroupRuleIds=[first_rule_id]
    )

    assert len(rules_for(default_group_id)) == 0
@mock_ec2
def test_update_security_group_rule_descriptions_egress():
    """Attach a description to the CIDR range of the initial egress rule."""
    resource = boto3.resource("ec2", "us-east-1")
    api = boto3.client("ec2", "us-east-1")
    network = resource.create_vpc(CidrBlock="10.0.0.0/16")
    name = str(uuid4())
    group = resource.create_security_group(
        Description="Test SG", GroupName=name, VpcId=network.id
    )
    group_id = group.id

    def first_egress_ranges():
        # IpRanges of the first egress permission, as currently described.
        described = api.describe_security_groups(GroupIds=[group_id])
        return described["SecurityGroups"][0]["IpPermissionsEgress"][0]["IpRanges"]

    before = first_egress_ranges()
    assert len(before) == 1
    assert before[0] == {"CidrIp": "0.0.0.0/0"}

    described_rule = {
        "IpProtocol": "-1",
        "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "my d3scription"}],
        "UserIdGroupPairs": [],
        "Ipv6Ranges": [],
        "PrefixListIds": [],
    }
    # The group is addressed by name here, not by id.
    api.update_security_group_rule_descriptions_egress(
        GroupName=name, IpPermissions=[described_rule]
    )

    after = first_egress_ranges()
    assert len(after) == 1
    assert after[0] == {"CidrIp": "0.0.0.0/0", "Description": "my d3scription"}
@mock_ec2
def test_update_security_group_rule_descriptions_ingress():
    """Replace the description on an existing ingress CIDR range."""
    resource = boto3.resource("ec2", "us-east-1")
    api = boto3.client("ec2", "us-east-1")
    network = resource.create_vpc(CidrBlock="10.0.0.0/16")
    name = str(uuid4())
    group = resource.create_security_group(
        Description="Test SG", GroupName=name, VpcId=network.id
    )
    group_id = group.id

    def mongo_rule(description):
        # The same tcp/27017 rule each time; only the range description varies.
        return {
            "IpProtocol": "tcp",
            "FromPort": 27017,
            "ToPort": 27017,
            "IpRanges": [{"CidrIp": "1.2.3.4/32", "Description": description}],
        }

    def first_ingress_ranges():
        # IpRanges of the first ingress permission, as currently described.
        described = api.describe_security_groups(GroupIds=[group_id])
        return described["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"]

    api.authorize_security_group_ingress(
        GroupId=group_id, IpPermissions=[mongo_rule("first desc")]
    )
    before = first_ingress_ranges()
    assert len(before) == 1
    assert before[0] == {"CidrIp": "1.2.3.4/32", "Description": "first desc"}

    # The update addresses the group by name rather than by id.
    api.update_security_group_rule_descriptions_ingress(
        GroupName=name, IpPermissions=[mongo_rule("second desc")]
    )
    after = first_ingress_ranges()
    assert len(after) == 1
    assert after[0] == {"CidrIp": "1.2.3.4/32", "Description": "second desc"}
@mock_ec2
def test_non_existent_security_group_raises_error_on_authorize():
    """Both authorize calls report InvalidGroup.NotFound for an unknown group id."""
    api = boto3.client("ec2", "us-east-1")
    missing_group_id = "sg-123abc"
    wanted_message = f"The security group '{missing_group_id}' does not exist"

    operations = (
        api.authorize_security_group_egress,
        api.authorize_security_group_ingress,
    )
    for operation in operations:
        with pytest.raises(ClientError) as ex:
            operation(GroupId=missing_group_id, IpPermissions=[{}])
        error = ex.value.response["Error"]
        assert error["Code"] == "InvalidGroup.NotFound"
        assert error["Message"] == wanted_message
@mock_ec2
def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api():
    """Rules written straight into the backend can be revoked through boto3."""
    if settings.TEST_SERVER_MODE:
        raise unittest.SkipTest("Can't test backend directly in server mode.")

    region = "us-east-1"
    backend = ec2_backends[DEFAULT_ACCOUNT_ID][region]
    resource = boto3.resource("ec2", region_name=region)
    api = boto3.client("ec2", region_name=region)
    network = resource.create_vpc(CidrBlock="10.0.0.0/16")
    name = str(uuid4())
    group = resource.create_security_group(
        GroupName=name, Description="test", VpcId=network.id
    )

    def describe_group():
        # Look the group up by name and return its description dict.
        found = api.describe_security_groups(
            Filters=[{"Name": "group-name", "Values": [name]}]
        )
        return found.get("SecurityGroups")[0]

    # Bypass the API: add one self-referencing rule per direction to the backend.
    backend.authorize_security_group_ingress(
        group_name_or_id=group.id,
        security_rule_ids=None,
        ip_protocol="udp",
        from_port=0,
        to_port=65535,
        ip_ranges=[],
        source_groups=[{"GroupId": group.id}],
    )
    backend.authorize_security_group_egress(
        group_name_or_id=group.id,
        security_rule_ids=None,
        ip_protocol="tcp",
        from_port=8443,
        to_port=8443,
        ip_ranges=[],
        source_groups=[{"GroupId": group.id}],
    )

    # One ingress rule, and the new egress rule next to the pre-existing one.
    current = describe_group()
    assert len(current["IpPermissions"]) == 1
    assert len(current["IpPermissionsEgress"]) == 2

    # Hand the described permissions back to the API to revoke every rule,
    # including the ones that never went through the API.
    api.revoke_security_group_egress(
        GroupId=current["GroupId"], IpPermissions=current["IpPermissionsEgress"]
    )
    api.revoke_security_group_ingress(
        GroupId=current["GroupId"], IpPermissions=current["IpPermissions"]
    )

    current = describe_group()
    assert len(current["IpPermissions"]) == 0
    assert len(current["IpPermissionsEgress"]) == 0
@mock_ec2
def test_filter_description():
    """The description filter with a wildcard pattern matches a single group."""
    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.0.0/16")
    marker = str(uuid4())

    # Only this group carries the unique marker in its description.
    tagged = network.create_security_group(
        Description=(f"A {marker} Description"), GroupName="test-1"
    )
    network.create_security_group(
        Description="Another Description That Awes The Human Mind", GroupName="test-2"
    )

    found = list(
        resource.security_groups.filter(
            Filters=[{"Name": "description", "Values": [f"*{marker}*"]}]
        )
    )
    assert len(found) == 1
    assert found[0].group_id == tagged.group_id
@mock_ec2
def test_filter_ip_permission__cidr():
    """Filter groups by a CIDR block that appears in their ingress rules."""
    if settings.TEST_SERVER_MODE:
        raise SkipTest(
            "CIDR's might already exist due to other tests creating IP ranges"
        )

    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.1.0/16")
    wanted = network.create_security_group(
        Description="A Described Description Descriptor", GroupName="test-1"
    )
    other = network.create_security_group(
        Description="Another Description That Awes The Human Mind", GroupName="test-2"
    )

    # Same port and protocol for both groups; only the CIDR blocks differ.
    cidrs_per_group = (
        (wanted, ("10.250.0.0/16", "10.251.0.0/16")),
        (other, ("172.16.0.0/16", "172.17.0.0/16")),
    )
    for group, cidrs in cidrs_per_group:
        group.authorize_ingress(
            IpPermissions=[
                {
                    "FromPort": 7357,
                    "ToPort": 7357,
                    "IpProtocol": "tcp",
                    "IpRanges": [{"CidrIp": cidr} for cidr in cidrs],
                }
            ]
        )

    found = list(
        resource.security_groups.filter(
            Filters=[{"Name": "ip-permission.cidr", "Values": ["10.250.0.0/16"]}]
        )
    )
    assert len(found) == 1
    assert found[0].group_id == wanted.group_id
@mock_ec2
def test_filter_egress__ip_permission__cidr():
    """Filter groups by a CIDR block that appears in their egress rules."""
    if settings.TEST_SERVER_MODE:
        raise SkipTest(
            "CIDR's might already exist due to other tests creating IP ranges"
        )

    def egress_rule(*cidrs):
        # tcp/7357 egress permission covering the given CIDR blocks.
        return {
            "FromPort": 7357,
            "ToPort": 7357,
            "IpProtocol": "tcp",
            "IpRanges": [{"CidrIp": block} for block in cidrs],
        }

    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.1.0/16")
    wanted = network.create_security_group(
        Description="A Described Description Descriptor", GroupName="test-1"
    )
    other = network.create_security_group(
        Description="Another Description That Awes The Human Mind", GroupName="test-2"
    )

    wanted.authorize_egress(
        IpPermissions=[egress_rule("10.250.0.0/16", "10.251.0.0/16")]
    )
    other.authorize_egress(
        IpPermissions=[egress_rule("172.16.0.0/16", "172.17.0.0/16")]
    )

    cidr_filter = {"Name": "egress.ip-permission.cidr", "Values": ["10.250.0.0/16"]}
    found = list(resource.security_groups.filter(Filters=[cidr_filter]))
    assert len(found) == 1
    assert found[0].group_id == wanted.group_id
@mock_ec2
def test_filter_egress__ip_permission__from_port():
    """Filter security groups on the FromPort of their egress rules.

    Only the first group gets an egress rule starting at a randomly chosen
    port, so the ``egress.ip-permission.from-port`` filter has to return that
    group and must not return the second one.
    """
    ec2r = boto3.resource("ec2", region_name="us-west-1")
    vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
    sg1 = vpc.create_security_group(
        Description="A Described Description Descriptor", GroupName="test-1"
    )
    sg2 = vpc.create_security_group(
        Description="Another Description That Awes The Human Mind", GroupName="test-2"
    )
    # Presumably randomised so the filter value does not collide with rules
    # left behind by other tests; the assertions below tolerate extra matches.
    from_port = randint(9999, 59999)
    sg1.authorize_egress(
        IpPermissions=[
            {
                "FromPort": from_port,
                # ToPort must not be lower than FromPort for the port range to
                # be well-formed, so the rule covers exactly one port.
                "ToPort": from_port,
                "IpProtocol": "tcp",
                "IpRanges": [{"CidrIp": "10.250.0.0/16"}, {"CidrIp": "10.251.0.0/16"}],
            }
        ]
    )
    sg2.authorize_egress(
        IpPermissions=[
            {
                "FromPort": 8000,
                "ToPort": 8020,
                "IpProtocol": "tcp",
                "IpRanges": [{"CidrIp": "172.16.0.0/16"}, {"CidrIp": "172.17.0.0/16"}],
            }
        ]
    )
    filter_to_match_group_1 = {
        "Name": "egress.ip-permission.from-port",
        "Values": [f"{from_port}"],
    }
    security_groups = ec2r.security_groups.filter(Filters=[filter_to_match_group_1])
    # Collect the ids once and reuse them for both membership checks.
    matched_ids = [group.group_id for group in security_groups]
    assert sg1.group_id in matched_ids
    assert sg2.group_id not in matched_ids
@mock_ec2
def test_filter_egress__ip_permission__group_id():
    """Filter groups by the id of a group referenced from their egress rules."""
    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.1.0/16")

    # Groups 1 and 2 own the rules; groups 3 and 4 are only referenced by them.
    specs = (
        ("test-1", "A Described Description Descriptor"),
        ("test-2", "Another Description That Awes The Human Mind"),
        ("test-3", "Yet Another Descriptive Description"),
        ("test-4", "Such Description Much Described"),
    )
    owner_a, owner_b, target_a, target_b = [
        network.create_security_group(Description=text, GroupName=name)
        for name, text in specs
    ]

    owner_a.authorize_egress(
        IpPermissions=[
            {
                "FromPort": 7357,
                "ToPort": 7359,
                "IpProtocol": "tcp",
                "UserIdGroupPairs": [{"GroupId": target_a.group_id}],
            }
        ]
    )
    owner_b.authorize_egress(
        IpPermissions=[
            {
                "FromPort": 8000,
                "ToPort": 8020,
                "IpProtocol": "tcp",
                "UserIdGroupPairs": [{"GroupId": target_b.group_id}],
            }
        ]
    )

    referencing = list(
        resource.security_groups.filter(
            Filters=[
                {
                    "Name": "egress.ip-permission.group-id",
                    "Values": [target_a.group_id],
                }
            ]
        )
    )
    assert len(referencing) == 1
    assert referencing[0].group_id == owner_a.group_id
@mock_ec2
def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_name():
    """
    A rule created with a GroupId is not found when filtering by group name.

    The AWS API fails to find the group in this situation, so we should too.
    """

    def rule_referencing(target, first_port, last_port):
        # tcp egress permission pointing at another group by its id only.
        return {
            "FromPort": first_port,
            "ToPort": last_port,
            "IpProtocol": "tcp",
            "UserIdGroupPairs": [{"GroupId": target.group_id}],
        }

    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.1.0/16")
    owner_a = network.create_security_group(
        Description="A Described Description Descriptor", GroupName="test-1"
    )
    owner_b = network.create_security_group(
        Description="Another Description That Awes The Human Mind", GroupName="test-2"
    )
    target_a = network.create_security_group(
        Description="Yet Another Descriptive Description", GroupName="test-3"
    )
    target_b = network.create_security_group(
        Description="Such Description Much Described", GroupName="test-4"
    )

    owner_a.authorize_egress(IpPermissions=[rule_referencing(target_a, 7357, 7359)])
    owner_b.authorize_egress(IpPermissions=[rule_referencing(target_b, 8000, 8020)])

    name_filter = {
        "Name": "egress.ip-permission.group-name",
        "Values": [target_a.group_name],
    }
    for owner in (owner_a, owner_b):
        owner.load()

    found = list(resource.security_groups.filter(Filters=[name_filter]))
    assert len(found) == 0
@mock_ec2
def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_id():
    """A rule created with a GroupId is found when filtering by that group id."""

    def rule_referencing(target, first_port, last_port):
        # tcp egress permission pointing at another group by its id only.
        return {
            "FromPort": first_port,
            "ToPort": last_port,
            "IpProtocol": "tcp",
            "UserIdGroupPairs": [{"GroupId": target.group_id}],
        }

    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.1.0/16")
    owner_a = network.create_security_group(
        Description="A Described Description Descriptor", GroupName="test-1"
    )
    owner_b = network.create_security_group(
        Description="Another Description That Awes The Human Mind", GroupName="test-2"
    )
    target_a = network.create_security_group(
        Description="Yet Another Descriptive Description", GroupName="test-3"
    )
    target_b = network.create_security_group(
        Description="Such Description Much Described", GroupName="test-4"
    )

    owner_a.authorize_egress(IpPermissions=[rule_referencing(target_a, 7357, 7359)])
    owner_b.authorize_egress(IpPermissions=[rule_referencing(target_b, 8000, 8020)])

    id_filter = {
        "Name": "egress.ip-permission.group-id",
        "Values": [target_a.group_id],
    }
    for owner in (owner_a, owner_b):
        owner.load()

    found = list(resource.security_groups.filter(Filters=[id_filter]))
    assert len(found) == 1
    assert found[0].group_id == owner_a.group_id
@mock_ec2
def test_filter_egress__ip_permission__protocol():
    """Filter groups by the protocol used in their egress rules."""
    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.1.0/16")
    tcp_group = network.create_security_group(
        Description="A Described Description Descriptor", GroupName="test-1"
    )
    udp_group = network.create_security_group(
        Description="Another Description That Awes The Human Mind", GroupName="test-2"
    )

    # Identical ports and CIDR blocks; the two rules differ only in protocol.
    for group, protocol in ((tcp_group, "tcp"), (udp_group, "udp")):
        group.authorize_egress(
            IpPermissions=[
                {
                    "FromPort": 7357,
                    "ToPort": 7359,
                    "IpProtocol": protocol,
                    "IpRanges": [
                        {"CidrIp": "10.250.0.0/16"},
                        {"CidrIp": "10.251.0.0/16"},
                    ],
                }
            ]
        )

    protocol_filter = {"Name": "egress.ip-permission.protocol", "Values": ["tcp"]}
    matched_ids = [
        found.group_id
        for found in resource.security_groups.filter(Filters=[protocol_filter])
    ]
    assert tcp_group.group_id in matched_ids
    assert udp_group.group_id not in matched_ids
@mock_ec2
def test_filter_egress__ip_permission__to_port():
    """Filter groups by the ToPort of their egress rules."""

    def egress_rule(last_port, cidrs):
        # tcp egress permission from port 7357 up to ``last_port``.
        return {
            "FromPort": 7357,
            "ToPort": last_port,
            "IpProtocol": "tcp",
            "IpRanges": [{"CidrIp": block} for block in cidrs],
        }

    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.1.0/16")
    wanted = network.create_security_group(
        Description="A Described Description Descriptor", GroupName="test-1"
    )
    other = network.create_security_group(
        Description="Another Description That Awes The Human Mind", GroupName="test-2"
    )

    random_to_port = randint(9999, 59999)
    wanted.authorize_egress(
        IpPermissions=[egress_rule(random_to_port, ("10.250.0.0/16", "10.251.0.0/16"))]
    )
    other.authorize_egress(
        IpPermissions=[egress_rule(7360, ("172.16.0.0/16", "172.17.0.0/16"))]
    )

    port_filter = {
        "Name": "egress.ip-permission.to-port",
        "Values": [f"{random_to_port}"],
    }
    found = list(resource.security_groups.filter(Filters=[port_filter]))
    assert wanted.group_id in [group.group_id for group in found]
    assert other.group_id not in [group.group_id for group in found]
@mock_ec2
def test_get_groups_by_ippermissions_group_id_filter():
    """Find the group whose ingress rules reference another group in its VPC."""
    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.0.0/16")
    receiver = network.create_security_group(Description="test", GroupName="test-1")
    sender = network.create_security_group(Description="test", GroupName="test-2")

    # The receiver admits tcp/31337 traffic coming from the sender group.
    receiver.authorize_ingress(
        IpPermissions=[
            {
                "IpProtocol": "tcp",
                "FromPort": 31337,
                "ToPort": 31337,
                "UserIdGroupPairs": [
                    {"GroupId": sender.group_id, "VpcId": sender.vpc_id}
                ],
            }
        ]
    )

    # Describing with this filter should yield only the groups whose ingress
    # rules mention the sender's group id.
    referencing = list(
        resource.security_groups.filter(
            Filters=[{"Name": "ip-permission.group-id", "Values": [sender.group_id]}]
        )
    )
    assert len(referencing) == 1
    assert referencing[0].group_id == receiver.group_id
@mock_ec2
def test_get_groups_by_ippermissions_group_id_filter_across_vpcs():
    """Find the group whose ingress rules reference a group in another VPC."""
    resource = boto3.resource("ec2", region_name="us-west-1")

    # Two VPCs with one security group each; the group in the first VPC
    # authorizes the group in the second VPC by GroupId.
    receiver_vpc = resource.create_vpc(CidrBlock="10.250.0.0/16")
    sender_vpc = resource.create_vpc(CidrBlock="10.251.0.0/16")
    receiver = receiver_vpc.create_security_group(
        Description="test", GroupName="test-1"
    )
    sender = sender_vpc.create_security_group(Description="test", GroupName="test-2")

    cross_vpc_rule = {
        "IpProtocol": "tcp",
        "FromPort": 31337,
        "ToPort": 31337,
        "UserIdGroupPairs": [{"GroupId": sender.group_id, "VpcId": sender.vpc_id}],
    }
    receiver.authorize_ingress(IpPermissions=[cross_vpc_rule])

    # Describing with this filter should yield only the groups whose ingress
    # rules mention the sender's group id.
    id_filter = {"Name": "ip-permission.group-id", "Values": [sender.group_id]}
    referencing = list(resource.security_groups.filter(Filters=[id_filter]))
    assert len(referencing) == 1
    assert referencing[0].group_id == receiver.group_id
@mock_ec2
def test_filter_group_name():
    """
    The group-name filter is an exact match, not a glob.
    """
    resource = boto3.resource("ec2", region_name="us-west-1")
    network = resource.create_vpc(CidrBlock="10.250.1.0/16")
    prefix = str(uuid4())[0:6]

    exact = network.create_security_group(
        Description="A Described Description Descriptor",
        GroupName=f"{prefix}-test-1",
    )
    # Decoy names all start with the exact group's name; a glob-style or
    # prefix match would pick them up as well.
    decoys = (
        ("Another Description That Awes The Human Mind", f"{prefix}-test-12"),
        ("Yet Another Descriptive Description", f"{prefix}-test-13"),
        ("Such Description Much Described", f"{prefix}-test-14"),
    )
    for text, name in decoys:
        network.create_security_group(Description=text, GroupName=name)

    name_filter = {"Name": "group-name", "Values": [exact.group_name]}
    found = list(resource.security_groups.filter(Filters=[name_filter]))
    assert len(found) == 1
    assert found[0].group_name == exact.group_name
@mock_ec2
def test_revoke_security_group_ingress():
    """Revoke one of two ingress rules by rule id and check that one remains."""

    def tcp_rule(first_port, last_port):
        # Ingress permission for 10.0.0.1/32 over the given TCP port range.
        return {
            "FromPort": first_port,
            "ToPort": last_port,
            "IpProtocol": "TCP",
            "IpRanges": [{"CidrIp": "10.0.0.1/32"}],
        }

    ec2 = boto3.client("ec2", region_name="us-east-1")
    vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
    sg = ec2.create_security_group(
        Description="Test SG", GroupName=str(uuid4()), VpcId=vpc["Vpc"]["VpcId"]
    )
    sg_id = sg["GroupId"]
    rules_of_group = [{"Name": "group-id", "Values": [sg_id]}]

    ec2.authorize_security_group_ingress(
        GroupId=sg_id, IpPermissions=[tcp_rule(3000, 3300), tcp_rule(8080, 8080)]
    )

    # The listing holds egress rules as well, so keep only the ingress ones.
    response = ec2.describe_security_group_rules(Filters=rules_of_group)
    ingress_rules = [
        rule for rule in response["SecurityGroupRules"] if not rule["IsEgress"]
    ]
    assert len(ingress_rules) == 2

    # Revoke one of the two ingress rules.
    revoked_rule_id = ingress_rules[0]["SecurityGroupRuleId"]
    ec2.revoke_security_group_ingress(
        GroupId=sg_id, SecurityGroupRuleIds=[revoked_rule_id]
    )

    response = ec2.describe_security_group_rules(Filters=rules_of_group)
    ingress_rules = [
        rule for rule in response["SecurityGroupRules"] if not rule["IsEgress"]
    ]
    assert len(ingress_rules) == 1