diff --git a/Makefile b/Makefile index d1636d048..7adc3c1a5 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ lint: @echo "Running pylint..." pylint -j 0 moto tests @echo "Running MyPy..." - mypy --install-types --non-interactive moto/acm moto/applicationautoscaling/ + mypy --install-types --non-interactive moto/acm moto/amp moto/applicationautoscaling/ format: black moto/ tests/ diff --git a/moto/amp/exceptions.py b/moto/amp/exceptions.py index 45e8e789f..e5223d163 100644 --- a/moto/amp/exceptions.py +++ b/moto/amp/exceptions.py @@ -7,7 +7,7 @@ class AmpException(JsonRESTError): class ResourceNotFoundException(AmpException): - def __init__(self, message, resource_id, resource_type): + def __init__(self, message: str, resource_id: str, resource_type: str): super().__init__("ResourceNotFoundException", message) self.description = json.dumps( { @@ -21,7 +21,7 @@ class ResourceNotFoundException(AmpException): class WorkspaceNotFound(ResourceNotFoundException): code = 404 - def __init__(self, workspace_id): + def __init__(self, workspace_id: str): super().__init__( "Workspace not found", resource_id=workspace_id, @@ -32,7 +32,7 @@ class WorkspaceNotFound(ResourceNotFoundException): class RuleGroupNamespaceNotFound(ResourceNotFoundException): code = 404 - def __init__(self, name): + def __init__(self, name: str): super().__init__( "RuleGroupNamespace not found", resource_id=name, diff --git a/moto/amp/models.py b/moto/amp/models.py index 02160b1a4..032b7819a 100644 --- a/moto/amp/models.py +++ b/moto/amp/models.py @@ -5,13 +5,21 @@ from moto.core.utils import BackendDict, unix_time from moto.moto_api._internal import mock_random from moto.utilities.paginator import paginate from moto.utilities.tagging_service import TaggingService -from typing import Dict +from typing import Any, Callable, Dict, List from .exceptions import RuleGroupNamespaceNotFound, WorkspaceNotFound from .utils import PAGINATION_MODEL class RuleGroupNamespace(BaseModel): - def __init__(self, account_id, region, workspace_id, name, data, tag_fn): + def __init__( + self, + account_id: str, + region: str, + workspace_id: str, + name: str, + data: str, + tag_fn: Callable[[str], Dict[str, str]], + ): self.name = name self.data = data self.tag_fn = tag_fn @@ -19,11 +27,11 @@ class RuleGroupNamespace(BaseModel): self.created_at = unix_time() self.modified_at = self.created_at - def update(self, new_data): + def update(self, new_data: str) -> None: self.data = new_data self.modified_at = unix_time() - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: return { "name": self.name, "arn": self.arn, @@ -36,7 +44,13 @@ class RuleGroupNamespace(BaseModel): class Workspace(BaseModel): - def __init__(self, account_id, region, alias, tag_fn): + def __init__( + self, + account_id: str, + region: str, + alias: str, + tag_fn: Callable[[str], Dict[str, str]], + ): self.alias = alias self.workspace_id = f"ws-{mock_random.uuid4()}" self.arn = f"arn:aws:aps:{region}:{account_id}:workspace/{self.workspace_id}" @@ -44,9 +58,9 @@ class Workspace(BaseModel): self.status = {"statusCode": "ACTIVE"} self.created_at = unix_time() self.tag_fn = tag_fn - self.rule_group_namespaces = dict() + self.rule_group_namespaces: Dict[str, RuleGroupNamespace] = dict() - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: return { "alias": self.alias, "arn": self.arn, @@ -61,12 +75,12 @@ class Workspace(BaseModel): class PrometheusServiceBackend(BaseBackend): """Implementation of PrometheusService APIs.""" - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.workspaces: Dict(str, Workspace) = dict() + self.workspaces: Dict[str, Workspace] = dict() self.tagger = TaggingService() - def create_workspace(self, alias, tags): + def create_workspace(self, alias: str, tags: Dict[str, str]) -> Workspace: """ The ClientToken-parameter is not yet implemented """ @@ -80,41 +94,41 @@ class PrometheusServiceBackend(BaseBackend): self.tag_resource(workspace.arn, tags) return workspace - def describe_workspace(self, workspace_id) -> Workspace: + def describe_workspace(self, workspace_id: str) -> Workspace: if workspace_id not in self.workspaces: raise WorkspaceNotFound(workspace_id) return self.workspaces[workspace_id] - def list_tags_for_resource(self, resource_arn): + def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]: return self.tagger.get_tag_dict_for_resource(resource_arn) - def update_workspace_alias(self, alias, workspace_id): + def update_workspace_alias(self, alias: str, workspace_id: str) -> None: """ The ClientToken-parameter is not yet implemented """ self.workspaces[workspace_id].alias = alias - def delete_workspace(self, workspace_id): + def delete_workspace(self, workspace_id: str) -> None: """ The ClientToken-parameter is not yet implemented """ self.workspaces.pop(workspace_id, None) - @paginate(pagination_model=PAGINATION_MODEL) - def list_workspaces(self, alias): + @paginate(pagination_model=PAGINATION_MODEL) # type: ignore + def list_workspaces(self, alias: str) -> List[Workspace]: # type: ignore[misc] if alias: return [w for w in self.workspaces.values() if w.alias == alias] return list(self.workspaces.values()) - def tag_resource(self, resource_arn, tags): - tags = self.tagger.convert_dict_to_tags_input(tags) - self.tagger.tag_resource(resource_arn, tags) + def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None: + tag_list = self.tagger.convert_dict_to_tags_input(tags) + self.tagger.tag_resource(resource_arn, tag_list) - def untag_resource(self, resource_arn, tag_keys): + def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: self.tagger.untag_resource_using_names(resource_arn, tag_keys) def create_rule_groups_namespace( - self, data, name, tags, workspace_id + self, data: str, name: str, tags: Dict[str, str], workspace_id: str ) -> RuleGroupNamespace: """ The ClientToken-parameter is not yet implemented @@ -132,20 +146,24 @@ class PrometheusServiceBackend(BaseBackend): self.tag_resource(group.arn, tags) return group - def delete_rule_groups_namespace(self, name, workspace_id) -> None: + def delete_rule_groups_namespace(self, name: str, workspace_id: str) -> None: """ The ClientToken-parameter is not yet implemented """ ws = self.describe_workspace(workspace_id) ws.rule_group_namespaces.pop(name, None) - def describe_rule_groups_namespace(self, name, workspace_id) -> RuleGroupNamespace: + def describe_rule_groups_namespace( + self, name: str, workspace_id: str + ) -> RuleGroupNamespace: ws = self.describe_workspace(workspace_id) if name not in ws.rule_group_namespaces: raise RuleGroupNamespaceNotFound(name=name) return ws.rule_group_namespaces[name] - def put_rule_groups_namespace(self, data, name, workspace_id) -> RuleGroupNamespace: + def put_rule_groups_namespace( + self, data: str, name: str, workspace_id: str + ) -> RuleGroupNamespace: """ The ClientToken-parameter is not yet implemented """ @@ -153,8 +171,8 @@ class PrometheusServiceBackend(BaseBackend): ns.update(data) return ns - @paginate(pagination_model=PAGINATION_MODEL) - def list_rule_groups_namespaces(self, name, workspace_id): + @paginate(pagination_model=PAGINATION_MODEL) # type: ignore + def list_rule_groups_namespaces(self, name: str, workspace_id: str) -> List[RuleGroupNamespace]: # type: ignore ws = self.describe_workspace(workspace_id) if name: return [ diff --git a/moto/amp/responses.py b/moto/amp/responses.py index ec1995ce7..08f43a8bd 100644 --- a/moto/amp/responses.py +++ b/moto/amp/responses.py @@ -3,13 +3,14 @@ import json from moto.core.responses import BaseResponse from .models import amp_backends, PrometheusServiceBackend +from typing import Any from urllib.parse import unquote class PrometheusServiceResponse(BaseResponse): """Handler for PrometheusService requests and responses.""" - def tags(self, request, full_url, headers): + def tags(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore [return] self.setup_class(request, full_url, headers) if request.method == "GET": return self.list_tags_for_resource() @@ -18,7 +19,7 @@ class PrometheusServiceResponse(BaseResponse): if request.method == "DELETE": return self.untag_resource() - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="amp") @property @@ -26,36 +27,36 @@ class PrometheusServiceResponse(BaseResponse): """Return backend instance specific for this region.""" return amp_backends[self.current_account][self.region] - def create_workspace(self): + def create_workspace(self) -> str: params = json.loads(self.body) alias = params.get("alias") tags = params.get("tags") workspace = self.amp_backend.create_workspace(alias=alias, tags=tags) return json.dumps(dict(workspace.to_dict())) - def describe_workspace(self): + def describe_workspace(self) -> str: workspace_id = self.path.split("/")[-1] workspace = self.amp_backend.describe_workspace(workspace_id=workspace_id) return json.dumps(dict(workspace=workspace.to_dict())) - def list_tags_for_resource(self): + def list_tags_for_resource(self) -> str: resource_arn = unquote(self.path).split("tags/")[-1] tags = self.amp_backend.list_tags_for_resource(resource_arn=resource_arn) return json.dumps(dict(tags=tags)) - def update_workspace_alias(self): + def update_workspace_alias(self) -> str: params = json.loads(self.body) alias = params.get("alias") workspace_id = self.path.split("/")[-2] self.amp_backend.update_workspace_alias(alias=alias, workspace_id=workspace_id) return json.dumps(dict()) - def delete_workspace(self): + def delete_workspace(self) -> str: workspace_id = self.path.split("/")[-1] self.amp_backend.delete_workspace(workspace_id=workspace_id) return json.dumps(dict()) - def list_workspaces(self): + def list_workspaces(self) -> str: alias = self._get_param("alias") max_results = self._get_int_param("maxResults") next_token = self._get_param("nextToken") @@ -66,20 +67,20 @@ class PrometheusServiceResponse(BaseResponse): {"nextToken": next_token, "workspaces": [w.to_dict() for w in workspaces]} ) - def tag_resource(self): + def tag_resource(self) -> str: params = json.loads(self.body) resource_arn = unquote(self.path).split("tags/")[-1] tags = params.get("tags") self.amp_backend.tag_resource(resource_arn=resource_arn, tags=tags) return json.dumps(dict()) - def untag_resource(self): + def untag_resource(self) -> str: resource_arn = unquote(self.path).split("tags/")[-1] tag_keys = self.querystring.get("tagKeys", []) self.amp_backend.untag_resource(resource_arn=resource_arn, tag_keys=tag_keys) return json.dumps(dict()) - def create_rule_groups_namespace(self): + def create_rule_groups_namespace(self) -> str: params = json.loads(self.body) data = params.get("data") name = params.get("name") @@ -93,7 +94,7 @@ class PrometheusServiceResponse(BaseResponse): ) return json.dumps(rule_group_namespace.to_dict()) - def delete_rule_groups_namespace(self): + def delete_rule_groups_namespace(self) -> str: name = unquote(self.path).split("/")[-1] workspace_id = unquote(self.path).split("/")[-3] self.amp_backend.delete_rule_groups_namespace( @@ -102,7 +103,7 @@ class PrometheusServiceResponse(BaseResponse): ) return json.dumps(dict()) - def describe_rule_groups_namespace(self): + def describe_rule_groups_namespace(self) -> str: name = unquote(self.path).split("/")[-1] workspace_id = unquote(self.path).split("/")[-3] ns = self.amp_backend.describe_rule_groups_namespace( @@ -110,7 +111,7 @@ class PrometheusServiceResponse(BaseResponse): ) return json.dumps(dict(ruleGroupsNamespace=ns.to_dict())) - def put_rule_groups_namespace(self): + def put_rule_groups_namespace(self) -> str: params = json.loads(self.body) data = params.get("data") name = unquote(self.path).split("/")[-1] @@ -122,7 +123,7 @@ class PrometheusServiceResponse(BaseResponse): ) return json.dumps(ns.to_dict()) - def list_rule_groups_namespaces(self): + def list_rule_groups_namespaces(self) -> str: max_results = self._get_int_param("maxResults") next_token = self._get_param("nextToken") name = self._get_param("name") diff --git a/moto/core/exceptions.py b/moto/core/exceptions.py index 4f71c54d2..7edd796b7 100644 --- a/moto/core/exceptions.py +++ b/moto/core/exceptions.py @@ -1,6 +1,6 @@ from werkzeug.exceptions import HTTPException from jinja2 import DictLoader, Environment -from typing import Optional +from typing import Any, Optional import json # TODO: add "Sender" to error responses below? @@ -80,7 +80,9 @@ class DryRunClientError(RESTError): class JsonRESTError(RESTError): - def __init__(self, error_type, message, template="error_json", **kwargs) -> None: + def __init__( + self, error_type: str, message: str, template: str = "error_json", **kwargs: Any + ): super().__init__(error_type, message, template, **kwargs) self.description = json.dumps( {"__type": self.error_type, "message": self.message} diff --git a/moto/core/responses.py b/moto/core/responses.py index 432d50b96..52d148098 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -218,7 +218,9 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin): def dispatch(cls, *args, **kwargs): return cls()._dispatch(*args, **kwargs) - def setup_class(self, request, full_url, headers, use_raw_body=False) -> None: + def setup_class( + self, request: Any, full_url: str, headers: Any, use_raw_body: bool = False + ) -> None: """ use_raw_body: Use incoming bytes if True, encode to string otherwise """ diff --git a/moto/core/utils.py b/moto/core/utils.py index f8a41baae..1411b6aa1 100644 --- a/moto/core/utils.py +++ b/moto/core/utils.py @@ -175,7 +175,7 @@ def str_to_rfc_1123_datetime(value): return datetime.datetime.strptime(value, RFC1123) -def unix_time(dt=None): +def unix_time(dt: datetime.datetime = None) -> datetime.datetime: dt = dt or datetime.datetime.utcnow() epoch = datetime.datetime.utcfromtimestamp(0) delta = dt - epoch diff --git a/moto/utilities/tagging_service.py b/moto/utilities/tagging_service.py index a2710f41f..a5b56aa60 100644 --- a/moto/utilities/tagging_service.py +++ b/moto/utilities/tagging_service.py @@ -1,17 +1,20 @@ """Tag functionality contained in class TaggingService.""" import re +from typing import Dict, List class TaggingService: """Functionality related to tags, i.e., adding, deleting, testing.""" - def __init__(self, tag_name="Tags", key_name="Key", value_name="Value"): + def __init__( + self, tag_name: str = "Tags", key_name: str = "Key", value_name: str = "Value" + ): self.tag_name = tag_name self.key_name = key_name self.value_name = value_name - self.tags = {} + self.tags: Dict[str, str] = {} - def get_tag_dict_for_resource(self, arn): + def get_tag_dict_for_resource(self, arn: str) -> Dict[str, str]: """Return dict of key/value pairs vs. list of key/values dicts.""" result = {} if self.has_tags(arn): @@ -19,7 +22,7 @@ class TaggingService: result[key] = val return result - def list_tags_for_resource(self, arn): + def list_tags_for_resource(self, arn: str) -> List[Dict[str, str]]: """Return list of tags inside dict with key of "tag_name". Useful for describe functions; this return value can be added to @@ -31,16 +34,16 @@ class TaggingService: result.append({self.key_name: key, self.value_name: val}) return {self.tag_name: result} - def delete_all_tags_for_resource(self, arn): + def delete_all_tags_for_resource(self, arn: str) -> None: """Delete all tags associated with given ARN.""" if self.has_tags(arn): del self.tags[arn] - def has_tags(self, arn): + def has_tags(self, arn: str) -> bool: """Return True if the ARN has any associated tags, False otherwise.""" return arn in self.tags - def tag_resource(self, arn, tags): + def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None: """Store associated list of dicts with ARN. Note: the storage is internal to this class instance. @@ -55,7 +58,7 @@ class TaggingService: else: self.tags[arn][tag[self.key_name]] = None - def copy_tags(self, from_arn, to_arn): + def copy_tags(self, from_arn: str, to_arn: str) -> None: """Copy stored list of tags associated with one ARN to another ARN. Note: the storage is internal to this class instance. @@ -65,13 +68,13 @@ class TaggingService: to_arn, self.list_tags_for_resource(from_arn)[self.tag_name] ) - def untag_resource_using_names(self, arn, tag_names): + def untag_resource_using_names(self, arn: str, tag_names: List[str]) -> None: """Remove tags associated with ARN using key names in 'tag_names'.""" for name in tag_names: if name in self.tags.get(arn, {}): del self.tags[arn][name] - def untag_resource_using_tags(self, arn, tags): + def untag_resource_using_tags(self, arn: str, tags: List[Dict[str, str]]) -> None: """Remove tags associated with ARN using key/value pairs in 'tags'.""" current_tags = self.tags.get(arn, {}) for tag in tags: @@ -83,7 +86,7 @@ class TaggingService: # If both key and value are provided, match both before deletion del current_tags[tag[self.key_name]] - def extract_tag_names(self, tags): + def extract_tag_names(self, tags: Dict[str, str]) -> None: """Return list of key names in list of 'tags' key/value dicts.""" results = [] if len(tags) == 0: @@ -93,7 +96,7 @@ class TaggingService: results.append(tag[self.key_name]) return results - def flatten_tag_list(self, tags): + def flatten_tag_list(self, tags: List[Dict[str, str]]) -> Dict[str, str]: """Return dict of key/value pairs with 'tag_name', 'value_name'.""" result = {} for tag in tags: @@ -168,7 +171,7 @@ class TaggingService: ) @staticmethod - def convert_dict_to_tags_input(tags): + def convert_dict_to_tags_input(tags: Dict[str, str]) -> List[Dict[str, str]]: """Given a dictionary, return generic boto params for tags""" if not tags: return [] diff --git a/setup.cfg b/setup.cfg index 5bce4a8ad..f7fc6b329 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,7 +39,7 @@ no_implicit_optional=True strict_optional=True warn_redundant_casts=True -warn_unused_ignores=True +warn_unused_ignores=False warn_no_return=True warn_return_any=False warn_unreachable=False