TechDebt: Enable MyPy on AMP (#5546)
parent 4db1f25cbc · commit d60aa64abb

Makefile (2 lines changed)
@@ -28,7 +28,7 @@ lint:
 	@echo "Running pylint..."
 	pylint -j 0 moto tests
 	@echo "Running MyPy..."
-	mypy --install-types --non-interactive moto/acm moto/applicationautoscaling/
+	mypy --install-types --non-interactive moto/acm moto/amp moto/applicationautoscaling/
 
 format:
 	black moto/ tests/
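The Makefile target above is the only change needed to bring moto/amp under the existing MyPy gate. For local iteration, roughly the same check can be run from Python through MyPy's public `mypy.api` entry point; the snippet below is a sketch, and the module list simply mirrors the target above:

from mypy import api

# Mirror the Makefile's MyPy step; api.run returns (stdout, stderr, exit_status).
stdout, stderr, exit_status = api.run(
    ["--install-types", "--non-interactive", "moto/acm", "moto/amp", "moto/applicationautoscaling/"]
)
print(stdout, end="")
if exit_status != 0:
    raise SystemExit(exit_status)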
moto/amp/exceptions.py

@@ -7,7 +7,7 @@ class AmpException(JsonRESTError):
 
 
 class ResourceNotFoundException(AmpException):
-    def __init__(self, message, resource_id, resource_type):
+    def __init__(self, message: str, resource_id: str, resource_type: str):
         super().__init__("ResourceNotFoundException", message)
         self.description = json.dumps(
             {
@@ -21,7 +21,7 @@ class ResourceNotFoundException(AmpException):
 class WorkspaceNotFound(ResourceNotFoundException):
     code = 404
 
-    def __init__(self, workspace_id):
+    def __init__(self, workspace_id: str):
         super().__init__(
             "Workspace not found",
             resource_id=workspace_id,
@@ -32,7 +32,7 @@ class WorkspaceNotFound(ResourceNotFoundException):
 class RuleGroupNamespaceNotFound(ResourceNotFoundException):
     code = 404
 
-    def __init__(self, name):
+    def __init__(self, name: str):
         super().__init__(
             "RuleGroupNamespace not found",
             resource_id=name,
moto/amp/models.py

@@ -5,13 +5,21 @@ from moto.core.utils import BackendDict, unix_time
 from moto.moto_api._internal import mock_random
 from moto.utilities.paginator import paginate
 from moto.utilities.tagging_service import TaggingService
-from typing import Dict
+from typing import Any, Callable, Dict, List
 from .exceptions import RuleGroupNamespaceNotFound, WorkspaceNotFound
 from .utils import PAGINATION_MODEL
 
 
 class RuleGroupNamespace(BaseModel):
-    def __init__(self, account_id, region, workspace_id, name, data, tag_fn):
+    def __init__(
+        self,
+        account_id: str,
+        region: str,
+        workspace_id: str,
+        name: str,
+        data: str,
+        tag_fn: Callable[[str], Dict[str, str]],
+    ):
         self.name = name
         self.data = data
         self.tag_fn = tag_fn
@@ -19,11 +27,11 @@ class RuleGroupNamespace(BaseModel):
         self.created_at = unix_time()
         self.modified_at = self.created_at
 
-    def update(self, new_data):
+    def update(self, new_data: str) -> None:
         self.data = new_data
         self.modified_at = unix_time()
 
-    def to_dict(self):
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "name": self.name,
             "arn": self.arn,
@@ -36,7 +44,13 @@ class RuleGroupNamespace(BaseModel):
 
 
 class Workspace(BaseModel):
-    def __init__(self, account_id, region, alias, tag_fn):
+    def __init__(
+        self,
+        account_id: str,
+        region: str,
+        alias: str,
+        tag_fn: Callable[[str], Dict[str, str]],
+    ):
         self.alias = alias
         self.workspace_id = f"ws-{mock_random.uuid4()}"
         self.arn = f"arn:aws:aps:{region}:{account_id}:workspace/{self.workspace_id}"
@@ -44,9 +58,9 @@ class Workspace(BaseModel):
         self.status = {"statusCode": "ACTIVE"}
         self.created_at = unix_time()
         self.tag_fn = tag_fn
-        self.rule_group_namespaces = dict()
+        self.rule_group_namespaces: Dict[str, RuleGroupNamespace] = dict()
 
-    def to_dict(self):
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "alias": self.alias,
             "arn": self.arn,
@@ -61,12 +75,12 @@ class Workspace(BaseModel):
 class PrometheusServiceBackend(BaseBackend):
     """Implementation of PrometheusService APIs."""
 
-    def __init__(self, region_name, account_id):
+    def __init__(self, region_name: str, account_id: str):
         super().__init__(region_name, account_id)
-        self.workspaces: Dict(str, Workspace) = dict()
+        self.workspaces: Dict[str, Workspace] = dict()
         self.tagger = TaggingService()
 
-    def create_workspace(self, alias, tags):
+    def create_workspace(self, alias: str, tags: Dict[str, str]) -> Workspace:
         """
         The ClientToken-parameter is not yet implemented
         """
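One fix above is worth calling out: `Dict(str, Workspace)` calls the `Dict` alias instead of subscripting it, so it is not a valid type and MyPy rejects the annotation; `Dict[str, Workspace]` is the parameterized generic MyPy expects. A minimal illustration with a placeholder value type:

from typing import Dict

class Widget:
    """Placeholder value type, for illustration only."""

# Invalid as a type and flagged by MyPy (note the parentheses):
#     registry: Dict(str, Widget) = {}
# Valid parameterized generic:
registry: Dict[str, Widget] = {}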
@@ -80,41 +94,41 @@ class PrometheusServiceBackend(BaseBackend):
         self.tag_resource(workspace.arn, tags)
         return workspace
 
-    def describe_workspace(self, workspace_id) -> Workspace:
+    def describe_workspace(self, workspace_id: str) -> Workspace:
         if workspace_id not in self.workspaces:
             raise WorkspaceNotFound(workspace_id)
         return self.workspaces[workspace_id]
 
-    def list_tags_for_resource(self, resource_arn):
+    def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
         return self.tagger.get_tag_dict_for_resource(resource_arn)
 
-    def update_workspace_alias(self, alias, workspace_id):
+    def update_workspace_alias(self, alias: str, workspace_id: str) -> None:
         """
         The ClientToken-parameter is not yet implemented
         """
         self.workspaces[workspace_id].alias = alias
 
-    def delete_workspace(self, workspace_id):
+    def delete_workspace(self, workspace_id: str) -> None:
         """
         The ClientToken-parameter is not yet implemented
         """
         self.workspaces.pop(workspace_id, None)
 
-    @paginate(pagination_model=PAGINATION_MODEL)
-    def list_workspaces(self, alias):
+    @paginate(pagination_model=PAGINATION_MODEL)  # type: ignore
+    def list_workspaces(self, alias: str) -> List[Workspace]:  # type: ignore[misc]
         if alias:
             return [w for w in self.workspaces.values() if w.alias == alias]
         return list(self.workspaces.values())
 
-    def tag_resource(self, resource_arn, tags):
-        tags = self.tagger.convert_dict_to_tags_input(tags)
-        self.tagger.tag_resource(resource_arn, tags)
+    def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
+        tag_list = self.tagger.convert_dict_to_tags_input(tags)
+        self.tagger.tag_resource(resource_arn, tag_list)
 
-    def untag_resource(self, resource_arn, tag_keys):
+    def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
         self.tagger.untag_resource_using_names(resource_arn, tag_keys)
 
     def create_rule_groups_namespace(
-        self, data, name, tags, workspace_id
+        self, data: str, name: str, tags: Dict[str, str], workspace_id: str
     ) -> RuleGroupNamespace:
         """
         The ClientToken-parameter is not yet implemented
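The `tag_resource` rewrite above is a common pattern when enabling MyPy: the `tags` parameter is a `Dict[str, str]`, while `convert_dict_to_tags_input` yields a list of key/value dicts, so rebinding the same name would give one variable two incompatible types. Using a second name (`tag_list`) keeps each binding mono-typed. A self-contained sketch of the same idea; the helper below is illustrative, not moto's implementation:

from typing import Dict, List

def convert_dict_to_tags_input(tags: Dict[str, str]) -> List[Dict[str, str]]:
    # Illustrative stand-in for TaggingService.convert_dict_to_tags_input.
    return [{"Key": key, "Value": value} for key, value in tags.items()]

def tag_resource(resource_arn: str, tags: Dict[str, str]) -> None:
    # Re-assigning `tags = convert_dict_to_tags_input(tags)` would change the
    # variable's type mid-function, which MyPy reports as an incompatible assignment.
    tag_list = convert_dict_to_tags_input(tags)
    print(f"tagging {resource_arn} with {tag_list}")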
@@ -132,20 +146,24 @@ class PrometheusServiceBackend(BaseBackend):
         self.tag_resource(group.arn, tags)
         return group
 
-    def delete_rule_groups_namespace(self, name, workspace_id) -> None:
+    def delete_rule_groups_namespace(self, name: str, workspace_id: str) -> None:
         """
         The ClientToken-parameter is not yet implemented
         """
         ws = self.describe_workspace(workspace_id)
         ws.rule_group_namespaces.pop(name, None)
 
-    def describe_rule_groups_namespace(self, name, workspace_id) -> RuleGroupNamespace:
+    def describe_rule_groups_namespace(
+        self, name: str, workspace_id: str
+    ) -> RuleGroupNamespace:
         ws = self.describe_workspace(workspace_id)
         if name not in ws.rule_group_namespaces:
             raise RuleGroupNamespaceNotFound(name=name)
         return ws.rule_group_namespaces[name]
 
-    def put_rule_groups_namespace(self, data, name, workspace_id) -> RuleGroupNamespace:
+    def put_rule_groups_namespace(
+        self, data: str, name: str, workspace_id: str
+    ) -> RuleGroupNamespace:
         """
         The ClientToken-parameter is not yet implemented
         """
@@ -153,8 +171,8 @@ class PrometheusServiceBackend(BaseBackend):
         ns.update(data)
         return ns
 
-    @paginate(pagination_model=PAGINATION_MODEL)
-    def list_rule_groups_namespaces(self, name, workspace_id):
+    @paginate(pagination_model=PAGINATION_MODEL)  # type: ignore
+    def list_rule_groups_namespaces(self, name: str, workspace_id: str) -> List[RuleGroupNamespace]:  # type: ignore
         ws = self.describe_workspace(workspace_id)
         if name:
             return [
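The `# type: ignore` markers on the two paginated methods exist because the shared `@paginate` decorator sits between the annotated signature and what callers actually receive; when a decorator is untyped or changes the effective return value, MyPy cannot reconcile the annotation with the wrapped call. The sketch below is a hypothetical decorator built only to show the effect; it is not moto's paginator:

from typing import Any, Callable, List, Tuple

def paginate_sketch(func: Callable[..., List[Any]]) -> Callable[..., Tuple[List[Any], str]]:
    # The wrapper returns (page, next_token), not the plain list the
    # decorated function is annotated to return.
    def wrapper(*args: Any, **kwargs: Any) -> Tuple[List[Any], str]:
        results = func(*args, **kwargs)
        return results[:50], "dummy-next-token"
    return wrapper

@paginate_sketch  # the ignores in the diff above play the same role for moto's @paginate
def list_things(prefix: str) -> List[str]:
    return [name for name in ("alpha", "beta", "gamma") if name.startswith(prefix)]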
moto/amp/responses.py

@@ -3,13 +3,14 @@ import json
 
 from moto.core.responses import BaseResponse
 from .models import amp_backends, PrometheusServiceBackend
+from typing import Any
 from urllib.parse import unquote
 
 
 class PrometheusServiceResponse(BaseResponse):
     """Handler for PrometheusService requests and responses."""
 
-    def tags(self, request, full_url, headers):
+    def tags(self, request: Any, full_url: str, headers: Any) -> str:  # type: ignore [return]
         self.setup_class(request, full_url, headers)
         if request.method == "GET":
             return self.list_tags_for_resource()
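The `# type: ignore [return]` on `tags()` above handles a specific MyPy complaint: the method is annotated `-> str`, but when the request method matches none of the `if` branches, execution falls off the end and implicitly returns `None`, which MyPy reports as a missing return statement. A minimal standalone reproduction of that situation:

def dispatch_sketch(method: str) -> str:  # type: ignore[return]
    # Without the ignore, MyPy flags "Missing return statement" because the
    # fall-through path returns None even though the annotation promises str.
    if method == "GET":
        return "list_tags_for_resource"
    if method == "POST":
        return "tag_resource"
    if method == "DELETE":
        return "untag_resource"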
@@ -18,7 +19,7 @@ class PrometheusServiceResponse(BaseResponse):
         if request.method == "DELETE":
             return self.untag_resource()
 
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__(service_name="amp")
 
     @property
@@ -26,36 +27,36 @@ class PrometheusServiceResponse(BaseResponse):
         """Return backend instance specific for this region."""
         return amp_backends[self.current_account][self.region]
 
-    def create_workspace(self):
+    def create_workspace(self) -> str:
         params = json.loads(self.body)
         alias = params.get("alias")
         tags = params.get("tags")
         workspace = self.amp_backend.create_workspace(alias=alias, tags=tags)
         return json.dumps(dict(workspace.to_dict()))
 
-    def describe_workspace(self):
+    def describe_workspace(self) -> str:
         workspace_id = self.path.split("/")[-1]
         workspace = self.amp_backend.describe_workspace(workspace_id=workspace_id)
         return json.dumps(dict(workspace=workspace.to_dict()))
 
-    def list_tags_for_resource(self):
+    def list_tags_for_resource(self) -> str:
         resource_arn = unquote(self.path).split("tags/")[-1]
         tags = self.amp_backend.list_tags_for_resource(resource_arn=resource_arn)
         return json.dumps(dict(tags=tags))
 
-    def update_workspace_alias(self):
+    def update_workspace_alias(self) -> str:
         params = json.loads(self.body)
         alias = params.get("alias")
         workspace_id = self.path.split("/")[-2]
         self.amp_backend.update_workspace_alias(alias=alias, workspace_id=workspace_id)
         return json.dumps(dict())
 
-    def delete_workspace(self):
+    def delete_workspace(self) -> str:
         workspace_id = self.path.split("/")[-1]
         self.amp_backend.delete_workspace(workspace_id=workspace_id)
         return json.dumps(dict())
 
-    def list_workspaces(self):
+    def list_workspaces(self) -> str:
         alias = self._get_param("alias")
         max_results = self._get_int_param("maxResults")
         next_token = self._get_param("nextToken")
@@ -66,20 +67,20 @@ class PrometheusServiceResponse(BaseResponse):
             {"nextToken": next_token, "workspaces": [w.to_dict() for w in workspaces]}
         )
 
-    def tag_resource(self):
+    def tag_resource(self) -> str:
         params = json.loads(self.body)
         resource_arn = unquote(self.path).split("tags/")[-1]
         tags = params.get("tags")
         self.amp_backend.tag_resource(resource_arn=resource_arn, tags=tags)
         return json.dumps(dict())
 
-    def untag_resource(self):
+    def untag_resource(self) -> str:
         resource_arn = unquote(self.path).split("tags/")[-1]
         tag_keys = self.querystring.get("tagKeys", [])
         self.amp_backend.untag_resource(resource_arn=resource_arn, tag_keys=tag_keys)
         return json.dumps(dict())
 
-    def create_rule_groups_namespace(self):
+    def create_rule_groups_namespace(self) -> str:
         params = json.loads(self.body)
         data = params.get("data")
         name = params.get("name")
@@ -93,7 +94,7 @@ class PrometheusServiceResponse(BaseResponse):
         )
         return json.dumps(rule_group_namespace.to_dict())
 
-    def delete_rule_groups_namespace(self):
+    def delete_rule_groups_namespace(self) -> str:
         name = unquote(self.path).split("/")[-1]
         workspace_id = unquote(self.path).split("/")[-3]
         self.amp_backend.delete_rule_groups_namespace(
@@ -102,7 +103,7 @@ class PrometheusServiceResponse(BaseResponse):
         )
         return json.dumps(dict())
 
-    def describe_rule_groups_namespace(self):
+    def describe_rule_groups_namespace(self) -> str:
         name = unquote(self.path).split("/")[-1]
         workspace_id = unquote(self.path).split("/")[-3]
         ns = self.amp_backend.describe_rule_groups_namespace(
@@ -110,7 +111,7 @@ class PrometheusServiceResponse(BaseResponse):
         )
         return json.dumps(dict(ruleGroupsNamespace=ns.to_dict()))
 
-    def put_rule_groups_namespace(self):
+    def put_rule_groups_namespace(self) -> str:
         params = json.loads(self.body)
         data = params.get("data")
         name = unquote(self.path).split("/")[-1]
@@ -122,7 +123,7 @@ class PrometheusServiceResponse(BaseResponse):
         )
         return json.dumps(ns.to_dict())
 
-    def list_rule_groups_namespaces(self):
+    def list_rule_groups_namespaces(self) -> str:
         max_results = self._get_int_param("maxResults")
         next_token = self._get_param("nextToken")
         name = self._get_param("name")
moto/core/exceptions.py

@@ -1,6 +1,6 @@
 from werkzeug.exceptions import HTTPException
 from jinja2 import DictLoader, Environment
-from typing import Optional
+from typing import Any, Optional
 import json
 
 # TODO: add "<Type>Sender</Type>" to error responses below?
@@ -80,7 +80,9 @@ class DryRunClientError(RESTError):
 
 
 class JsonRESTError(RESTError):
-    def __init__(self, error_type, message, template="error_json", **kwargs) -> None:
+    def __init__(
+        self, error_type: str, message: str, template: str = "error_json", **kwargs: Any
+    ):
         super().__init__(error_type, message, template, **kwargs)
         self.description = json.dumps(
             {"__type": self.error_type, "message": self.message}
moto/core/responses.py

@@ -218,7 +218,9 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
     def dispatch(cls, *args, **kwargs):
         return cls()._dispatch(*args, **kwargs)
 
-    def setup_class(self, request, full_url, headers, use_raw_body=False) -> None:
+    def setup_class(
+        self, request: Any, full_url: str, headers: Any, use_raw_body: bool = False
+    ) -> None:
         """
         use_raw_body: Use incoming bytes if True, encode to string otherwise
         """
moto/core/utils.py

@@ -175,7 +175,7 @@ def str_to_rfc_1123_datetime(value):
     return datetime.datetime.strptime(value, RFC1123)
 
 
-def unix_time(dt=None):
+def unix_time(dt: datetime.datetime = None) -> datetime.datetime:
     dt = dt or datetime.datetime.utcnow()
     epoch = datetime.datetime.utcfromtimestamp(0)
     delta = dt - epoch
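One caveat on `unix_time` above: a bare `None` default on a parameter annotated `datetime.datetime` relies on MyPy's implicit-Optional handling, which differs between versions and strictness settings. The sketch below shows the explicit `Optional[...]` spelling; it is an illustration, not a change made in this commit, and it annotates the return as the epoch-seconds value the visible body computes:

import datetime
from typing import Optional

def unix_time_sketch(dt: Optional[datetime.datetime] = None) -> float:
    # Explicit Optional for the None default; returns seconds since the Unix epoch.
    dt = dt or datetime.datetime.utcnow()
    epoch = datetime.datetime.utcfromtimestamp(0)
    return (dt - epoch).total_seconds()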
moto/utilities/tagging_service.py

@@ -1,17 +1,20 @@
 """Tag functionality contained in class TaggingService."""
 import re
+from typing import Dict, List
 
 
 class TaggingService:
     """Functionality related to tags, i.e., adding, deleting, testing."""
 
-    def __init__(self, tag_name="Tags", key_name="Key", value_name="Value"):
+    def __init__(
+        self, tag_name: str = "Tags", key_name: str = "Key", value_name: str = "Value"
+    ):
         self.tag_name = tag_name
         self.key_name = key_name
         self.value_name = value_name
-        self.tags = {}
+        self.tags: Dict[str, str] = {}
 
-    def get_tag_dict_for_resource(self, arn):
+    def get_tag_dict_for_resource(self, arn: str) -> Dict[str, str]:
         """Return dict of key/value pairs vs. list of key/values dicts."""
         result = {}
         if self.has_tags(arn):
@@ -19,7 +22,7 @@ class TaggingService:
             result[key] = val
         return result
 
-    def list_tags_for_resource(self, arn):
+    def list_tags_for_resource(self, arn: str) -> List[Dict[str, str]]:
         """Return list of tags inside dict with key of "tag_name".
 
         Useful for describe functions; this return value can be added to
@@ -31,16 +34,16 @@ class TaggingService:
             result.append({self.key_name: key, self.value_name: val})
         return {self.tag_name: result}
 
-    def delete_all_tags_for_resource(self, arn):
+    def delete_all_tags_for_resource(self, arn: str) -> None:
         """Delete all tags associated with given ARN."""
         if self.has_tags(arn):
             del self.tags[arn]
 
-    def has_tags(self, arn):
+    def has_tags(self, arn: str) -> bool:
         """Return True if the ARN has any associated tags, False otherwise."""
         return arn in self.tags
 
-    def tag_resource(self, arn, tags):
+    def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None:
         """Store associated list of dicts with ARN.
 
         Note: the storage is internal to this class instance.
@@ -55,7 +58,7 @@ class TaggingService:
             else:
                 self.tags[arn][tag[self.key_name]] = None
 
-    def copy_tags(self, from_arn, to_arn):
+    def copy_tags(self, from_arn: str, to_arn: str) -> None:
         """Copy stored list of tags associated with one ARN to another ARN.
 
         Note: the storage is internal to this class instance.
@@ -65,13 +68,13 @@ class TaggingService:
             to_arn, self.list_tags_for_resource(from_arn)[self.tag_name]
         )
 
-    def untag_resource_using_names(self, arn, tag_names):
+    def untag_resource_using_names(self, arn: str, tag_names: List[str]) -> None:
         """Remove tags associated with ARN using key names in 'tag_names'."""
         for name in tag_names:
             if name in self.tags.get(arn, {}):
                 del self.tags[arn][name]
 
-    def untag_resource_using_tags(self, arn, tags):
+    def untag_resource_using_tags(self, arn: str, tags: List[Dict[str, str]]) -> None:
         """Remove tags associated with ARN using key/value pairs in 'tags'."""
         current_tags = self.tags.get(arn, {})
         for tag in tags:
@@ -83,7 +86,7 @@ class TaggingService:
                 # If both key and value are provided, match both before deletion
                 del current_tags[tag[self.key_name]]
 
-    def extract_tag_names(self, tags):
+    def extract_tag_names(self, tags: Dict[str, str]) -> None:
         """Return list of key names in list of 'tags' key/value dicts."""
         results = []
         if len(tags) == 0:
@@ -93,7 +96,7 @@ class TaggingService:
             results.append(tag[self.key_name])
         return results
 
-    def flatten_tag_list(self, tags):
+    def flatten_tag_list(self, tags: List[Dict[str, str]]) -> Dict[str, str]:
         """Return dict of key/value pairs with 'tag_name', 'value_name'."""
         result = {}
         for tag in tags:
@@ -168,7 +171,7 @@ class TaggingService:
         )
 
     @staticmethod
-    def convert_dict_to_tags_input(tags):
+    def convert_dict_to_tags_input(tags: Dict[str, str]) -> List[Dict[str, str]]:
         """Given a dictionary, return generic boto params for tags"""
         if not tags:
             return []
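As a usage illustration of the now-typed `TaggingService` helpers (the ARN and tag values below are made up, and the default "Key"/"Value" naming from `__init__` is assumed), a plain dict round-trips through the boto-style list form:

from moto.utilities.tagging_service import TaggingService

tagger = TaggingService()
tags = {"env": "test", "team": "observability"}
arn = "arn:aws:aps:us-east-1:123456789012:workspace/ws-example"

# Dict[str, str] -> List[Dict[str, str]] in the {"Key": ..., "Value": ...} shape.
tag_list = TaggingService.convert_dict_to_tags_input(tags)
tagger.tag_resource(arn, tag_list)

# get_tag_dict_for_resource returns the flat Dict[str, str] view again.
assert tagger.get_tag_dict_for_resource(arn) == tags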