From 8178cbde1a3310aa5cdda696a970009f42fbc050 Mon Sep 17 00:00:00 2001 From: rafcio19 Date: Tue, 5 Mar 2024 15:08:13 +0100 Subject: [PATCH] EC2: SG rule tagging (#7424) --- moto/ec2/models/core.py | 7 + moto/ec2/models/security_groups.py | 87 ++++++++++-- moto/ec2/responses/security_groups.py | 100 +++++++++----- tests/test_ec2/test_security_groups.py | 184 ++++++++++++++++--------- 4 files changed, 264 insertions(+), 114 deletions(-) diff --git a/moto/ec2/models/core.py b/moto/ec2/models/core.py index 0ecc94dcf..581e44dd3 100644 --- a/moto/ec2/models/core.py +++ b/moto/ec2/models/core.py @@ -41,3 +41,10 @@ class TaggedEC2Resource(BaseModel): return value raise FilterNotImplementedError(filter_name, method_name) + + def match_tags(self, filters: Dict[str, str]) -> bool: + for tag_name in filters.keys(): + tag_value = self.get_filter_value(tag_name) + if tag_value == filters[tag_name][0]: + return True + return False diff --git a/moto/ec2/models/security_groups.py b/moto/ec2/models/security_groups.py index b432cca39..0a776d76f 100644 --- a/moto/ec2/models/security_groups.py +++ b/moto/ec2/models/security_groups.py @@ -33,12 +33,15 @@ class SecurityRule(TaggedEC2Resource): self, ec2_backend: Any, ip_protocol: str, + group_id: str, from_port: Optional[str], to_port: Optional[str], ip_ranges: Optional[List[Any]], source_groups: List[Dict[str, Any]], prefix_list_ids: Optional[List[Dict[str, str]]] = None, is_egress: bool = True, + tags: Dict[str, str] = {}, + description: str = "", ): self.ec2_backend = ec2_backend self.id = random_security_group_rule_id() @@ -48,6 +51,8 @@ class SecurityRule(TaggedEC2Resource): self.prefix_list_ids = prefix_list_ids or [] self.from_port = self.to_port = None self.is_egress = is_egress + self.description = description + self.group_id = group_id if self.ip_protocol and self.ip_protocol != "-1": self.from_port = int(from_port) # type: ignore[arg-type] @@ -73,6 +78,7 @@ class SecurityRule(TaggedEC2Resource): else None ) self.ip_protocol = proto if proto else self.ip_protocol + self.add_tags(tags) @property def owner_id(self) -> str: @@ -156,6 +162,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): SecurityRule( self.ec2_backend, "-1", + self.id, None, None, [{"CidrIp": "0.0.0.0/0"}], @@ -167,6 +174,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel): SecurityRule( self.ec2_backend, "-1", + self.id, None, None, [{"CidrIpv6": "::/0"}], @@ -571,22 +579,60 @@ class SecurityGroupBackend: return matches def describe_security_group_rules( - self, group_ids: Optional[List[str]] = None, filters: Any = None - ) -> Dict[str, List[SecurityRule]]: - matches = self.describe_security_groups(group_ids=group_ids, filters=filters) - if not matches: - raise InvalidSecurityGroupNotFoundError( - "No security groups found matching the filters provided." - ) - rules = {} - for group in matches: - group_rules = [] - group_rules.extend(group.ingress_rules) - group_rules.extend(group.egress_rules) - if group_rules: - rules[group.group_id] = group_rules + self, + group_ids: Optional[List[str]] = None, + sg_rule_ids: List[str] = [], + filters: Any = None, + ) -> List[SecurityRule]: + results = [] - return rules + if sg_rule_ids: + # go thru all the rules in the backend to find a match + for sg_rule_id in sg_rule_ids: + for sg in self.sg_old_ingress_ruls: + for rule in self.sg_old_ingress_ruls[sg]: + if rule.id == sg_rule_id: + results.append(rule) + + return results + + if group_ids: + all_sgs = self.describe_security_groups(group_ids=group_ids) + for group in all_sgs: + results.extend(group.ingress_rules) + results.extend(group.egress_rules) + + return results + + if filters and "group-id" in filters: + matches = self.describe_security_groups( + group_ids=group_ids, filters=filters + ) + if not matches: + raise InvalidSecurityGroupNotFoundError( + "No security groups found matching the filters provided." + ) + for group in matches: + results.extend(group.ingress_rules) + results.extend(group.egress_rules) + + return results + + all_sgs = self.describe_security_groups() + + for group in all_sgs: + results.extend(self._match_sg_rules(group.ingress_rules, filters)) + results.extend(self._match_sg_rules(group.egress_rules, filters)) + + return results + + @staticmethod + def _match_sg_rules(rules_list: List[SecurityRule], filters: Any) -> List[SecurityRule]: # type: ignore[misc] + results = [] + for rule in rules_list: + if rule.match_tags(filters): + results.append(rule) + return results def _delete_security_group(self, vpc_id: Optional[str], group_id: str) -> None: vpc_id = vpc_id or self.default_vpc.id # type: ignore[attr-defined] @@ -657,6 +703,7 @@ class SecurityGroupBackend: from_port: str, to_port: str, ip_ranges: List[Any], + sgrule_tags: Dict[str, str] = {}, source_groups: Optional[List[Dict[str, str]]] = None, prefix_list_ids: Optional[List[Dict[str, str]]] = None, security_rule_ids: Optional[List[str]] = None, # pylint:disable=unused-argument @@ -695,12 +742,14 @@ class SecurityGroupBackend: security_rule = SecurityRule( self, ip_protocol, + group.group_id, from_port, to_port, ip_ranges, _source_groups, prefix_list_ids, is_egress=False, + tags=sgrule_tags, ) if security_rule in group.ingress_rules: @@ -738,6 +787,7 @@ class SecurityGroupBackend: break else: group.add_ingress_rule(security_rule) + return security_rule, group def revoke_security_group_ingress( @@ -765,6 +815,7 @@ class SecurityGroupBackend: security_rule = SecurityRule( self, ip_protocol, + group.group_id, from_port, to_port, ip_ranges, @@ -819,6 +870,7 @@ class SecurityGroupBackend: from_port: str, to_port: str, ip_ranges: List[Any], + sgrule_tags: Dict[str, str] = {}, source_groups: Optional[List[Dict[str, Any]]] = None, prefix_list_ids: Optional[List[Dict[str, str]]] = None, security_rule_ids: Optional[List[str]] = None, # pylint:disable=unused-argument @@ -860,11 +912,13 @@ class SecurityGroupBackend: security_rule = SecurityRule( self, ip_protocol, + group.group_id, from_port, to_port, ip_ranges, _source_groups, prefix_list_ids, + tags=sgrule_tags, ) if security_rule in group.egress_rules: @@ -944,6 +998,7 @@ class SecurityGroupBackend: security_rule = SecurityRule( self, ip_protocol, + group.group_id, from_port, to_port, ip_ranges, @@ -1030,6 +1085,7 @@ class SecurityGroupBackend: security_rule = SecurityRule( self, ip_protocol, + group.group_id, from_port, to_port, ip_ranges, @@ -1085,6 +1141,7 @@ class SecurityGroupBackend: security_rule = SecurityRule( self, ip_protocol, + group.group_id, from_port, to_port, ip_ranges, diff --git a/moto/ec2/responses/security_groups.py b/moto/ec2/responses/security_groups.py index b65949291..49553d05e 100644 --- a/moto/ec2/responses/security_groups.py +++ b/moto/ec2/responses/security_groups.py @@ -86,6 +86,8 @@ class SecurityGroups(EC2BaseResponse): d = d[subkey] d[key_splitted[-1]] = value + sg_rule_tags = self._parse_tag_specification().get("security-group-rule", {}) + if "IpPermissions" not in querytree: # Handle single rule syntax ( @@ -97,16 +99,17 @@ class SecurityGroups(EC2BaseResponse): prefix_list_ids, ) = parse_sg_attributes_from_dict(querytree) - yield ( - group_name_or_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups, - prefix_list_ids, - security_rule_ids, - ) + yield { + "group_name_or_id": group_name_or_id, + "ip_protocol": ip_protocol, + "from_port": from_port, + "to_port": to_port, + "ip_ranges": ip_ranges, + "sgrule_tags": sg_rule_tags, + "source_groups": source_groups, + "prefix_list_ids": prefix_list_ids, + "security_rule_ids": security_rule_ids, + } ip_permissions = querytree.get("IpPermissions") or {} for ip_permission_idx in sorted(ip_permissions.keys()): @@ -121,22 +124,23 @@ class SecurityGroups(EC2BaseResponse): prefix_list_ids, ) = parse_sg_attributes_from_dict(ip_permission) - yield ( - group_name_or_id, - ip_protocol, - from_port, - to_port, - ip_ranges, - source_groups, - prefix_list_ids, - security_rule_ids, - ) + yield { + "group_name_or_id": group_name_or_id, + "ip_protocol": ip_protocol, + "from_port": from_port, + "to_port": to_port, + "ip_ranges": ip_ranges, + "sgrule_tags": sg_rule_tags, + "source_groups": source_groups, + "prefix_list_ids": prefix_list_ids, + "security_rule_ids": security_rule_ids, + } def authorize_security_group_egress(self) -> str: self.error_on_dryrun() - for args in self._process_rules_from_querystring(): - rule, group = self.ec2_backend.authorize_security_group_egress(*args) + for kwargs in self._process_rules_from_querystring(): + rule, group = self.ec2_backend.authorize_security_group_egress(**kwargs) self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy() template = self.response_template(AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE) return template.render(rule=rule, group=group) @@ -144,8 +148,8 @@ class SecurityGroups(EC2BaseResponse): def authorize_security_group_ingress(self) -> str: self.error_on_dryrun() - for args in self._process_rules_from_querystring(): - rule, group = self.ec2_backend.authorize_security_group_ingress(*args) + for kwargs in self._process_rules_from_querystring(): + rule, group = self.ec2_backend.authorize_security_group_ingress(**kwargs) self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy() template = self.response_template(AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE) return template.render(rule=rule, group=group) @@ -198,11 +202,19 @@ class SecurityGroups(EC2BaseResponse): def describe_security_group_rules(self) -> str: group_id = self._get_param("GroupId") + sg_rule_ids = self._get_param("SecurityGroupRuleId.1") filters = self._filters_from_querystring() self.error_on_dryrun() - rules = self.ec2_backend.describe_security_group_rules(group_id, filters) + # if sg rule ids are not None then wrap in a list + # as expected by ec2_backend.describe_security_group_rules + if sg_rule_ids: + sg_rule_ids = [sg_rule_ids] + + rules = self.ec2_backend.describe_security_group_rules( + group_id, sg_rule_ids, filters + ) template = self.response_template(DESCRIBE_SECURITY_GROUP_RULES_RESPONSE) return template.render(rules=rules) @@ -210,28 +222,36 @@ class SecurityGroups(EC2BaseResponse): self.error_on_dryrun() for args in self._process_rules_from_querystring(): - self.ec2_backend.revoke_security_group_egress(*args) + # we don't need this parameter to revoke + del args["sgrule_tags"] + self.ec2_backend.revoke_security_group_egress(**args) return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE def revoke_security_group_ingress(self) -> str: self.error_on_dryrun() for args in self._process_rules_from_querystring(): - self.ec2_backend.revoke_security_group_ingress(*args) + # we don't need this parameter to revoke + del args["sgrule_tags"] + self.ec2_backend.revoke_security_group_ingress(**args) return REVOKE_SECURITY_GROUP_INGRESS_RESPONSE def update_security_group_rule_descriptions_ingress(self) -> str: for args in self._process_rules_from_querystring(): + # we don't need this parameter to revoke + del args["sgrule_tags"] group = self.ec2_backend.update_security_group_rule_descriptions_ingress( - *args + **args ) self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy() return UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_INGRESS def update_security_group_rule_descriptions_egress(self) -> str: for args in self._process_rules_from_querystring(): + # we don't need this parameter to revoke + del args["sgrule_tags"] group = self.ec2_backend.update_security_group_rule_descriptions_egress( - *args + **args ) self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy() return UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_EGRESS @@ -255,8 +275,7 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """ {{ request_id }} - {% for group, rule_list in rules.items() %} - {% for rule in rule_list %} + {% for rule in rules %} {% if rule.from_port is not none %} {{ rule.from_port }} @@ -268,7 +287,7 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """ {{ rule.ip_ranges[0]['CidrIp'] }} {% endif %} {{ rule.ip_protocol }} - {{ group }} + {{ rule.group_id }} {{ rule.owner_id }} {{ 'true' if rule.is_egress else 'false' }} {{ rule.id }} @@ -282,7 +301,6 @@ DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """ {% endfor %} - {% endfor %} """ @@ -470,6 +488,14 @@ AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE = """{{ rule.to_port }} {% endif %} + + {% for tag in rule.get_tags() %} + + {{ tag.key }} + {{ tag.value }} + + {% endfor %} + {% endfor %} {% for item in rule.prefix_list_ids %} @@ -550,6 +576,14 @@ AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE = """{{ rule.to_port }} {% endif %} + + {% for tag in rule.get_tags() %} + + {{ tag.key }} + {{ tag.value }} + + {% endfor %} + {% endfor %} {% for item in rule.prefix_list_ids %} diff --git a/tests/test_ec2/test_security_groups.py b/tests/test_ec2/test_security_groups.py index d826c1578..3486c395c 100644 --- a/tests/test_ec2/test_security_groups.py +++ b/tests/test_ec2/test_security_groups.py @@ -13,11 +13,13 @@ from moto import mock_aws, settings from moto.core import DEFAULT_ACCOUNT_ID from moto.ec2 import ec2_backends +REGION = "us-east-1" + @mock_aws def test_create_and_describe_security_group(): - ec2 = boto3.resource("ec2", "us-west-1") - client = boto3.client("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) with pytest.raises(ClientError) as ex: client.create_security_group(GroupName="test", Description="test", DryRun=True) @@ -51,7 +53,7 @@ def test_create_and_describe_security_group(): @mock_aws def test_create_security_group_without_description_raises_error(): - ec2 = boto3.resource("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) with pytest.raises(ClientError) as ex: ec2.create_security_group(GroupName="test security group", Description="") @@ -62,15 +64,15 @@ def test_create_security_group_without_description_raises_error(): @mock_aws def test_default_security_group(): - client = boto3.client("ec2", "us-west-1") + client = boto3.client("ec2", REGION) groups = retrieve_all_sgs(client) assert "default" in [g["GroupName"] for g in groups] @mock_aws def test_create_and_describe_vpc_security_group(): - ec2 = boto3.resource("ec2", "us-west-1") - client = boto3.client("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) name = str(uuid4()) vpc_id = f"vpc-{str(uuid4())[0:6]}" @@ -111,8 +113,8 @@ def test_create_and_describe_vpc_security_group(): @mock_aws def test_create_two_security_groups_with_same_name_in_different_vpc(): - ec2 = boto3.resource("ec2", "us-east-1") - client = boto3.client("ec2", "us-east-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) name = str(uuid4()) vpc_id = "vpc-5300000c" @@ -132,7 +134,7 @@ def test_create_two_security_groups_with_same_name_in_different_vpc(): @mock_aws def test_create_two_security_groups_in_vpc_with_ipv6_enabled(): - ec2 = boto3.resource("ec2", region_name="us-west-1") + ec2 = boto3.resource("ec2", region_name=REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16", AmazonProvidedIpv6CidrBlock=True) security_group = ec2.create_security_group( @@ -145,8 +147,8 @@ def test_create_two_security_groups_in_vpc_with_ipv6_enabled(): @mock_aws def test_deleting_security_groups(): - ec2 = boto3.resource("ec2", "us-west-1") - client = boto3.client("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) sg_name1 = str(uuid4()) sg_name2 = str(uuid4()) group1 = ec2.create_security_group(GroupName=sg_name1, Description="test desc 1") @@ -188,8 +190,8 @@ def test_deleting_security_groups(): @mock_aws def test_delete_security_group_in_vpc(): - ec2 = boto3.resource("ec2", "us-west-1") - client = boto3.client("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) group = ec2.create_security_group( GroupName=str(uuid4()), Description="test1", VpcId="vpc-12345" @@ -207,8 +209,8 @@ def test_delete_security_group_in_vpc(): @mock_aws def test_authorize_ip_range_and_revoke(): - ec2 = boto3.resource("ec2", "us-west-1") - client = boto3.client("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) security_group = ec2.create_security_group( GroupName=str(uuid4()), Description="test" ) @@ -332,8 +334,8 @@ def test_authorize_ip_range_and_revoke(): @mock_aws def test_authorize_other_group_and_revoke(): - ec2 = boto3.resource("ec2", "us-west-1") - client = boto3.client("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) sg_name = str(uuid4()) security_group = ec2.create_security_group( GroupName=sg_name, Description="test desc" @@ -389,7 +391,7 @@ def test_authorize_other_group_and_revoke(): @mock_aws def test_authorize_other_group_egress_and_revoke(): - ec2 = boto3.resource("ec2", region_name="us-west-1") + ec2 = boto3.resource("ec2", region_name=REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") @@ -476,8 +478,8 @@ def test_authorize_group_in_vpc(): @mock_aws def test_describe_security_groups(): - ec2 = boto3.resource("ec2", "us-west-1") - client = boto3.client("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) vpc_id = f"vpc-{str(uuid4())[0:6]}" name_1 = str(uuid4()) desc_1 = str(uuid4()) @@ -514,7 +516,7 @@ def test_describe_security_groups(): @mock_aws def test_authorize_bad_cidr_throws_invalid_parameter_value(): - ec2 = boto3.resource("ec2", "us-west-1") + ec2 = boto3.resource("ec2", REGION) sec_group = ec2.create_security_group(GroupName=str(uuid4()), Description="test") with pytest.raises(ClientError) as ex: permissions = [ @@ -533,8 +535,8 @@ def test_authorize_bad_cidr_throws_invalid_parameter_value(): @mock_aws def test_security_group_tag_filtering(): - ec2 = boto3.resource("ec2", region_name="us-east-1") - client = boto3.client("ec2", region_name="us-east-1") + ec2 = boto3.resource("ec2", region_name=REGION) + client = boto3.client("ec2", region_name=REGION) sg = ec2.create_security_group(GroupName=str(uuid4()), Description="Test SG") tag_name = str(uuid4())[0:6] tag_val = str(uuid4()) @@ -553,8 +555,8 @@ def test_security_group_tag_filtering(): @mock_aws def test_authorize_all_protocols_with_no_port_specification(): - ec2 = boto3.resource("ec2", region_name="us-east-1") - client = boto3.client("ec2", region_name="us-east-1") + ec2 = boto3.resource("ec2", region_name=REGION) + client = boto3.client("ec2", region_name=REGION) sg_name = str(uuid4()) sg = ec2.create_security_group(GroupName=sg_name, Description="test desc") @@ -570,9 +572,9 @@ def test_authorize_all_protocols_with_no_port_specification(): @mock_aws -def test_security_group_rule_tagging(): - ec2 = boto3.resource("ec2", "us-east-1") - client = boto3.client("ec2", "us-east-1") +def test_security_group_rule_filtering_group_id(): + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") sg_name = str(uuid4()) @@ -598,10 +600,55 @@ def test_security_group_rule_tagging(): assert response["SecurityGroupRules"][0]["Tags"][0]["Value"] == tag_val +@mock_aws +def test_security_group_rule_filtering_tags(): + # Setup + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) + vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") + tags = [ + {"Key": "Automation", "Value": "Lambda"}, + {"Key": "Partner", "Value": "test"}, + ] + + sg_name = str(uuid4()) + sg = client.create_security_group( + Description="Test SG", + GroupName=sg_name, + VpcId=vpc.id, + ) + + # Execute + response1 = client.authorize_security_group_ingress( + GroupId=sg["GroupId"], + IpPermissions=[ + { + "IpProtocol": "tcp", + "FromPort": 80, + "ToPort": 80, + "IpRanges": [ + {"CidrIp": "1.2.3.4/32", "Description": "Test description"} + ], + } + ], + TagSpecifications=[{"ResourceType": "security-group-rule", "Tags": tags}], + ) + + response2 = client.describe_security_group_rules( + Filters=[{"Name": "tag:Partner", "Values": ["test"]}] + ) + + # Verify + assert response1["SecurityGroupRules"][0]["Tags"] == tags + assert "Tags" in response2["SecurityGroupRules"][0] + assert response2["SecurityGroupRules"][0]["Tags"][1]["Key"] == "Partner" + assert response2["SecurityGroupRules"][0]["Tags"][1]["Value"] == "test" + + @mock_aws def test_create_and_describe_security_grp_rule(): - ec2 = boto3.resource("ec2", "us-east-1") - client = boto3.client("ec2", "us-east-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") sg_name = str(uuid4()) @@ -627,8 +674,8 @@ def test_create_and_describe_security_grp_rule(): @mock_aws @pytest.mark.parametrize("use_vpc", [True, False], ids=["Use VPC", "Without VPC"]) def test_sec_group_rule_limit(use_vpc): - ec2 = boto3.resource("ec2", region_name="us-west-1") - client = boto3.client("ec2", region_name="us-west-1") + ec2 = boto3.resource("ec2", region_name=REGION) + client = boto3.client("ec2", region_name=REGION) limit = 60 if use_vpc: @@ -728,7 +775,7 @@ def test_sec_group_rule_limit(use_vpc): @mock_aws def test_add_same_rule_twice_throws_error(): - ec2 = boto3.resource("ec2", region_name="us-west-1") + ec2 = boto3.resource("ec2", region_name=REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") sg = ec2.create_security_group( @@ -768,8 +815,8 @@ def test_add_same_rule_twice_throws_error(): @mock_aws def test_description_in_ip_permissions(): - ec2 = boto3.resource("ec2", region_name="us-west-1") - conn = boto3.client("ec2", region_name="us-east-1") + ec2 = boto3.resource("ec2", region_name=REGION) + conn = boto3.client("ec2", region_name=REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") sg = conn.create_security_group( GroupName="sg1", Description="Test security group sg1", VpcId=vpc.id @@ -818,7 +865,7 @@ def test_description_in_ip_permissions(): @mock_aws def test_security_group_tagging(): - conn = boto3.client("ec2", region_name="us-east-1") + conn = boto3.client("ec2", region_name=REGION) sg = conn.create_security_group(GroupName=str(uuid4()), Description="Test SG") @@ -849,7 +896,7 @@ def test_security_group_tagging(): @mock_aws def test_security_group_wildcard_tag_filter(): - conn = boto3.client("ec2", region_name="us-east-1") + conn = boto3.client("ec2", region_name=REGION) sg = conn.create_security_group(GroupName=str(uuid4()), Description="Test SG") rand_name = str(uuid4())[0:6] @@ -868,10 +915,10 @@ def test_security_group_wildcard_tag_filter(): @mock_aws def test_security_group_filter_ip_permission(): - ec2 = boto3.resource("ec2", region_name="us-east-1") + ec2 = boto3.resource("ec2", region_name=REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") - conn = boto3.client("ec2", region_name="us-east-1") + conn = boto3.client("ec2", region_name=REGION) sg_name = str(uuid4())[0:6] sg = ec2.create_security_group( GroupName=sg_name, Description="Test SG", VpcId=vpc.id @@ -910,7 +957,7 @@ def retrieve_all_sgs(conn, filters=[]): # pylint: disable=W0102 @mock_aws def test_authorize_and_revoke_in_bulk(): - ec2 = boto3.resource("ec2", region_name="us-west-1") + ec2 = boto3.resource("ec2", region_name=REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") @@ -1090,8 +1137,8 @@ def test_security_group_ingress_without_multirule_after_reload(): @mock_aws def test_get_all_security_groups_filter_with_same_vpc_id(): - ec2 = boto3.resource("ec2", region_name="us-east-1") - client = boto3.client("ec2", region_name="us-east-1") + ec2 = boto3.resource("ec2", region_name=REGION) + client = boto3.client("ec2", region_name=REGION) vpc_id = "vpc-5300000c" security_group = ec2.create_security_group( GroupName=str(uuid4()), Description="test1", VpcId=vpc_id @@ -1117,7 +1164,7 @@ def test_get_all_security_groups_filter_with_same_vpc_id(): @mock_aws def test_revoke_security_group_egress(): - ec2 = boto3.resource("ec2", "us-east-1") + ec2 = boto3.resource("ec2", REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") sg = ec2.create_security_group( Description="Test SG", GroupName=str(uuid4()), VpcId=vpc.id @@ -1177,8 +1224,8 @@ def test_revoke_security_group_egress__without_ipprotocol(): @mock_aws def test_update_security_group_rule_descriptions_egress(): - ec2 = boto3.resource("ec2", "us-east-1") - client = boto3.client("ec2", "us-east-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") sg_name = str(uuid4()) sg = ec2.create_security_group( @@ -1214,8 +1261,8 @@ def test_update_security_group_rule_descriptions_egress(): @mock_aws def test_update_security_group_rule_descriptions_ingress(): - ec2 = boto3.resource("ec2", "us-east-1") - client = boto3.client("ec2", "us-east-1") + ec2 = boto3.resource("ec2", REGION) + client = boto3.client("ec2", REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") sg_name = str(uuid4()) sg = ec2.create_security_group( @@ -1231,8 +1278,13 @@ def test_update_security_group_rule_descriptions_ingress(): "IpRanges": [{"CidrIp": "1.2.3.4/32", "Description": "first desc"}], } ] - client.authorize_security_group_ingress(GroupId=sg_id, IpPermissions=ip_permissions) - + client.authorize_security_group_ingress( + GroupId=sg_id, + IpPermissions=ip_permissions, + ) + client.describe_security_group_rules( + Filters=[{"Name": "tag:Partner", "Values": ["test"]}] + ) ip_ranges = client.describe_security_groups(GroupIds=[sg_id])["SecurityGroups"][0][ "IpPermissions" ][0]["IpRanges"] @@ -1260,7 +1312,7 @@ def test_update_security_group_rule_descriptions_ingress(): @mock_aws def test_non_existent_security_group_raises_error_on_authorize(): - client = boto3.client("ec2", "us-east-1") + client = boto3.client("ec2", REGION) non_existent_sg = "sg-123abc" expected_error = f"The security group '{non_existent_sg}' does not exist" authorize_funcs = [ @@ -1278,9 +1330,9 @@ def test_non_existent_security_group_raises_error_on_authorize(): def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api(): if settings.TEST_SERVER_MODE: raise unittest.SkipTest("Can't test backend directly in server mode.") - ec2_backend = ec2_backends[DEFAULT_ACCOUNT_ID]["us-east-1"] - ec2_resource = boto3.resource("ec2", region_name="us-east-1") - ec2_client = boto3.client("ec2", region_name="us-east-1") + ec2_backend = ec2_backends[DEFAULT_ACCOUNT_ID][REGION] + ec2_resource = boto3.resource("ec2", region_name=REGION) + ec2_client = boto3.client("ec2", region_name=REGION) vpc = ec2_resource.create_vpc(CidrBlock="10.0.0.0/16") group_name = str(uuid4()) sg = ec2_resource.create_security_group( @@ -1329,7 +1381,7 @@ def test_security_group_rules_added_via_the_backend_can_be_revoked_via_the_api() @mock_aws def test_filter_description(): - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.0.0/16") unique = str(uuid4()) @@ -1360,7 +1412,7 @@ def test_filter_ip_permission__cidr(): raise SkipTest( "CIDR's might already exist due to other tests creating IP ranges" ) - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") sg1 = vpc.create_security_group( @@ -1407,7 +1459,7 @@ def test_filter_egress__ip_permission__cidr(): raise SkipTest( "CIDR's might already exist due to other tests creating IP ranges" ) - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") sg1 = vpc.create_security_group( @@ -1450,7 +1502,7 @@ def test_filter_egress__ip_permission__cidr(): @mock_aws def test_filter_egress__ip_permission__from_port(): - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") sg1 = vpc.create_security_group( @@ -1494,7 +1546,7 @@ def test_filter_egress__ip_permission__from_port(): @mock_aws def test_filter_egress__ip_permission__group_id(): - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") sg1 = vpc.create_security_group( @@ -1547,7 +1599,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_name( """ this fails to find the group in the AWS API, so we should also fail to find it """ - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") sg1 = vpc.create_security_group( @@ -1599,7 +1651,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_name( @mock_aws def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_id(): - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") sg1 = vpc.create_security_group( @@ -1652,7 +1704,7 @@ def test_filter_egress__ip_permission__group_name_create_with_id_filter_by_id(): @mock_aws def test_filter_egress__ip_permission__protocol(): - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") sg1 = vpc.create_security_group( @@ -1695,7 +1747,7 @@ def test_filter_egress__ip_permission__protocol(): @mock_aws def test_filter_egress__ip_permission__to_port(): - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") sg1 = vpc.create_security_group( @@ -1739,7 +1791,7 @@ def test_filter_egress__ip_permission__to_port(): @mock_aws def test_get_groups_by_ippermissions_group_id_filter(): - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.0.0/16") sg1 = vpc.create_security_group(Description="test", GroupName="test-1") sg2 = vpc.create_security_group(Description="test", GroupName="test-2") @@ -1772,7 +1824,7 @@ def test_get_groups_by_ippermissions_group_id_filter(): def test_get_groups_by_ippermissions_group_id_filter_across_vpcs(): # setup 2 VPCs, each with a single Security Group # where one security group authorizes the other sg (in another vpc) via GroupId - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc1 = ec2r.create_vpc(CidrBlock="10.250.0.0/16") vpc2 = ec2r.create_vpc(CidrBlock="10.251.0.0/16") @@ -1809,7 +1861,7 @@ def test_filter_group_name(): """ this filter is an exact match, not a glob """ - ec2r = boto3.resource("ec2", region_name="us-west-1") + ec2r = boto3.resource("ec2", region_name=REGION) vpc = ec2r.create_vpc(CidrBlock="10.250.1.0/16") uniq_sg_name_prefix = str(uuid4())[0:6] @@ -1843,7 +1895,7 @@ def test_filter_group_name(): @mock_aws def test_revoke_security_group_ingress(): - ec2 = boto3.client("ec2", region_name="us-east-1") + ec2 = boto3.client("ec2", region_name=REGION) vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")