From 0a1924358176b209e42b80e498d676596339a077 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 17 Apr 2023 23:01:06 +0000 Subject: [PATCH] Techdebt: MyPy ResourceGroups and ResourceGroupsTaggingAPI (#6224) --- moto/resourcegroups/exceptions.py | 2 +- moto/resourcegroups/models.py | 104 ++++++----- moto/resourcegroups/responses.py | 34 ++-- moto/resourcegroupstaggingapi/models.py | 194 +++++++++------------ moto/resourcegroupstaggingapi/responses.py | 17 +- setup.cfg | 2 +- 6 files changed, 165 insertions(+), 188 deletions(-) diff --git a/moto/resourcegroups/exceptions.py b/moto/resourcegroups/exceptions.py index 17e17c4a2..9a74f4651 100644 --- a/moto/resourcegroups/exceptions.py +++ b/moto/resourcegroups/exceptions.py @@ -4,5 +4,5 @@ from moto.core.exceptions import JsonRESTError class BadRequestException(JsonRESTError): code = 400 - def __init__(self, message, **kwargs): + def __init__(self, message: str): super().__init__(error_type="BadRequestException", message=message) diff --git a/moto/resourcegroups/models.py b/moto/resourcegroups/models.py index 76a06ce6f..0dd363fad 100644 --- a/moto/resourcegroups/models.py +++ b/moto/resourcegroups/models.py @@ -1,4 +1,4 @@ -from builtins import str +from typing import Any, Dict, List, Optional import json import re @@ -10,14 +10,14 @@ from .exceptions import BadRequestException class FakeResourceGroup(BaseModel): def __init__( self, - account_id, - name, - resource_query, - description=None, - tags=None, - configuration=None, + account_id: str, + name: str, + resource_query: Dict[str, str], + description: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + configuration: Optional[List[Dict[str, Any]]] = None, ): - self.errors = [] + self.errors: List[str] = [] description = description or "" tags = tags or {} if self._validate_description(value=description): @@ -33,10 +33,10 @@ class FakeResourceGroup(BaseModel): self.configuration = configuration @staticmethod - def _format_error(key, value, constraint): + def _format_error(key: str, value: Any, constraint: str) -> str: # type: ignore[misc] return f"Value '{value}' at '{key}' failed to satisfy constraint: {constraint}" - def _raise_errors(self): + def _raise_errors(self) -> None: if self.errors: errors_len = len(self.errors) plural = "s" if len(self.errors) > 1 else "" @@ -45,7 +45,7 @@ class FakeResourceGroup(BaseModel): f"{errors_len} validation error{plural} detected: {errors}" ) - def _validate_description(self, value): + def _validate_description(self, value: str) -> bool: errors = [] if len(value) > 511: errors.append( @@ -68,7 +68,7 @@ class FakeResourceGroup(BaseModel): return False return True - def _validate_name(self, value): + def _validate_name(self, value: str) -> bool: errors = [] if len(value) > 128: errors.append( @@ -92,7 +92,7 @@ class FakeResourceGroup(BaseModel): return False return True - def _validate_resource_query(self, value): + def _validate_resource_query(self, value: Dict[str, str]) -> bool: if not value: return True errors = [] @@ -117,7 +117,7 @@ class FakeResourceGroup(BaseModel): return False return True - def _validate_tags(self, value): + def _validate_tags(self, value: Dict[str, str]) -> bool: errors = [] # AWS only outputs one error for all keys and one for all values. error_keys = None @@ -160,59 +160,59 @@ class FakeResourceGroup(BaseModel): return True @property - def description(self): + def description(self) -> str: return self._description @description.setter - def description(self, value): + def description(self, value: str) -> None: if not self._validate_description(value=value): self._raise_errors() self._description = value @property - def name(self): + def name(self) -> str: return self._name @name.setter - def name(self, value): + def name(self, value: str) -> None: if not self._validate_name(value=value): self._raise_errors() self._name = value @property - def resource_query(self): + def resource_query(self) -> Dict[str, str]: return self._resource_query @resource_query.setter - def resource_query(self, value): + def resource_query(self, value: Dict[str, str]) -> None: if not self._validate_resource_query(value=value): self._raise_errors() self._resource_query = value @property - def tags(self): + def tags(self) -> Dict[str, str]: return self._tags @tags.setter - def tags(self, value): + def tags(self, value: Dict[str, str]) -> None: if not self._validate_tags(value=value): self._raise_errors() self._tags = value class ResourceGroups: - def __init__(self): - self.by_name = {} - self.by_arn = {} + def __init__(self) -> None: + self.by_name: Dict[str, FakeResourceGroup] = {} + self.by_arn: Dict[str, FakeResourceGroup] = {} - def __contains__(self, item): + def __contains__(self, item: str) -> bool: return item in self.by_name - def append(self, resource_group): + def append(self, resource_group: FakeResourceGroup) -> None: self.by_name[resource_group.name] = resource_group self.by_arn[resource_group.arn] = resource_group - def delete(self, name): + def delete(self, name: str) -> FakeResourceGroup: group = self.by_name[name] del self.by_name[name] del self.by_arn[group.arn] @@ -220,12 +220,12 @@ class ResourceGroups: class ResourceGroupsBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.groups = ResourceGroups() @staticmethod - def _validate_resource_query(resource_query): + def _validate_resource_query(resource_query: Dict[str, str]) -> None: if not resource_query: return query_type = resource_query["Type"] @@ -294,14 +294,19 @@ class ResourceGroupsBackend(BaseBackend): ) @staticmethod - def _validate_tags(tags): + def _validate_tags(tags: Dict[str, str]) -> None: for tag in tags: if tag.lower().startswith("aws:"): raise BadRequestException("Tag keys must not start with 'aws:'") def create_group( - self, name, resource_query, description=None, tags=None, configuration=None - ): + self, + name: str, + resource_query: Dict[str, str], + description: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + configuration: Optional[List[Dict[str, Any]]] = None, + ) -> FakeResourceGroup: tags = tags or {} group = FakeResourceGroup( account_id=self.account_id, @@ -320,48 +325,55 @@ class ResourceGroupsBackend(BaseBackend): self.groups.append(group) return group - def delete_group(self, group_name): + def delete_group(self, group_name: str) -> FakeResourceGroup: return self.groups.delete(name=group_name) - def get_group(self, group_name): + def get_group(self, group_name: str) -> FakeResourceGroup: return self.groups.by_name[group_name] - def get_tags(self, arn): + def get_tags(self, arn: str) -> Dict[str, str]: return self.groups.by_arn[arn].tags - def list_groups(self): + def list_groups(self) -> Dict[str, FakeResourceGroup]: """ Pagination or the Filters-parameter is not yet implemented """ return self.groups.by_name - def tag(self, arn, tags): + def tag(self, arn: str, tags: Dict[str, str]) -> None: all_tags = self.groups.by_arn[arn].tags all_tags.update(tags) self._validate_tags(all_tags) self.groups.by_arn[arn].tags = all_tags - def untag(self, arn, keys): + def untag(self, arn: str, keys: List[str]) -> None: group = self.groups.by_arn[arn] for key in keys: del group.tags[key] - def update_group(self, group_name, description=None): + def update_group( + self, group_name: str, description: Optional[str] = None + ) -> FakeResourceGroup: if description: self.groups.by_name[group_name].description = description return self.groups.by_name[group_name] - def update_group_query(self, group_name, resource_query): + def update_group_query( + self, group_name: str, resource_query: Dict[str, str] + ) -> FakeResourceGroup: self._validate_resource_query(resource_query) self.groups.by_name[group_name].resource_query = resource_query return self.groups.by_name[group_name] - def get_group_configuration(self, group_name): - group = self.groups.by_name.get(group_name) - configuration = group.configuration - return configuration + def get_group_configuration( + self, group_name: str + ) -> Optional[List[Dict[str, Any]]]: + group = self.groups.by_name[group_name] + return group.configuration - def put_group_configuration(self, group_name, configuration): + def put_group_configuration( + self, group_name: str, configuration: List[Dict[str, Any]] + ) -> FakeResourceGroup: self.groups.by_name[group_name].configuration = configuration return self.groups.by_name[group_name] diff --git a/moto/resourcegroups/responses.py b/moto/resourcegroups/responses.py index 9a076f867..4057ba4f4 100644 --- a/moto/resourcegroups/responses.py +++ b/moto/resourcegroups/responses.py @@ -3,18 +3,18 @@ import json from urllib.parse import unquote from moto.core.responses import BaseResponse -from .models import resourcegroups_backends +from .models import resourcegroups_backends, ResourceGroupsBackend class ResourceGroupsResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="resource-groups") @property - def resourcegroups_backend(self): + def resourcegroups_backend(self) -> ResourceGroupsBackend: return resourcegroups_backends[self.current_account][self.region] - def create_group(self): + def create_group(self) -> str: name = self._get_param("Name") description = self._get_param("Description") resource_query = self._get_param("ResourceQuery") @@ -40,7 +40,7 @@ class ResourceGroupsResponse(BaseResponse): } ) - def delete_group(self): + def delete_group(self) -> str: group_name = self._get_param("GroupName") or self._get_param("Group") group = self.resourcegroups_backend.delete_group(group_name=group_name) return json.dumps( @@ -53,7 +53,7 @@ class ResourceGroupsResponse(BaseResponse): } ) - def get_group(self): + def get_group(self) -> str: group_name = self._get_param("GroupName") group = self.resourcegroups_backend.get_group(group_name=group_name) return json.dumps( @@ -66,7 +66,7 @@ class ResourceGroupsResponse(BaseResponse): } ) - def get_group_query(self): + def get_group_query(self) -> str: group_name = self._get_param("GroupName") group_arn = self._get_param("Group") if group_arn and not group_name: @@ -81,18 +81,18 @@ class ResourceGroupsResponse(BaseResponse): } ) - def get_tags(self): + def get_tags(self) -> str: arn = unquote(self._get_param("Arn")) return json.dumps( {"Arn": arn, "Tags": self.resourcegroups_backend.get_tags(arn=arn)} ) - def list_group_resources(self): + def list_group_resources(self) -> None: raise NotImplementedError( "ResourceGroups.list_group_resources is not yet implemented" ) - def list_groups(self): + def list_groups(self) -> str: groups = self.resourcegroups_backend.list_groups() return json.dumps( { @@ -112,12 +112,12 @@ class ResourceGroupsResponse(BaseResponse): } ) - def search_resources(self): + def search_resources(self) -> None: raise NotImplementedError( "ResourceGroups.search_resources is not yet implemented" ) - def tag(self): + def tag(self) -> str: arn = unquote(self._get_param("Arn")) tags = self._get_param("Tags") if arn not in self.resourcegroups_backend.groups.by_arn: @@ -127,7 +127,7 @@ class ResourceGroupsResponse(BaseResponse): self.resourcegroups_backend.tag(arn=arn, tags=tags) return json.dumps({"Arn": arn, "Tags": tags}) - def untag(self): + def untag(self) -> str: arn = unquote(self._get_param("Arn")) keys = self._get_param("Keys") if arn not in self.resourcegroups_backend.groups.by_arn: @@ -137,7 +137,7 @@ class ResourceGroupsResponse(BaseResponse): self.resourcegroups_backend.untag(arn=arn, keys=keys) return json.dumps({"Arn": arn, "Keys": keys}) - def update_group(self): + def update_group(self) -> str: group_name = self._get_param("GroupName") description = self._get_param("Description", "") group = self.resourcegroups_backend.update_group( @@ -153,7 +153,7 @@ class ResourceGroupsResponse(BaseResponse): } ) - def update_group_query(self): + def update_group_query(self) -> str: group_name = self._get_param("GroupName") resource_query = self._get_param("ResourceQuery") group_arn = self._get_param("Group") @@ -166,14 +166,14 @@ class ResourceGroupsResponse(BaseResponse): {"GroupQuery": {"GroupName": group.name, "ResourceQuery": resource_query}} ) - def get_group_configuration(self): + def get_group_configuration(self) -> str: group_name = self._get_param("Group") configuration = self.resourcegroups_backend.get_group_configuration( group_name=group_name ) return json.dumps({"GroupConfiguration": {"Configuration": configuration}}) - def put_group_configuration(self): + def put_group_configuration(self) -> str: group_name = self._get_param("Group") configuration = self._get_param("Configuration") self.resourcegroups_backend.put_group_configuration( diff --git a/moto/resourcegroupstaggingapi/models.py b/moto/resourcegroupstaggingapi/models.py index 4abb7893f..d7e77938e 100644 --- a/moto/resourcegroupstaggingapi/models.py +++ b/moto/resourcegroupstaggingapi/models.py @@ -1,19 +1,20 @@ +from typing import Any, Dict, List, Iterator, Optional, Tuple from moto.core import BaseBackend, BackendDict from moto.core.exceptions import RESTError from moto.moto_api._internal import mock_random -from moto.s3 import s3_backends +from moto.s3.models import s3_backends, S3Backend from moto.ec2 import ec2_backends -from moto.elb import elb_backends -from moto.elbv2 import elbv2_backends -from moto.kinesis import kinesis_backends -from moto.kms import kms_backends -from moto.rds import rds_backends -from moto.glacier import glacier_backends -from moto.redshift import redshift_backends -from moto.emr import emr_backends -from moto.awslambda import lambda_backends -from moto.ecs import ecs_backends +from moto.elb.models import elb_backends, ELBBackend +from moto.elbv2.models import elbv2_backends, ELBv2Backend +from moto.kinesis.models import kinesis_backends, KinesisBackend +from moto.kms.models import kms_backends, KmsBackend +from moto.rds.models import rds_backends, RDSBackend +from moto.glacier.models import glacier_backends, GlacierBackend +from moto.redshift.models import redshift_backends, RedshiftBackend +from moto.emr.models import emr_backends, ElasticMapReduceBackend +from moto.awslambda.models import lambda_backends, LambdaBackend +from moto.ecs.models import ecs_backends, EC2ContainerServiceBackend # Left: EC2 ElastiCache RDS ELB CloudFront WorkSpaces Lambda EMR Glacier Kinesis Redshift Route53 # StorageGateway DynamoDB MachineLearning ACM DirectConnect DirectoryService CloudHSM @@ -21,106 +22,74 @@ from moto.ecs import ecs_backends class ResourceGroupsTaggingAPIBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self._pages = {} + self._pages: Dict[str, Any] = {} # Like 'someuuid': {'gen': , 'misc': None} # Misc is there for peeking from a generator and it cant # fit in the current request. As we only store generators # theres not really any point to clean up @property - def s3_backend(self): - """ - :rtype: moto.s3.models.S3Backend - """ + def s3_backend(self) -> S3Backend: return s3_backends[self.account_id]["global"] @property - def ec2_backend(self): - """ - :rtype: moto.ec2.models.EC2Backend - """ + def ec2_backend(self) -> Any: # type: ignore[misc] return ec2_backends[self.account_id][self.region_name] @property - def elb_backend(self): - """ - :rtype: moto.elb.models.ELBBackend - """ + def elb_backend(self) -> ELBBackend: return elb_backends[self.account_id][self.region_name] @property - def elbv2_backend(self): - """ - :rtype: moto.elbv2.models.ELBv2Backend - """ + def elbv2_backend(self) -> ELBv2Backend: return elbv2_backends[self.account_id][self.region_name] @property - def kinesis_backend(self): - """ - :rtype: moto.kinesis.models.KinesisBackend - """ + def kinesis_backend(self) -> KinesisBackend: return kinesis_backends[self.account_id][self.region_name] @property - def kms_backend(self): - """ - :rtype: moto.kms.models.KmsBackend - """ + def kms_backend(self) -> KmsBackend: return kms_backends[self.account_id][self.region_name] @property - def rds_backend(self): - """ - :rtype: moto.rds.models.RDSBackend - """ + def rds_backend(self) -> RDSBackend: return rds_backends[self.account_id][self.region_name] @property - def glacier_backend(self): - """ - :rtype: moto.glacier.models.GlacierBackend - """ + def glacier_backend(self) -> GlacierBackend: return glacier_backends[self.account_id][self.region_name] @property - def emr_backend(self): - """ - :rtype: moto.emr.models.ElasticMapReduceBackend - """ + def emr_backend(self) -> ElasticMapReduceBackend: return emr_backends[self.account_id][self.region_name] @property - def redshift_backend(self): - """ - :rtype: moto.redshift.models.RedshiftBackend - """ + def redshift_backend(self) -> RedshiftBackend: return redshift_backends[self.account_id][self.region_name] @property - def lambda_backend(self): - """ - :rtype: moto.awslambda.models.LambdaBackend - """ + def lambda_backend(self) -> LambdaBackend: return lambda_backends[self.account_id][self.region_name] @property - def ecs_backend(self): - """ - :rtype: moto.ecs.models.EcsnBackend - """ + def ecs_backend(self) -> EC2ContainerServiceBackend: return ecs_backends[self.account_id][self.region_name] - def _get_resources_generator(self, tag_filters=None, resource_type_filters=None): + def _get_resources_generator( + self, + tag_filters: Optional[List[Dict[str, Any]]] = None, + resource_type_filters: Optional[List[str]] = None, + ) -> Iterator[Dict[str, Any]]: # Look at # https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html # TODO move these to their respective backends filters = [] - for tag_filter_dict in tag_filters: + for tag_filter_dict in tag_filters: # type: ignore values = tag_filter_dict.get("Values", []) if len(values) == 0: # Check key matches @@ -128,36 +97,38 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): elif len(values) == 1: # Check its exactly the same as key, value filters.append( - lambda t, v, key=tag_filter_dict["Key"], value=values[0]: t == key + lambda t, v, key=tag_filter_dict["Key"], value=values[0]: t == key # type: ignore and v == value ) else: # Check key matches and value is one of the provided values filters.append( - lambda t, v, key=tag_filter_dict["Key"], vl=values: t == key + lambda t, v, key=tag_filter_dict["Key"], vl=values: t == key # type: ignore and v in vl ) - def tag_filter(tag_list): + def tag_filter(tag_list: List[Dict[str, str]]) -> bool: result = [] if tag_filters: for f in filters: temp_result = [] for tag in tag_list: - f_result = f(tag["Key"], tag["Value"]) + f_result = f(tag["Key"], tag["Value"]) # type: ignore temp_result.append(f_result) result.append(any(temp_result)) return all(result) else: return True - def format_tags(tags): + def format_tags(tags: Dict[str, str]) -> List[Dict[str, str]]: result = [] for key, value in tags.items(): result.append({"Key": key, "Value": value}) return result - def format_tag_keys(tags, keys): + def format_tag_keys( + tags: List[Dict[str, str]], keys: List[str] + ) -> List[Dict[str, str]]: result = [] for tag in tags: result.append({"Key": tag[keys[0]], "Value": tag[keys[1]]}) @@ -200,7 +171,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): if not resource_type_filters or "ecs:cluster" in resource_type_filters: for cluster in self.ecs_backend.clusters.values(): - tags = format_tag_keys(cluster.tags, ["key", "value"]) + tags = format_tag_keys(cluster.tags, ["key", "value"]) # type: ignore[arg-type] if not tag_filter(tags): continue yield {"ResourceARN": f"{cluster.arn}", "Tags": tags} @@ -222,9 +193,8 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): for ami in self.ec2_backend.amis.values(): tags = format_tags(self.ec2_backend.tags.get(ami.id, {})) - if not tags or not tag_filter( - tags - ): # Skip if no tags, or invalid filter + if not tags or not tag_filter(tags): + # Skip if no tags, or invalid filter continue yield { "ResourceARN": f"arn:aws:ec2:{self.region_name}::image/{ami.id}", @@ -241,9 +211,8 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): for instance in reservation.instances: tags = format_tags(self.ec2_backend.tags.get(instance.id, {})) - if not tags or not tag_filter( - tags - ): # Skip if no tags, or invalid filter + if not tags or not tag_filter(tags): + # Skip if no tags, or invalid filter continue yield { "ResourceARN": f"arn:aws:ec2:{self.region_name}::instance/{instance.id}", @@ -259,9 +228,8 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): for eni in self.ec2_backend.enis.values(): tags = format_tags(self.ec2_backend.tags.get(eni.id, {})) - if not tags or not tag_filter( - tags - ): # Skip if no tags, or invalid filter + if not tags or not tag_filter(tags): + # Skip if no tags, or invalid filter continue yield { "ResourceARN": f"arn:aws:ec2:{self.region_name}::network-interface/{eni.id}", @@ -280,9 +248,8 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): for sg in vpc.values(): tags = format_tags(self.ec2_backend.tags.get(sg.id, {})) - if not tags or not tag_filter( - tags - ): # Skip if no tags, or invalid filter + if not tags or not tag_filter(tags): + # Skip if no tags, or invalid filter continue yield { "ResourceARN": f"arn:aws:ec2:{self.region_name}::security-group/{sg.id}", @@ -298,9 +265,8 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): for snapshot in self.ec2_backend.snapshots.values(): tags = format_tags(self.ec2_backend.tags.get(snapshot.id, {})) - if not tags or not tag_filter( - tags - ): # Skip if no tags, or invalid filter + if not tags or not tag_filter(tags): + # Skip if no tags, or invalid filter continue yield { "ResourceARN": f"arn:aws:ec2:{self.region_name}::snapshot/{snapshot.id}", @@ -382,12 +348,12 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): or "rds" in resource_type_filters or "rds:cluster" in resource_type_filters ): - for cluster in self.rds_backend.clusters.values(): - tags = cluster.get_tags() + for rds_cluster in self.rds_backend.clusters.values(): + tags = rds_cluster.get_tags() if not tags or not tag_filter(tags): continue yield { - "ResourceARN": cluster.db_cluster_arn, + "ResourceARN": rds_cluster.db_cluster_arn, "Tags": tags, } @@ -487,7 +453,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): "Tags": tags, } - def _get_tag_keys_generator(self): + def _get_tag_keys_generator(self) -> Iterator[str]: # Look at # https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html @@ -498,7 +464,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): yield key # EC2 tags - def get_ec2_keys(res_id): + def get_ec2_keys(res_id: str) -> List[Dict[str, str]]: result = [] for key in self.ec2_backend.tags.get(res_id, {}): result.append(key) @@ -506,18 +472,18 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): # EC2 AMI, resource type ec2:image for ami in self.ec2_backend.amis.values(): - for key in get_ec2_keys(ami.id): + for key in get_ec2_keys(ami.id): # type: ignore[assignment] yield key # EC2 Instance, resource type ec2:instance for reservation in self.ec2_backend.reservations.values(): for instance in reservation.instances: - for key in get_ec2_keys(instance.id): + for key in get_ec2_keys(instance.id): # type: ignore[assignment] yield key # EC2 NetworkInterface, resource type ec2:network-interface for eni in self.ec2_backend.enis.values(): - for key in get_ec2_keys(eni.id): + for key in get_ec2_keys(eni.id): # type: ignore[assignment] yield key # TODO EC2 ReservedInstance @@ -525,22 +491,22 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): # EC2 SecurityGroup, resource type ec2:security-group for vpc in self.ec2_backend.groups.values(): for sg in vpc.values(): - for key in get_ec2_keys(sg.id): + for key in get_ec2_keys(sg.id): # type: ignore[assignment] yield key # EC2 Snapshot, resource type ec2:snapshot for snapshot in self.ec2_backend.snapshots.values(): - for key in get_ec2_keys(snapshot.id): + for key in get_ec2_keys(snapshot.id): # type: ignore[assignment] yield key # TODO EC2 SpotInstanceRequest # EC2 Volume, resource type ec2:volume for volume in self.ec2_backend.volumes.values(): - for key in get_ec2_keys(volume.id): + for key in get_ec2_keys(volume.id): # type: ignore[assignment] yield key - def _get_tag_values_generator(self, tag_key): + def _get_tag_values_generator(self, tag_key: str) -> Iterator[str]: # Look at # https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html @@ -552,7 +518,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): yield value # EC2 tags - def get_ec2_values(res_id): + def get_ec2_values(res_id: str) -> List[Dict[str, str]]: result = [] for key, value in self.ec2_backend.tags.get(res_id, {}).items(): if key == tag_key: @@ -561,18 +527,18 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): # EC2 AMI, resource type ec2:image for ami in self.ec2_backend.amis.values(): - for value in get_ec2_values(ami.id): + for value in get_ec2_values(ami.id): # type: ignore[assignment] yield value # EC2 Instance, resource type ec2:instance for reservation in self.ec2_backend.reservations.values(): for instance in reservation.instances: - for value in get_ec2_values(instance.id): + for value in get_ec2_values(instance.id): # type: ignore[assignment] yield value # EC2 NetworkInterface, resource type ec2:network-interface for eni in self.ec2_backend.enis.values(): - for value in get_ec2_values(eni.id): + for value in get_ec2_values(eni.id): # type: ignore[assignment] yield value # TODO EC2 ReservedInstance @@ -580,29 +546,29 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): # EC2 SecurityGroup, resource type ec2:security-group for vpc in self.ec2_backend.groups.values(): for sg in vpc.values(): - for value in get_ec2_values(sg.id): + for value in get_ec2_values(sg.id): # type: ignore[assignment] yield value # EC2 Snapshot, resource type ec2:snapshot for snapshot in self.ec2_backend.snapshots.values(): - for value in get_ec2_values(snapshot.id): + for value in get_ec2_values(snapshot.id): # type: ignore[assignment] yield value # TODO EC2 SpotInstanceRequest # EC2 Volume, resource type ec2:volume for volume in self.ec2_backend.volumes.values(): - for value in get_ec2_values(volume.id): + for value in get_ec2_values(volume.id): # type: ignore[assignment] yield value def get_resources( self, - pagination_token=None, - resources_per_page=50, - tags_per_page=100, - tag_filters=None, - resource_type_filters=None, - ): + pagination_token: Optional[str] = None, + resources_per_page: int = 50, + tags_per_page: int = 100, + tag_filters: Optional[List[Dict[str, Any]]] = None, + resource_type_filters: Optional[List[str]] = None, + ) -> Tuple[Optional[str], List[Dict[str, Any]]]: # Simple range checking if 100 >= tags_per_page >= 500: raise RESTError( @@ -666,7 +632,9 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): return new_token, result - def get_tag_keys(self, pagination_token=None): + def get_tag_keys( + self, pagination_token: Optional[str] = None + ) -> Tuple[Optional[str], List[str]]: if pagination_token: if pagination_token not in self._pages: @@ -712,7 +680,9 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): return new_token, result - def get_tag_values(self, pagination_token, key): + def get_tag_values( + self, pagination_token: Optional[str], key: str + ) -> Tuple[Optional[str], List[str]]: if pagination_token: if pagination_token not in self._pages: diff --git a/moto/resourcegroupstaggingapi/responses.py b/moto/resourcegroupstaggingapi/responses.py index 3ecea5f6e..190ae4337 100644 --- a/moto/resourcegroupstaggingapi/responses.py +++ b/moto/resourcegroupstaggingapi/responses.py @@ -1,22 +1,17 @@ from moto.core.responses import BaseResponse -from .models import resourcegroupstaggingapi_backends +from .models import resourcegroupstaggingapi_backends, ResourceGroupsTaggingAPIBackend import json class ResourceGroupsTaggingAPIResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="resourcegroupstaggingapi") @property - def backend(self): - """ - Backend - :returns: Resource tagging api backend - :rtype: moto.resourcegroupstaggingapi.models.ResourceGroupsTaggingAPIBackend - """ + def backend(self) -> ResourceGroupsTaggingAPIBackend: return resourcegroupstaggingapi_backends[self.current_account][self.region] - def get_resources(self): + def get_resources(self) -> str: pagination_token = self._get_param("PaginationToken") tag_filters = self._get_param("TagFilters", []) resources_per_page = self._get_int_param("ResourcesPerPage", 50) @@ -38,7 +33,7 @@ class ResourceGroupsTaggingAPIResponse(BaseResponse): return json.dumps(response) - def get_tag_keys(self): + def get_tag_keys(self) -> str: pagination_token = self._get_param("PaginationToken") pagination_token, tag_keys = self.backend.get_tag_keys( pagination_token=pagination_token @@ -50,7 +45,7 @@ class ResourceGroupsTaggingAPIResponse(BaseResponse): return json.dumps(response) - def get_tag_values(self): + def get_tag_values(self) -> str: pagination_token = self._get_param("PaginationToken") key = self._get_param("Key") pagination_token, tag_values = self.backend.get_tag_values( diff --git a/setup.cfg b/setup.cfg index a973320f1..3eddd8bdd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -239,7 +239,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/m*,moto/n*,moto/o*,moto/p*,moto/q*,moto/ram,moto/rds*,moto/redshift*,moto/rekognition,moto/scheduler +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/m*,moto/n*,moto/o*,moto/p*,moto/q*,moto/ram,moto/rds*,moto/redshift*,moto/rekognition,moto/resourcegroups*,moto/scheduler show_column_numbers=True show_error_codes = True disable_error_code=abstract