diff --git a/moto/ec2/models/core.py b/moto/ec2/models/core.py
index 0ecc94dcf..581e44dd3 100644
--- a/moto/ec2/models/core.py
+++ b/moto/ec2/models/core.py
@@ -41,3 +41,10 @@ class TaggedEC2Resource(BaseModel):
return value
raise FilterNotImplementedError(filter_name, method_name)
+
+ def match_tags(self, filters: Dict[str, str]) -> bool:
+ for tag_name in filters.keys():
+ tag_value = self.get_filter_value(tag_name)
+ if tag_value == filters[tag_name][0]:
+ return True
+ return False
diff --git a/moto/ec2/models/security_groups.py b/moto/ec2/models/security_groups.py
index b432cca39..0a776d76f 100644
--- a/moto/ec2/models/security_groups.py
+++ b/moto/ec2/models/security_groups.py
@@ -33,12 +33,15 @@ class SecurityRule(TaggedEC2Resource):
self,
ec2_backend: Any,
ip_protocol: str,
+ group_id: str,
from_port: Optional[str],
to_port: Optional[str],
ip_ranges: Optional[List[Any]],
source_groups: List[Dict[str, Any]],
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
is_egress: bool = True,
+ tags: Dict[str, str] = {},
+ description: str = "",
):
self.ec2_backend = ec2_backend
self.id = random_security_group_rule_id()
@@ -48,6 +51,8 @@ class SecurityRule(TaggedEC2Resource):
self.prefix_list_ids = prefix_list_ids or []
self.from_port = self.to_port = None
self.is_egress = is_egress
+ self.description = description
+ self.group_id = group_id
if self.ip_protocol and self.ip_protocol != "-1":
self.from_port = int(from_port) # type: ignore[arg-type]
@@ -73,6 +78,7 @@ class SecurityRule(TaggedEC2Resource):
else None
)
self.ip_protocol = proto if proto else self.ip_protocol
+ self.add_tags(tags)
@property
def owner_id(self) -> str:
@@ -156,6 +162,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
SecurityRule(
self.ec2_backend,
"-1",
+ self.id,
None,
None,
[{"CidrIp": "0.0.0.0/0"}],
@@ -167,6 +174,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
SecurityRule(
self.ec2_backend,
"-1",
+ self.id,
None,
None,
[{"CidrIpv6": "::/0"}],
@@ -571,22 +579,60 @@ class SecurityGroupBackend:
return matches
def describe_security_group_rules(
- self, group_ids: Optional[List[str]] = None, filters: Any = None
- ) -> Dict[str, List[SecurityRule]]:
- matches = self.describe_security_groups(group_ids=group_ids, filters=filters)
- if not matches:
- raise InvalidSecurityGroupNotFoundError(
- "No security groups found matching the filters provided."
- )
- rules = {}
- for group in matches:
- group_rules = []
- group_rules.extend(group.ingress_rules)
- group_rules.extend(group.egress_rules)
- if group_rules:
- rules[group.group_id] = group_rules
+ self,
+ group_ids: Optional[List[str]] = None,
+ sg_rule_ids: List[str] = [],
+ filters: Any = None,
+ ) -> List[SecurityRule]:
+ results = []
- return rules
+ if sg_rule_ids:
+ # go thru all the rules in the backend to find a match
+ for sg_rule_id in sg_rule_ids:
+ for sg in self.sg_old_ingress_ruls:
+ for rule in self.sg_old_ingress_ruls[sg]:
+ if rule.id == sg_rule_id:
+ results.append(rule)
+
+ return results
+
+ if group_ids:
+ all_sgs = self.describe_security_groups(group_ids=group_ids)
+ for group in all_sgs:
+ results.extend(group.ingress_rules)
+ results.extend(group.egress_rules)
+
+ return results
+
+ if filters and "group-id" in filters:
+ matches = self.describe_security_groups(
+ group_ids=group_ids, filters=filters
+ )
+ if not matches:
+ raise InvalidSecurityGroupNotFoundError(
+ "No security groups found matching the filters provided."
+ )
+ for group in matches:
+ results.extend(group.ingress_rules)
+ results.extend(group.egress_rules)
+
+ return results
+
+ all_sgs = self.describe_security_groups()
+
+ for group in all_sgs:
+ results.extend(self._match_sg_rules(group.ingress_rules, filters))
+ results.extend(self._match_sg_rules(group.egress_rules, filters))
+
+ return results
+
+ @staticmethod
+ def _match_sg_rules(rules_list: List[SecurityRule], filters: Any) -> List[SecurityRule]: # type: ignore[misc]
+ results = []
+ for rule in rules_list:
+ if rule.match_tags(filters):
+ results.append(rule)
+ return results
def _delete_security_group(self, vpc_id: Optional[str], group_id: str) -> None:
vpc_id = vpc_id or self.default_vpc.id # type: ignore[attr-defined]
@@ -657,6 +703,7 @@ class SecurityGroupBackend:
from_port: str,
to_port: str,
ip_ranges: List[Any],
+ sgrule_tags: Dict[str, str] = {},
source_groups: Optional[List[Dict[str, str]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
security_rule_ids: Optional[List[str]] = None, # pylint:disable=unused-argument
@@ -695,12 +742,14 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
+ group.group_id,
from_port,
to_port,
ip_ranges,
_source_groups,
prefix_list_ids,
is_egress=False,
+ tags=sgrule_tags,
)
if security_rule in group.ingress_rules:
@@ -738,6 +787,7 @@ class SecurityGroupBackend:
break
else:
group.add_ingress_rule(security_rule)
+
return security_rule, group
def revoke_security_group_ingress(
@@ -765,6 +815,7 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
+ group.group_id,
from_port,
to_port,
ip_ranges,
@@ -819,6 +870,7 @@ class SecurityGroupBackend:
from_port: str,
to_port: str,
ip_ranges: List[Any],
+ sgrule_tags: Dict[str, str] = {},
source_groups: Optional[List[Dict[str, Any]]] = None,
prefix_list_ids: Optional[List[Dict[str, str]]] = None,
security_rule_ids: Optional[List[str]] = None, # pylint:disable=unused-argument
@@ -860,11 +912,13 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
+ group.group_id,
from_port,
to_port,
ip_ranges,
_source_groups,
prefix_list_ids,
+ tags=sgrule_tags,
)
if security_rule in group.egress_rules:
@@ -944,6 +998,7 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
+ group.group_id,
from_port,
to_port,
ip_ranges,
@@ -1030,6 +1085,7 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
+ group.group_id,
from_port,
to_port,
ip_ranges,
@@ -1085,6 +1141,7 @@ class SecurityGroupBackend:
security_rule = SecurityRule(
self,
ip_protocol,
+ group.group_id,
from_port,
to_port,
ip_ranges,
diff --git a/moto/ec2/responses/security_groups.py b/moto/ec2/responses/security_groups.py
index b65949291..49553d05e 100644
--- a/moto/ec2/responses/security_groups.py
+++ b/moto/ec2/responses/security_groups.py
@@ -86,6 +86,8 @@ class SecurityGroups(EC2BaseResponse):
d = d[subkey]
d[key_splitted[-1]] = value
+ sg_rule_tags = self._parse_tag_specification().get("security-group-rule", {})
+
if "IpPermissions" not in querytree:
# Handle single rule syntax
(
@@ -97,16 +99,17 @@ class SecurityGroups(EC2BaseResponse):
prefix_list_ids,
) = parse_sg_attributes_from_dict(querytree)
- yield (
- group_name_or_id,
- ip_protocol,
- from_port,
- to_port,
- ip_ranges,
- source_groups,
- prefix_list_ids,
- security_rule_ids,
- )
+ yield {
+ "group_name_or_id": group_name_or_id,
+ "ip_protocol": ip_protocol,
+ "from_port": from_port,
+ "to_port": to_port,
+ "ip_ranges": ip_ranges,
+ "sgrule_tags": sg_rule_tags,
+ "source_groups": source_groups,
+ "prefix_list_ids": prefix_list_ids,
+ "security_rule_ids": security_rule_ids,
+ }
ip_permissions = querytree.get("IpPermissions") or {}
for ip_permission_idx in sorted(ip_permissions.keys()):
@@ -121,22 +124,23 @@ class SecurityGroups(EC2BaseResponse):
prefix_list_ids,
) = parse_sg_attributes_from_dict(ip_permission)
- yield (
- group_name_or_id,
- ip_protocol,
- from_port,
- to_port,
- ip_ranges,
- source_groups,
- prefix_list_ids,
- security_rule_ids,
- )
+ yield {
+ "group_name_or_id": group_name_or_id,
+ "ip_protocol": ip_protocol,
+ "from_port": from_port,
+ "to_port": to_port,
+ "ip_ranges": ip_ranges,
+ "sgrule_tags": sg_rule_tags,
+ "source_groups": source_groups,
+ "prefix_list_ids": prefix_list_ids,
+ "security_rule_ids": security_rule_ids,
+ }
def authorize_security_group_egress(self) -> str:
self.error_on_dryrun()
- for args in self._process_rules_from_querystring():
- rule, group = self.ec2_backend.authorize_security_group_egress(*args)
+ for kwargs in self._process_rules_from_querystring():
+ rule, group = self.ec2_backend.authorize_security_group_egress(**kwargs)
self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy()
template = self.response_template(AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE)
return template.render(rule=rule, group=group)
@@ -144,8 +148,8 @@ class SecurityGroups(EC2BaseResponse):
def authorize_security_group_ingress(self) -> str:
self.error_on_dryrun()
- for args in self._process_rules_from_querystring():
- rule, group = self.ec2_backend.authorize_security_group_ingress(*args)
+ for kwargs in self._process_rules_from_querystring():
+ rule, group = self.ec2_backend.authorize_security_group_ingress(**kwargs)
self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy()
template = self.response_template(AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE)
return template.render(rule=rule, group=group)
@@ -198,11 +202,19 @@ class SecurityGroups(EC2BaseResponse):
def describe_security_group_rules(self) -> str:
group_id = self._get_param("GroupId")
+ sg_rule_ids = self._get_param("SecurityGroupRuleId.1")
filters = self._filters_from_querystring()
self.error_on_dryrun()
- rules = self.ec2_backend.describe_security_group_rules(group_id, filters)
+ # if sg rule ids are not None then wrap in a list
+ # as expected by ec2_backend.describe_security_group_rules
+ if sg_rule_ids:
+ sg_rule_ids = [sg_rule_ids]
+
+ rules = self.ec2_backend.describe_security_group_rules(
+ group_id, sg_rule_ids, filters
+ )
template = self.response_template(DESCRIBE_SECURITY_GROUP_RULES_RESPONSE)
return template.render(rules=rules)
@@ -210,28 +222,36 @@ class SecurityGroups(EC2BaseResponse):
self.error_on_dryrun()
for args in self._process_rules_from_querystring():
- self.ec2_backend.revoke_security_group_egress(*args)
+ # we don't need this parameter to revoke
+ del args["sgrule_tags"]
+ self.ec2_backend.revoke_security_group_egress(**args)
return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE
def revoke_security_group_ingress(self) -> str:
self.error_on_dryrun()
for args in self._process_rules_from_querystring():
- self.ec2_backend.revoke_security_group_ingress(*args)
+ # we don't need this parameter to revoke
+ del args["sgrule_tags"]
+ self.ec2_backend.revoke_security_group_ingress(**args)
return REVOKE_SECURITY_GROUP_INGRESS_RESPONSE
def update_security_group_rule_descriptions_ingress(self) -> str:
for args in self._process_rules_from_querystring():
+ # we don't need this parameter to revoke
+ del args["sgrule_tags"]
group = self.ec2_backend.update_security_group_rule_descriptions_ingress(
- *args
+ **args
)
self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy()
return UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_INGRESS
def update_security_group_rule_descriptions_egress(self) -> str:
for args in self._process_rules_from_querystring():
+ # we don't need this parameter to revoke
+ del args["sgrule_tags"]
group = self.ec2_backend.update_security_group_rule_descriptions_egress(
- *args
+ **args
)
self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy()
return UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_EGRESS
@@ -255,8 +275,7 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """
{{ request_id }}
- {% for group, rule_list in rules.items() %}
- {% for rule in rule_list %}
+ {% for rule in rules %}
-
{% if rule.from_port is not none %}
{{ rule.from_port }}
@@ -268,7 +287,7 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """
{{ rule.ip_ranges[0]['CidrIp'] }}
{% endif %}
{{ rule.ip_protocol }}
- {{ group }}
+ {{ rule.group_id }}
{{ rule.owner_id }}
{{ 'true' if rule.is_egress else 'false' }}
{{ rule.id }}
@@ -282,7 +301,6 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """
{% endfor %}
- {% endfor %}
"""
@@ -470,6 +488,14 @@ AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE = """{{ rule.to_port }}
{% endif %}
+
+ {% for tag in rule.get_tags() %}
+ -
+ {{ tag.key }}
+ {{ tag.value }}
+
+ {% endfor %}
+
{% endfor %}
{% for item in rule.prefix_list_ids %}
@@ -550,6 +576,14 @@ AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE = """{{ rule.to_port }}
{% endif %}
+
+ {% for tag in rule.get_tags() %}
+ -
+ {{ tag.key }}
+ {{ tag.value }}
+
+ {% endfor %}
+
{% endfor %}
{% for item in rule.prefix_list_ids %}
diff --git a/tests/test_ec2/test_security_groups.py b/tests/test_ec2/test_security_groups.py
index d826c1578..3486c395c 100644
--- a/tests/test_ec2/test_security_groups.py
+++ b/tests/test_ec2/test_security_groups.py
@@ -13,11 +13,13 @@ from moto import mock_aws, settings
from moto.core import DEFAULT_ACCOUNT_ID
from moto.ec2 import ec2_backends
+REGION = "us-east-1"
+
@mock_aws
def test_create_and_describe_security_group():
- ec2 = boto3.resource("ec2", "us-west-1")
- client = boto3.client("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
with pytest.raises(ClientError) as ex:
client.create_security_group(GroupName="test", Description="test", DryRun=True)
@@ -51,7 +53,7 @@ def test_create_and_describe_security_group():
@mock_aws
def test_create_security_group_without_description_raises_error():
- ec2 = boto3.resource("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
with pytest.raises(ClientError) as ex:
ec2.create_security_group(GroupName="test security group", Description="")
@@ -62,15 +64,15 @@ def test_create_security_group_without_description_raises_error():
@mock_aws
def test_default_security_group():
- client = boto3.client("ec2", "us-west-1")
+ client = boto3.client("ec2", REGION)
groups = retrieve_all_sgs(client)
assert "default" in [g["GroupName"] for g in groups]
@mock_aws
def test_create_and_describe_vpc_security_group():
- ec2 = boto3.resource("ec2", "us-west-1")
- client = boto3.client("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
name = str(uuid4())
vpc_id = f"vpc-{str(uuid4())[0:6]}"
@@ -111,8 +113,8 @@ def test_create_and_describe_vpc_security_group():
@mock_aws
def test_create_two_security_groups_with_same_name_in_different_vpc():
- ec2 = boto3.resource("ec2", "us-east-1")
- client = boto3.client("ec2", "us-east-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
name = str(uuid4())
vpc_id = "vpc-5300000c"
@@ -132,7 +134,7 @@ def test_create_two_security_groups_with_same_name_in_different_vpc():
@mock_aws
def test_create_two_security_groups_in_vpc_with_ipv6_enabled():
- ec2 = boto3.resource("ec2", region_name="us-west-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16", AmazonProvidedIpv6CidrBlock=True)
security_group = ec2.create_security_group(
@@ -145,8 +147,8 @@ def test_create_two_security_groups_in_vpc_with_ipv6_enabled():
@mock_aws
def test_deleting_security_groups():
- ec2 = boto3.resource("ec2", "us-west-1")
- client = boto3.client("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
sg_name1 = str(uuid4())
sg_name2 = str(uuid4())
group1 = ec2.create_security_group(GroupName=sg_name1, Description="test desc 1")
@@ -188,8 +190,8 @@ def test_deleting_security_groups():
@mock_aws
def test_delete_security_group_in_vpc():
- ec2 = boto3.resource("ec2", "us-west-1")
- client = boto3.client("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
group = ec2.create_security_group(
GroupName=str(uuid4()), Description="test1", VpcId="vpc-12345"
@@ -207,8 +209,8 @@ def test_delete_security_group_in_vpc():
@mock_aws
def test_authorize_ip_range_and_revoke():
- ec2 = boto3.resource("ec2", "us-west-1")
- client = boto3.client("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
security_group = ec2.create_security_group(
GroupName=str(uuid4()), Description="test"
)
@@ -332,8 +334,8 @@ def test_authorize_ip_range_and_revoke():
@mock_aws
def test_authorize_other_group_and_revoke():
- ec2 = boto3.resource("ec2", "us-west-1")
- client = boto3.client("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
sg_name = str(uuid4())
security_group = ec2.create_security_group(
GroupName=sg_name, Description="test desc"
@@ -389,7 +391,7 @@ def test_authorize_other_group_and_revoke():
@mock_aws
def test_authorize_other_group_egress_and_revoke():
- ec2 = boto3.resource("ec2", region_name="us-west-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
@@ -476,8 +478,8 @@ def test_authorize_group_in_vpc():
@mock_aws
def test_describe_security_groups():
- ec2 = boto3.resource("ec2", "us-west-1")
- client = boto3.client("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
vpc_id = f"vpc-{str(uuid4())[0:6]}"
name_1 = str(uuid4())
desc_1 = str(uuid4())
@@ -514,7 +516,7 @@ def test_describe_security_groups():
@mock_aws
def test_authorize_bad_cidr_throws_invalid_parameter_value():
- ec2 = boto3.resource("ec2", "us-west-1")
+ ec2 = boto3.resource("ec2", REGION)
sec_group = ec2.create_security_group(GroupName=str(uuid4()), Description="test")
with pytest.raises(ClientError) as ex:
permissions = [
@@ -533,8 +535,8 @@ def test_authorize_bad_cidr_throws_invalid_parameter_value():
@mock_aws
def test_security_group_tag_filtering():
- ec2 = boto3.resource("ec2", region_name="us-east-1")
- client = boto3.client("ec2", region_name="us-east-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
+ client = boto3.client("ec2", region_name=REGION)
sg = ec2.create_security_group(GroupName=str(uuid4()), Description="Test SG")
tag_name = str(uuid4())[0:6]
tag_val = str(uuid4())
@@ -553,8 +555,8 @@ def test_security_group_tag_filtering():
@mock_aws
def test_authorize_all_protocols_with_no_port_specification():
- ec2 = boto3.resource("ec2", region_name="us-east-1")
- client = boto3.client("ec2", region_name="us-east-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
+ client = boto3.client("ec2", region_name=REGION)
sg_name = str(uuid4())
sg = ec2.create_security_group(GroupName=sg_name, Description="test desc")
@@ -570,9 +572,9 @@ def test_authorize_all_protocols_with_no_port_specification():
@mock_aws
-def test_security_group_rule_tagging():
- ec2 = boto3.resource("ec2", "us-east-1")
- client = boto3.client("ec2", "us-east-1")
+def test_security_group_rule_filtering_group_id():
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
@@ -598,10 +600,55 @@ def test_security_group_rule_tagging():
assert response["SecurityGroupRules"][0]["Tags"][0]["Value"] == tag_val
+@mock_aws
+def test_security_group_rule_filtering_tags():
+ # Setup
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
+ vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
+ tags = [
+ {"Key": "Automation", "Value": "Lambda"},
+ {"Key": "Partner", "Value": "test"},
+ ]
+
+ sg_name = str(uuid4())
+ sg = client.create_security_group(
+ Description="Test SG",
+ GroupName=sg_name,
+ VpcId=vpc.id,
+ )
+
+ # Execute
+ response1 = client.authorize_security_group_ingress(
+ GroupId=sg["GroupId"],
+ IpPermissions=[
+ {
+ "IpProtocol": "tcp",
+ "FromPort": 80,
+ "ToPort": 80,
+ "IpRanges": [
+ {"CidrIp": "1.2.3.4/32", "Description": "Test description"}
+ ],
+ }
+ ],
+ TagSpecifications=[{"ResourceType": "security-group-rule", "Tags": tags}],
+ )
+
+ response2 = client.describe_security_group_rules(
+ Filters=[{"Name": "tag:Partner", "Values": ["test"]}]
+ )
+
+ # Verify
+ assert response1["SecurityGroupRules"][0]["Tags"] == tags
+ assert "Tags" in response2["SecurityGroupRules"][0]
+ assert response2["SecurityGroupRules"][0]["Tags"][1]["Key"] == "Partner"
+ assert response2["SecurityGroupRules"][0]["Tags"][1]["Value"] == "test"
+
+
@mock_aws
def test_create_and_describe_security_grp_rule():
- ec2 = boto3.resource("ec2", "us-east-1")
- client = boto3.client("ec2", "us-east-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
@@ -627,8 +674,8 @@ def test_create_and_describe_security_grp_rule():
@mock_aws
@pytest.mark.parametrize("use_vpc", [True, False], ids=["Use VPC", "Without VPC"])
def test_sec_group_rule_limit(use_vpc):
- ec2 = boto3.resource("ec2", region_name="us-west-1")
- client = boto3.client("ec2", region_name="us-west-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
+ client = boto3.client("ec2", region_name=REGION)
limit = 60
if use_vpc:
@@ -728,7 +775,7 @@ def test_sec_group_rule_limit(use_vpc):
@mock_aws
def test_add_same_rule_twice_throws_error():
- ec2 = boto3.resource("ec2", region_name="us-west-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg = ec2.create_security_group(
@@ -768,8 +815,8 @@ def test_add_same_rule_twice_throws_error():
@mock_aws
def test_description_in_ip_permissions():
- ec2 = boto3.resource("ec2", region_name="us-west-1")
- conn = boto3.client("ec2", region_name="us-east-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
+ conn = boto3.client("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg = conn.create_security_group(
GroupName="sg1", Description="Test security group sg1", VpcId=vpc.id
@@ -818,7 +865,7 @@ def test_description_in_ip_permissions():
@mock_aws
def test_security_group_tagging():
- conn = boto3.client("ec2", region_name="us-east-1")
+ conn = boto3.client("ec2", region_name=REGION)
sg = conn.create_security_group(GroupName=str(uuid4()), Description="Test SG")
@@ -849,7 +896,7 @@ def test_security_group_tagging():
@mock_aws
def test_security_group_wildcard_tag_filter():
- conn = boto3.client("ec2", region_name="us-east-1")
+ conn = boto3.client("ec2", region_name=REGION)
sg = conn.create_security_group(GroupName=str(uuid4()), Description="Test SG")
rand_name = str(uuid4())[0:6]
@@ -868,10 +915,10 @@ def test_security_group_wildcard_tag_filter():
@mock_aws
def test_security_group_filter_ip_permission():
- ec2 = boto3.resource("ec2", region_name="us-east-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
- conn = boto3.client("ec2", region_name="us-east-1")
+ conn = boto3.client("ec2", region_name=REGION)
sg_name = str(uuid4())[0:6]
sg = ec2.create_security_group(
GroupName=sg_name, Description="Test SG", VpcId=vpc.id
@@ -910,7 +957,7 @@ def retrieve_all_sgs(conn, filters=[]): # pylint: disable=W0102
@mock_aws
def test_authorize_and_revoke_in_bulk():
- ec2 = boto3.resource("ec2", region_name="us-west-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
@@ -1090,8 +1137,8 @@ def test_security_group_ingress_without_multirule_after_reload():
@mock_aws
def test_get_all_security_groups_filter_with_same_vpc_id():
- ec2 = boto3.resource("ec2", region_name="us-east-1")
- client = boto3.client("ec2", region_name="us-east-1")
+ ec2 = boto3.resource("ec2", region_name=REGION)
+ client = boto3.client("ec2", region_name=REGION)
vpc_id = "vpc-5300000c"
security_group = ec2.create_security_group(
GroupName=str(uuid4()), Description="test1", VpcId=vpc_id
@@ -1117,7 +1164,7 @@ def test_get_all_security_groups_filter_with_same_vpc_id():
@mock_aws
def test_revoke_security_group_egress():
- ec2 = boto3.resource("ec2", "us-east-1")
+ ec2 = boto3.resource("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg = ec2.create_security_group(
Description="Test SG", GroupName=str(uuid4()), VpcId=vpc.id
@@ -1177,8 +1224,8 @@ def test_revoke_security_group_egress__without_ipprotocol():
@mock_aws
def test_update_security_group_rule_descriptions_egress():
- ec2 = boto3.resource("ec2", "us-east-1")
- client = boto3.client("ec2", "us-east-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
sg = ec2.create_security_group(
@@ -1214,8 +1261,8 @@ def test_update_security_group_rule_descriptions_egress():
@mock_aws
def test_update_security_group_rule_descriptions_ingress():
- ec2 = boto3.resource("ec2", "us-east-1")
- client = boto3.client("ec2", "us-east-1")
+ ec2 = boto3.resource("ec2", REGION)
+ client = boto3.client("ec2", REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
sg_name = str(uuid4())
sg = ec2.create_security_group(
@@ -1231,8 +1278,13 @@ def test_update_security_group_rule_descriptions_ingress():
"IpRanges": [{"CidrIp": "1.2.3.4/32", "Description": "first desc"}],
}
]
- client.authorize_security_group_ingress(GroupId=sg_id, IpPermissions=ip_permissions)
-
+ client.authorize_security_group_ingress(
+ GroupId=sg_id,
+ IpPermissions=ip_permissions,
+ )
+ client.describe_security_group_rules(
+ Filters=[{"Name": "tag:Partner", "Values": ["test"]}]
+ )
ip_ranges = client.describe_security_groups(GroupIds=[sg_id])["SecurityGroups"][0][
"IpPermissions"
][0]["IpRanges"]
@@ -1260,7 +1312,7 @@ def test_update_security_group_rule_descriptions_ingress():
@mock_aws
def test_non_existent_security_group_raises_error_on_authorize():
- client = boto3.client("ec2", "us-east-1")
+ client = boto3.client("ec2", REGION)
non_existent_sg = "sg-123abc"
expected_error = f"The security group '{non_existent_sg}' does not exist"
authorize_funcs = [
@@ -1278,9 +1330,9 @@ def test_non_existent_security_group_raises_error_on_authorize():
def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api():
if settings.TEST_SERVER_MODE:
raise unittest.SkipTest("Can't test backend directly in server mode.")
- ec2_backend = ec2_backends[DEFAULT_ACCOUNT_ID]["us-east-1"]
- ec2_resource = boto3.resource("ec2", region_name="us-east-1")
- ec2_client = boto3.client("ec2", region_name="us-east-1")
+ ec2_backend = ec2_backends[DEFAULT_ACCOUNT_ID][REGION]
+ ec2_resource = boto3.resource("ec2", region_name=REGION)
+ ec2_client = boto3.client("ec2", region_name=REGION)
vpc = ec2_resource.create_vpc(CidrBlock="10.0.0.0/16")
group_name = str(uuid4())
sg = ec2_resource.create_security_group(
@@ -1329,7 +1381,7 @@ def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api()
@mock_aws
def test_filter_description():
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
unique = str(uuid4())
@@ -1360,7 +1412,7 @@ def test_filter_ip_permission__cidr():
raise SkipTest(
"CIDR's might already exist due to other tests creating IP ranges"
)
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@@ -1407,7 +1459,7 @@ def test_filter_egress__ip_permission__cidr():
raise SkipTest(
"CIDR's might already exist due to other tests creating IP ranges"
)
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@@ -1450,7 +1502,7 @@ def test_filter_egress__ip_permission__cidr():
@mock_aws
def test_filter_egress__ip_permission__from_port():
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@@ -1494,7 +1546,7 @@ def test_filter_egress__ip_permission__from_port():
@mock_aws
def test_filter_egress__ip_permission__group_id():
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@@ -1547,7 +1599,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_name(
"""
this fails to find the group in the AWS API, so we should also fail to find it
"""
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@@ -1599,7 +1651,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_name(
@mock_aws
def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_id():
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@@ -1652,7 +1704,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_id():
@mock_aws
def test_filter_egress__ip_permission__protocol():
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@@ -1695,7 +1747,7 @@ def test_filter_egress__ip_permission__protocol():
@mock_aws
def test_filter_egress__ip_permission__to_port():
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
sg1 = vpc.create_security_group(
@@ -1739,7 +1791,7 @@ def test_filter_egress__ip_permission__to_port():
@mock_aws
def test_get_groups_by_ippermissions_group_id_filter():
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
sg1 = vpc.create_security_group(Description="test", GroupName="test-1")
sg2 = vpc.create_security_group(Description="test", GroupName="test-2")
@@ -1772,7 +1824,7 @@ def test_get_groups_by_ippermissions_group_id_filter():
def test_get_groups_by_ippermissions_group_id_filter_across_vpcs():
# setup 2 VPCs, each with a single Security Group
# where one security group authorizes the other sg (in another vpc) via GroupId
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc1 = ec2r.create_vpc(CidrBlock="10.250.0.0/16")
vpc2 = ec2r.create_vpc(CidrBlock="10.251.0.0/16")
@@ -1809,7 +1861,7 @@ def test_filter_group_name():
"""
this filter is an exact match, not a glob
"""
- ec2r = boto3.resource("ec2", region_name="us-west-1")
+ ec2r = boto3.resource("ec2", region_name=REGION)
vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16")
uniq_sg_name_prefix = str(uuid4())[0:6]
@@ -1843,7 +1895,7 @@ def test_filter_group_name():
@mock_aws
def test_revoke_security_group_ingress():
- ec2 = boto3.client("ec2", region_name="us-east-1")
+ ec2 = boto3.client("ec2", region_name=REGION)
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")