EC2: SG rule tagging (#7424)

This commit is contained in:
rafcio19 2024-03-05 15:08:13 +01:00 committed by GitHub
parent 274fdae642
commit 8178cbde1a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 264 additions and 114 deletions

View File

@ -41,3 +41,10 @@ class TaggedEC2Resource(BaseModel):
return value
raise FilterNotImplementedError(filter_name, method_name)
def match_tags(self, filters: Dict[str, str]) -> bool:
for tag_name in filters.keys():
tag_value = self.get_filter_value(tag_name)
if tag_value == filters[tag_name][0]:
return True
return False

View File

@ -33,12 +33,15 @@ class SecurityRule(TaggedEC2Resource):
self,
ec2_backend: Any,
ip_protocol: str,
group_id: str,
from_port: Optional[str],
to_port: Optional[str],
ip_ranges: Optional[List[Any]],
source_groups: List[Dict[str, Any]],
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
is_egress: bool = True,
tags: Dict[str, str] = {},
description: str = "",
):
self.ec2_backend = ec2_backend
self.id = random_security_group_rule_id()
@ -48,6 +51,8 @@ class SecurityRule(TaggedEC2Resource):
self.prefix_list_ids = prefix_list_ids or []
self.from_port = self.to_port = None
self.is_egress = is_egress
self.description = description
self.group_id = group_id
if self.ip_protocol and self.ip_protocol != "-1":
self.from_port = int(from_port) # type: ignore[arg-type]
@ -73,6 +78,7 @@ class SecurityRule(TaggedEC2Resource):
else None
)
self.ip_protocol = proto if proto else self.ip_protocol
self.add_tags(tags)
@property
def owner_id(self) -> str:
@ -156,6 +162,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
SecurityRule(
self.ec2_backend,
"-1",
self.id,
None,
None,
[{"CidrIp": "0.0.0.0/0"}],
@ -167,6 +174,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
SecurityRule(
self.ec2_backend,
"-1",
self.id,
None,
None,
[{"CidrIpv6": "::/0"}],
@ -571,22 +579,60 @@ class SecurityGroupBackend:
return matches
def describe_security_group_rules(
self, group_ids: Optional[List[str]] = None, filters: Any = None
) -> Dict[str, List[SecurityRule]]:
matches = self.describe_security_groups(group_ids=group_ids, filters=filters)
self,
group_ids: Optional[List[str]] = None,
sg_rule_ids: List[str] = [],
filters: Any = None,
) -> List[SecurityRule]:
results = []
if sg_rule_ids:
# go thru all the rules in the backend to find a match
for sg_rule_id in sg_rule_ids:
for sg in self.sg_old_ingress_ruls:
for rule in self.sg_old_ingress_ruls[sg]:
if rule.id == sg_rule_id:
results.append(rule)
return results
if group_ids:
all_sgs = self.describe_security_groups(group_ids=group_ids)
for group in all_sgs:
results.extend(group.ingress_rules)
results.extend(group.egress_rules)
return results
if filters and "group-id" in filters:
matches = self.describe_security_groups(
group_ids=group_ids, filters=filters
)
if not matches:
raise InvalidSecurityGroupNotFoundError(
"No security groups found matching the filters provided."
)
rules = {}
for group in matches:
group_rules = []
group_rules.extend(group.ingress_rules)
group_rules.extend(group.egress_rules)
if group_rules:
rules[group.group_id] = group_rules
results.extend(group.ingress_rules)
results.extend(group.egress_rules)
return rules
return results
all_sgs = self.describe_security_groups()
for group in all_sgs:
results.extend(self._match_sg_rules(group.ingress_rules, filters))
results.extend(self._match_sg_rules(group.egress_rules, filters))
return results
@staticmethod
def _match_sg_rules(rules_list: List[SecurityRule], filters: Any) -> List[SecurityRule]: # type: ignore[misc]
results = []
for rule in rules_list:
if rule.match_tags(filters):
results.append(rule)
return results
def _delete_security_group(self, vpc_id: Optional[str], group_id: str) -> None:
vpc_id = vpc_id or self.default_vpc.id # type: ignore[attr-defined]
@ -657,6 +703,7 @@ class SecurityGroupBackend:
from_port: str,
to_port: str,
ip_ranges: List[Any],
sgrule_tags: Dict[str, str] = {},
source_groups: Optional[List[Dict[str, str]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
security_rule_ids: Optional[List[str]] = None, # pylint:disable=unused-argument
@ -695,12 +742,14 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
group.group_id,
from_port,
to_port,
ip_ranges,
_source_groups,
prefix_list_ids,
is_egress=False,
tags=sgrule_tags,
)
if security_rule in group.ingress_rules:
@ -738,6 +787,7 @@ class SecurityGroupBackend:
break
else:
group.add_ingress_rule(security_rule)
return security_rule, group
def revoke_security_group_ingress(
@ -765,6 +815,7 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
group.group_id,
from_port,
to_port,
ip_ranges,
@ -819,6 +870,7 @@ class SecurityGroupBackend:
from_port: str,
to_port: str,
ip_ranges: List[Any],
sgrule_tags: Dict[str, str] = {},
source_groups: Optional[List[Dict[str, Any]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
security_rule_ids: Optional[List[str]] = None, # pylint:disable=unused-argument
@ -860,11 +912,13 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
group.group_id,
from_port,
to_port,
ip_ranges,
_source_groups,
prefix_list_ids,
tags=sgrule_tags,
)
if security_rule in group.egress_rules:
@ -944,6 +998,7 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
group.group_id,
from_port,
to_port,
ip_ranges,
@ -1030,6 +1085,7 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
group.group_id,
from_port,
to_port,
ip_ranges,
@ -1085,6 +1141,7 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
group.group_id,
from_port,
to_port,
ip_ranges,

View File

@ -86,6 +86,8 @@ class SecurityGroups(EC2BaseResponse):
d = d[subkey]
d[key_splitted[-1]] = value
sg_rule_tags = self._parse_tag_specification().get("security-group-rule", {})
if "IpPermissions" not in querytree:
# Handle single rule syntax
(
@ -97,16 +99,17 @@ class SecurityGroups(EC2BaseResponse):
prefix_list_ids,
) = parse_sg_attributes_from_dict(querytree)
yield (
group_name_or_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups,
prefix_list_ids,
security_rule_ids,
)
yield {
"group_name_or_id": group_name_or_id,
"ip_protocol": ip_protocol,
"from_port": from_port,
"to_port": to_port,
"ip_ranges": ip_ranges,
"sgrule_tags": sg_rule_tags,
"source_groups": source_groups,
"prefix_list_ids": prefix_list_ids,
"security_rule_ids": security_rule_ids,
}
ip_permissions = querytree.get("IpPermissions") or {}
for ip_permission_idx in sorted(ip_permissions.keys()):
@ -121,22 +124,23 @@ class SecurityGroups(EC2BaseResponse):
prefix_list_ids,
) = parse_sg_attributes_from_dict(ip_permission)
yield (
group_name_or_id,
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups,
prefix_list_ids,
security_rule_ids,
)
yield {
"group_name_or_id": group_name_or_id,
"ip_protocol": ip_protocol,
"from_port": from_port,
"to_port": to_port,
"ip_ranges": ip_ranges,
"sgrule_tags": sg_rule_tags,
"source_groups": source_groups,
"prefix_list_ids": prefix_list_ids,
"security_rule_ids": security_rule_ids,
}
def authorize_security_group_egress(self) -> str:
self.error_on_dryrun()
for args in self._process_rules_from_querystring():
rule, group = self.ec2_backend.authorize_security_group_egress(*args)
for kwargs in self._process_rules_from_querystring():
rule, group = self.ec2_backend.authorize_security_group_egress(**kwargs)
self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy()
template = self.response_template(AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE)
return template.render(rule=rule, group=group)
@ -144,8 +148,8 @@ class SecurityGroups(EC2BaseResponse):
def authorize_security_group_ingress(self) -> str:
self.error_on_dryrun()
for args in self._process_rules_from_querystring():
rule, group = self.ec2_backend.authorize_security_group_ingress(*args)
for kwargs in self._process_rules_from_querystring():
rule, group = self.ec2_backend.authorize_security_group_ingress(**kwargs)
self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy()
template = self.response_template(AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE)
return template.render(rule=rule, group=group)
@ -198,11 +202,19 @@ class SecurityGroups(EC2BaseResponse):
def describe_security_group_rules(self) -> str:
group_id = self._get_param("GroupId")
sg_rule_ids = self._get_param("SecurityGroupRuleId.1")
filters = self._filters_from_querystring()
self.error_on_dryrun()
rules = self.ec2_backend.describe_security_group_rules(group_id, filters)
# if sg rule ids are not None then wrap in a list
# as expected by ec2_backend.describe_security_group_rules
if sg_rule_ids:
sg_rule_ids = [sg_rule_ids]
rules = self.ec2_backend.describe_security_group_rules(
group_id, sg_rule_ids, filters
)
template = self.response_template(DESCRIBE_SECURITY_GROUP_RULES_RESPONSE)
return template.render(rules=rules)
@ -210,28 +222,36 @@ class SecurityGroups(EC2BaseResponse):
self.error_on_dryrun()
for args in self._process_rules_from_querystring():
self.ec2_backend.revoke_security_group_egress(*args)
# we don't need this parameter to revoke
del args["sgrule_tags"]
self.ec2_backend.revoke_security_group_egress(**args)
return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE
def revoke_security_group_ingress(self) -> str:
self.error_on_dryrun()
for args in self._process_rules_from_querystring():
self.ec2_backend.revoke_security_group_ingress(*args)
# we don't need this parameter to revoke
del args["sgrule_tags"]
self.ec2_backend.revoke_security_group_ingress(**args)
return REVOKE_SECURITY_GROUP_INGRESS_RESPONSE
def update_security_group_rule_descriptions_ingress(self) -> str:
for args in self._process_rules_from_querystring():
# we don't need this parameter to revoke
del args["sgrule_tags"]
group = self.ec2_backend.update_security_group_rule_descriptions_ingress(
*args
**args
)
self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy()
return UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_INGRESS
def update_security_group_rule_descriptions_egress(self) -> str:
for args in self._process_rules_from_querystring():
# we don't need this parameter to revoke
del args["sgrule_tags"]
group = self.ec2_backend.update_security_group_rule_descriptions_egress(
*args
**args
)
self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy()
return UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_EGRESS
@ -255,8 +275,7 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """
<DescribeSecurityGroupRulesResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>{{ request_id }}</requestId>
<securityGroupRuleSet>
{% for group, rule_list in rules.items() %}
{% for rule in rule_list %}
{% for rule in rules %}
<item>
{% if rule.from_port is not none %}
<fromPort>{{ rule.from_port }}</fromPort>
@ -268,7 +287,7 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """
<cidrIpv4>{{ rule.ip_ranges[0]['CidrIp'] }}</cidrIpv4>
{% endif %}
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
<groupId>{{ group }}</groupId>
<groupId>{{ rule.group_id }}</groupId>
<groupOwnerId>{{ rule.owner_id }}</groupOwnerId>
<isEgress>{{ 'true' if rule.is_egress else 'false' }}</isEgress>
<securityGroupRuleId>{{ rule.id }}</securityGroupRuleId>
@ -282,7 +301,6 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """
</tagSet>
</item>
{% endfor %}
{% endfor %}
</securityGroupRuleSet>
</DescribeSecurityGroupRulesResponse>"""
@ -470,6 +488,14 @@ AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE = """<AuthorizeSecurityGroupIngressRes
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<tagSet>
{% for tag in rule.get_tags() %}
<item>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
</item>
{% endfor %}
{% for item in rule.prefix_list_ids %}
@ -550,6 +576,14 @@ AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE = """<AuthorizeSecurityGroupEgressRespo
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<tagSet>
{% for tag in rule.get_tags() %}
<item>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
</item>
{% endfor %}
{% for item in rule.prefix_list_ids %}

View File

@ -13,11 +13,13 @@ from moto import mock_aws, settings
from moto.core import DEFAULT_ACCOUNT_ID
from moto.ec2 import ec2_backends
REGION = "us-east-1"
@mock_aws
def test_create_and_describe_security_group():
ec2 = boto3.resource("ec2", "us-west-1")
client = boto3.client("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
with pytest.raises(ClientError) as ex:
client.create_security_group(GroupName="test", Description="test", DryRun=True)
@ -51,7 +53,7 @@ def test_create_and_describe_security_group():
@mock_aws
def test_create_security_group_without_description_raises_error():
ec2 = boto3.resource("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
with pytest.raises(ClientError) as ex:
ec2.create_security_group(GroupName="test security group", Description="")
@ -62,15 +64,15 @@ def test_create_security_group_without_description_raises_error():
@mock_aws
def test_default_security_group():
client = boto3.client("ec2", "us-west-1")
client = boto3.client("ec2", REGION)
groups = retrieve_all_sgs(client)
assert "default" in [g["GroupName"] for g in groups]
@mock_aws
def test_create_and_describe_vpc_security_group():
ec2 = boto3.resource("ec2", "us-west-1")
client = boto3.client("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
name = str(uuid4())
vpc_id = f"vpc-{str(uuid4())[0:6]}"
@ -111,8 +113,8 @@ def test_create_and_describe_vpc_security_group():
@mock_aws
def test_create_two_security_groups_with_same_name_in_different_vpc():
ec2 = boto3.resource("ec2", "us-east-1")
client = boto3.client("ec2", "us-east-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
name = str(uuid4())
vpc_id = "vpc-5300000c"
@ -132,7 +134,7 @@ def test_create_two_security_groups_with_same_name_in_different_vpc():
@mock_aws
def test_create_two_security_groups_in_vpc_with_ipv6_enabled():
ec2 = boto3.resource("ec2", region_name="us-west-1")
ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16", AmazonProvidedIpv6CidrBlock=True)
security_group = ec2.create_security_group(
@ -145,8 +147,8 @@ def test_create_two_security_groups_in_vpc_with_ipv6_enabled():
@mock_aws
def test_deleting_security_groups():
ec2 = boto3.resource("ec2", "us-west-1")
client = boto3.client("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
sg_name1 = str(uuid4())
sg_name2 = str(uuid4())
group1 = ec2.create_security_group(GroupName=sg_name1, Description="test desc 1")
@ -188,8 +190,8 @@ def test_deleting_security_groups():
@mock_aws
def test_delete_security_group_in_vpc():
ec2 = boto3.resource("ec2", "us-west-1")
client = boto3.client("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
group = ec2.create_security_group(
GroupName=str(uuid4()), Description="test1", VpcId="vpc-12345"
@ -207,8 +209,8 @@ def test_delete_security_group_in_vpc():
@mock_aws
def test_authorize_ip_range_and_revoke():
ec2 = boto3.resource("ec2", "us-west-1")
client = boto3.client("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
security_group = ec2.create_security_group(
GroupName=str(uuid4()), Description="test"
)
@ -332,8 +334,8 @@ def test_authorize_ip_range_and_revoke():
@mock_aws
def test_authorize_other_group_and_revoke():
ec2 = boto3.resource("ec2", "us-west-1")
client = boto3.client("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
sg_name = str(uuid4())
security_group = ec2.create_security_group(
GroupName=sg_name, Description="test desc"
@ -389,7 +391,7 @@ def test_authorize_other_group_and_revoke():
@mock_aws
def test_authorize_other_group_egress_and_revoke():
ec2 = boto3.resource("ec2", region_name="us-west-1")
ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
@ -476,8 +478,8 @@ def test_authorize_group_in_vpc():
@mock_aws
def test_describe_security_groups():
ec2 = boto3.resource("ec2", "us-west-1")
client = boto3.client("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
vpc_id = f"vpc-{str(uuid4())[0:6]}"
name_1 = str(uuid4())
desc_1 = str(uuid4())
@ -514,7 +516,7 @@ def test_describe_security_groups():
@mock_aws
def test_authorize_bad_cidr_throws_invalid_parameter_value():
ec2 = boto3.resource("ec2", "us-west-1")
ec2 = boto3.resource("ec2", REGION)
sec_group = ec2.create_security_group(GroupName=str(uuid4()), Description="test")
with pytest.raises(ClientError) as ex:
permissions = [
@ -533,8 +535,8 @@ def test_authorize_bad_cidr_throws_invalid_parameter_value():
@mock_aws
def test_security_group_tag_filtering():
ec2 = boto3.resource("ec2", region_name="us-east-1")
client = boto3.client("ec2", region_name="us-east-1")
ec2 = boto3.resource("ec2", region_name=REGION)
client = boto3.client("ec2", region_name=REGION)
sg = ec2.create_security_group(GroupName=str(uuid4()), Description="Test SG")
tag_name = str(uuid4())[0:6]
tag_val = str(uuid4())
@ -553,8 +555,8 @@ def test_security_group_tag_filtering():
@mock_aws
def test_authorize_all_protocols_with_no_port_specification():
ec2 = boto3.resource("ec2", region_name="us-east-1")
client = boto3.client("ec2", region_name="us-east-1")
ec2 = boto3.resource("ec2", region_name=REGION)
client = boto3.client("ec2", region_name=REGION)
sg_name = str(uuid4())
sg = ec2.create_security_group(GroupName=sg_name, Description="test desc")
@ -570,9 +572,9 @@ def test_authorize_all_protocols_with_no_port_specification():
@mock_aws
def test_security_group_rule_tagging():
ec2 = boto3.resource("ec2", "us-east-1")
client = boto3.client("ec2", "us-east-1")
def test_security_group_rule_filtering_group_id():
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
@ -598,10 +600,55 @@ def test_security_group_rule_tagging():
assert response["SecurityGroupRules"][0]["Tags"][0]["Value"] == tag_val
@mock_aws
def test_security_group_rule_filtering_tags():
# Setup
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
tags = [
{"Key": "Automation", "Value": "Lambda"},
{"Key": "Partner", "Value": "test"},
]
sg_name = str(uuid4())
sg = client.create_security_group(
Description="Test SG",
GroupName=sg_name,
VpcId=vpc.id,
)
# Execute
response1 = client.authorize_security_group_ingress(
GroupId=sg["GroupId"],
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 80,
"ToPort": 80,
"IpRanges": [
{"CidrIp": "1.2.3.4/32", "Description": "Test description"}
],
}
],
TagSpecifications=[{"ResourceType": "security-group-rule", "Tags": tags}],
)
response2 = client.describe_security_group_rules(
Filters=[{"Name": "tag:Partner", "Values": ["test"]}]
)
# Verify
assert response1["SecurityGroupRules"][0]["Tags"] == tags
assert "Tags" in response2["SecurityGroupRules"][0]
assert response2["SecurityGroupRules"][0]["Tags"][1]["Key"] == "Partner"
assert response2["SecurityGroupRules"][0]["Tags"][1]["Value"] == "test"
@mock_aws
def test_create_and_describe_security_grp_rule():
ec2 = boto3.resource("ec2", "us-east-1")
client = boto3.client("ec2", "us-east-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
@ -627,8 +674,8 @@ def test_create_and_describe_security_grp_rule():
@mock_aws
@pytest.mark.parametrize("use_vpc", [True, False], ids=["Use VPC", "Without VPC"])
def test_sec_group_rule_limit(use_vpc):
ec2 = boto3.resource("ec2", region_name="us-west-1")
client = boto3.client("ec2", region_name="us-west-1")
ec2 = boto3.resource("ec2", region_name=REGION)
client = boto3.client("ec2", region_name=REGION)
limit = 60
if use_vpc:
@ -728,7 +775,7 @@ def test_sec_group_rule_limit(use_vpc):
@mock_aws
def test_add_same_rule_twice_throws_error():
ec2 = boto3.resource("ec2", region_name="us-west-1")
ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg = ec2.create_security_group(
@ -768,8 +815,8 @@ def test_add_same_rule_twice_throws_error():
@mock_aws
def test_description_in_ip_permissions():
ec2 = boto3.resource("ec2", region_name="us-west-1")
conn = boto3.client("ec2", region_name="us-east-1")
ec2 = boto3.resource("ec2", region_name=REGION)
conn = boto3.client("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg = conn.create_security_group(
GroupName="sg1", Description="Test security group sg1", VpcId=vpc.id
@ -818,7 +865,7 @@ def test_description_in_ip_permissions():
@mock_aws
def test_security_group_tagging():
conn = boto3.client("ec2", region_name="us-east-1")
conn = boto3.client("ec2", region_name=REGION)
sg = conn.create_security_group(GroupName=str(uuid4()), Description="Test SG")
@ -849,7 +896,7 @@ def test_security_group_tagging():
@mock_aws
def test_security_group_wildcard_tag_filter():
conn = boto3.client("ec2", region_name="us-east-1")
conn = boto3.client("ec2", region_name=REGION)
sg = conn.create_security_group(GroupName=str(uuid4()), Description="Test SG")
rand_name = str(uuid4())[0:6]
@ -868,10 +915,10 @@ def test_security_group_wildcard_tag_filter():
@mock_aws
def test_security_group_filter_ip_permission():
ec2 = boto3.resource("ec2", region_name="us-east-1")
ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
conn = boto3.client("ec2", region_name="us-east-1")
conn = boto3.client("ec2", region_name=REGION)
sg_name = str(uuid4())[0:6]
sg = ec2.create_security_group(
GroupName=sg_name, Description="Test SG", VpcId=vpc.id
@ -910,7 +957,7 @@ def retrieve_all_sgs(conn, filters=[]): # pylint: disable=W0102
@mock_aws
def test_authorize_and_revoke_in_bulk():
ec2 = boto3.resource("ec2", region_name="us-west-1")
ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
@ -1090,8 +1137,8 @@ def test_security_group_ingress_without_multirule_after_reload():
@mock_aws
def test_get_all_security_groups_filter_with_same_vpc_id():
ec2 = boto3.resource("ec2", region_name="us-east-1")
client = boto3.client("ec2", region_name="us-east-1")
ec2 = boto3.resource("ec2", region_name=REGION)
client = boto3.client("ec2", region_name=REGION)
vpc_id = "vpc-5300000c"
security_group = ec2.create_security_group(
GroupName=str(uuid4()), Description="test1", VpcId=vpc_id
@ -1117,7 +1164,7 @@ def test_get_all_security_groups_filter_with_same_vpc_id():
@mock_aws
def test_revoke_security_group_egress():
ec2 = boto3.resource("ec2", "us-east-1")
ec2 = boto3.resource("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg = ec2.create_security_group(
Description="Test SG", GroupName=str(uuid4()), VpcId=vpc.id
@ -1177,8 +1224,8 @@ def test_revoke_security_group_egress__without_ipprotocol():
@mock_aws
def test_update_security_group_rule_descriptions_egress():
ec2 = boto3.resource("ec2", "us-east-1")
client = boto3.client("ec2", "us-east-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
sg = ec2.create_security_group(
@ -1214,8 +1261,8 @@ def test_update_security_group_rule_descriptions_egress():
@mock_aws
def test_update_security_group_rule_descriptions_ingress():
ec2 = boto3.resource("ec2", "us-east-1")
client = boto3.client("ec2", "us-east-1")
ec2 = boto3.resource("ec2", REGION)
client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
sg = ec2.create_security_group(
@ -1231,8 +1278,13 @@ def test_update_security_group_rule_descriptions_ingress():
"IpRanges": [{"CidrIp": "1.2.3.4/32", "Description": "first desc"}],
}
]
client.authorize_security_group_ingress(GroupId=sg_id, IpPermissions=ip_permissions)
client.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=ip_permissions,
)
client.describe_security_group_rules(
Filters=[{"Name": "tag:Partner", "Values": ["test"]}]
)
ip_ranges = client.describe_security_groups(GroupIds=[sg_id])["SecurityGroups"][0][
"IpPermissions"
][0]["IpRanges"]
@ -1260,7 +1312,7 @@ def test_update_security_group_rule_descriptions_ingress():
@mock_aws
def test_non_existent_security_group_raises_error_on_authorize():
client = boto3.client("ec2", "us-east-1")
client = boto3.client("ec2", REGION)
non_existent_sg = "sg-123abc"
expected_error = f"The security group '{non_existent_sg}' does not exist"
authorize_funcs = [
@ -1278,9 +1330,9 @@ def test_non_existent_security_group_raises_error_on_authorize():
def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api():
if settings.TEST_SERVER_MODE:
raise unittest.SkipTest("Can't test backend directly in server mode.")
ec2_backend = ec2_backends[DEFAULT_ACCOUNT_ID]["us-east-1"]
ec2_resource = boto3.resource("ec2", region_name="us-east-1")
ec2_client = boto3.client("ec2", region_name="us-east-1")
ec2_backend = ec2_backends[DEFAULT_ACCOUNT_ID][REGION]
ec2_resource = boto3.resource("ec2", region_name=REGION)
ec2_client = boto3.client("ec2", region_name=REGION)
vpc = ec2_resource.create_vpc(CidrBlock="10.0.0.0/16")
group_name = str(uuid4())
sg = ec2_resource.create_security_group(
@ -1329,7 +1381,7 @@ def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api()
@mock_aws
def test_filter_description():
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
unique = str(uuid4())
@ -1360,7 +1412,7 @@ def test_filter_ip_permission__cidr():
raise SkipTest(
"CIDR's might already exist due to other tests creating IP ranges"
)
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@ -1407,7 +1459,7 @@ def test_filter_egress__ip_permission__cidr():
raise SkipTest(
"CIDR's might already exist due to other tests creating IP ranges"
)
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@ -1450,7 +1502,7 @@ def test_filter_egress__ip_permission__cidr():
@mock_aws
def test_filter_egress__ip_permission__from_port():
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@ -1494,7 +1546,7 @@ def test_filter_egress__ip_permission__from_port():
@mock_aws
def test_filter_egress__ip_permission__group_id():
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@ -1547,7 +1599,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_name(
"""
this fails to find the group in the AWS API, so we should also fail to find it
"""
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@ -1599,7 +1651,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_name(
@mock_aws
def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_id():
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@ -1652,7 +1704,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_id():
@mock_aws
def test_filter_egress__ip_permission__protocol():
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@ -1695,7 +1747,7 @@ def test_filter_egress__ip_permission__protocol():
@mock_aws
def test_filter_egress__ip_permission__to_port():
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@ -1739,7 +1791,7 @@ def test_filter_egress__ip_permission__to_port():
@mock_aws
def test_get_groups_by_ippermissions_group_id_filter():
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
sg1 = vpc.create_security_group(Description="test", GroupName="test-1")
sg2 = vpc.create_security_group(Description="test", GroupName="test-2")
@ -1772,7 +1824,7 @@ def test_get_groups_by_ippermissions_group_id_filter():
def test_get_groups_by_ippermissions_group_id_filter_across_vpcs():
# setup 2 VPCs, each with a single Security Group
# where one security group authorizes the other sg (in another vpc) via GroupId
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc1 = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
vpc2 = ec2r.create_vpc(CidrBlock="10.251.0.0/16")
@ -1809,7 +1861,7 @@ def test_filter_group_name():
"""
this filter is an exact match, not a glob
"""
ec2r = boto3.resource("ec2", region_name="us-west-1")
ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
uniq_sg_name_prefix = str(uuid4())[0:6]
@ -1843,7 +1895,7 @@ def test_filter_group_name():
@mock_aws
def test_revoke_security_group_ingress():
ec2 = boto3.client("ec2", region_name="us-east-1")
ec2 = boto3.client("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")