TechDebt: Introduce MyPy typing annotations (#5535)

This commit is contained in:
Bert Blommers 2022-10-06 08:37:35 +00:00 committed by GitHub
parent 951a271b49
commit 26412e1c3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 250 additions and 149 deletions

View File

@ -27,6 +27,8 @@ lint:
black --check moto/ tests/
@echo "Running pylint..."
pylint -j 0 moto tests
@echo "Running MyPy..."
mypy --install-types --non-interactive moto/applicationautoscaling/
format:
black moto/ tests/

View File

@ -2,5 +2,5 @@ from moto.core.exceptions import JsonRESTError
class AWSValidationException(JsonRESTError):
def __init__(self, message, **kwargs):
super().__init__("ValidationException", message, **kwargs)
def __init__(self, message: str) -> None:
super().__init__("ValidationException", message)

View File

@ -5,6 +5,7 @@ from moto.moto_api._internal import mock_random
from .exceptions import AWSValidationException
from collections import OrderedDict
from enum import Enum, unique
from typing import Dict, List, Union, Optional, Tuple
import time
@ -63,21 +64,25 @@ class ScalableDimensionValueSet(Enum):
class ApplicationAutoscalingBackend(BaseBackend):
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str) -> None:
super().__init__(region_name, account_id)
self.ecs_backend = ecs_backends[account_id][region_name]
self.targets = OrderedDict()
self.policies = {}
self.scheduled_actions = list()
self.targets: Dict[str, Dict[str, FakeScalableTarget]] = OrderedDict()
self.policies: Dict[str, FakeApplicationAutoscalingPolicy] = {}
self.scheduled_actions: List[FakeScheduledAction] = list()
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
def default_vpc_endpoint_service(
service_region: str, zones: List[str]
) -> List[Dict[str, str]]:
"""Default VPC endpoint service."""
return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "application-autoscaling"
)
def describe_scalable_targets(self, namespace, r_ids=None, dimension=None):
def describe_scalable_targets(
self, namespace: str, r_ids: Union[None, List[str]], dimension: Union[None, str]
) -> List["FakeScalableTarget"]:
"""Describe scalable targets."""
if r_ids is None:
r_ids = []
@ -88,7 +93,7 @@ class ApplicationAutoscalingBackend(BaseBackend):
targets = [t for t in targets if t.resource_id in r_ids]
return targets
def _flatten_scalable_targets(self, namespace):
def _flatten_scalable_targets(self, namespace: str) -> List["FakeScalableTarget"]:
"""Flatten scalable targets for a given service namespace down to a list."""
targets = []
for dimension in self.targets.keys():
@ -97,23 +102,41 @@ class ApplicationAutoscalingBackend(BaseBackend):
targets = [t for t in targets if t.service_namespace == namespace]
return targets
def register_scalable_target(self, namespace, r_id, dimension, **kwargs):
def register_scalable_target(
self,
namespace: str,
r_id: str,
dimension: str,
min_capacity: Optional[int],
max_capacity: Optional[int],
role_arn: str,
suspended_state: str,
) -> "FakeScalableTarget":
"""Registers or updates a scalable target."""
_ = _target_params_are_valid(namespace, r_id, dimension)
if namespace == ServiceNamespaceValueSet.ECS.value:
_ = self._ecs_service_exists_for_target(r_id)
if self._scalable_target_exists(r_id, dimension):
target = self.targets[dimension][r_id]
target.update(**kwargs)
target.update(min_capacity, max_capacity, suspended_state)
else:
target = FakeScalableTarget(self, namespace, r_id, dimension, **kwargs)
target = FakeScalableTarget(
self,
namespace,
r_id,
dimension,
min_capacity,
max_capacity,
role_arn,
suspended_state,
)
self._add_scalable_target(target)
return target
def _scalable_target_exists(self, r_id, dimension):
def _scalable_target_exists(self, r_id: str, dimension: str) -> bool:
return r_id in self.targets.get(dimension, [])
def _ecs_service_exists_for_target(self, r_id):
def _ecs_service_exists_for_target(self, r_id: str) -> bool:
"""Raises a ValidationException if an ECS service does not exist
for the specified resource ID.
"""
@ -123,14 +146,18 @@ class ApplicationAutoscalingBackend(BaseBackend):
raise AWSValidationException("ECS service doesn't exist: {}".format(r_id))
return True
def _add_scalable_target(self, target):
def _add_scalable_target(
self, target: "FakeScalableTarget"
) -> "FakeScalableTarget":
if target.scalable_dimension not in self.targets:
self.targets[target.scalable_dimension] = OrderedDict()
if target.resource_id not in self.targets[target.scalable_dimension]:
self.targets[target.scalable_dimension][target.resource_id] = target
return target
def deregister_scalable_target(self, namespace, r_id, dimension):
def deregister_scalable_target(
self, namespace: str, r_id: str, dimension: str
) -> None:
"""Registers or updates a scalable target."""
if self._scalable_target_exists(r_id, dimension):
del self.targets[dimension][r_id]
@ -143,13 +170,13 @@ class ApplicationAutoscalingBackend(BaseBackend):
def put_scaling_policy(
self,
policy_name,
service_namespace,
resource_id,
scalable_dimension,
policy_body,
policy_type=None,
):
policy_name: str,
service_namespace: str,
resource_id: str,
scalable_dimension: str,
policy_body: str,
policy_type: Optional[None],
) -> "FakeApplicationAutoscalingPolicy":
policy_key = FakeApplicationAutoscalingPolicy.formulate_key(
service_namespace, resource_id, scalable_dimension, policy_name
)
@ -177,21 +204,20 @@ class ApplicationAutoscalingBackend(BaseBackend):
self.policies[policy_key] = policy
return policy
def describe_scaling_policies(self, service_namespace, **kwargs):
policy_names = kwargs.get("policy_names")
resource_id = kwargs.get("resource_id")
scalable_dimension = kwargs.get("scalable_dimension")
max_results = kwargs.get("max_results") or 100
next_token = kwargs.get("next_token")
def describe_scaling_policies(
self,
service_namespace: str,
resource_id: str,
scalable_dimension: str,
max_results: Optional[int],
next_token: str,
) -> Tuple[Optional[str], List["FakeApplicationAutoscalingPolicy"]]:
max_results = max_results or 100
policies = [
policy
for policy in self.policies.values()
if policy.service_namespace == service_namespace
]
if policy_names:
policies = [
policy for policy in policies if policy.policy_name in policy_names
]
if resource_id:
policies = [
policy for policy in policies if policy.resource_id in resource_id
@ -209,14 +235,17 @@ class ApplicationAutoscalingBackend(BaseBackend):
return new_next_token, policies_page
def delete_scaling_policy(
self, policy_name, service_namespace, resource_id, scalable_dimension
):
self,
policy_name: str,
service_namespace: str,
resource_id: str,
scalable_dimension: str,
) -> None:
policy_key = FakeApplicationAutoscalingPolicy.formulate_key(
service_namespace, resource_id, scalable_dimension, policy_name
)
if policy_key in self.policies:
del self.policies[policy_key]
return {}
else:
raise AWSValidationException(
"No scaling policy found for service namespace: {}, resource ID: {}, scalable dimension: {}, policy name: {}".format(
@ -225,8 +254,12 @@ class ApplicationAutoscalingBackend(BaseBackend):
)
def delete_scheduled_action(
self, service_namespace, scheduled_action_name, resource_id, scalable_dimension
):
self,
service_namespace: str,
scheduled_action_name: str,
resource_id: str,
scalable_dimension: str,
) -> None:
self.scheduled_actions = [
a
for a in self.scheduled_actions
@ -239,8 +272,12 @@ class ApplicationAutoscalingBackend(BaseBackend):
]
def describe_scheduled_actions(
self, scheduled_action_names, service_namespace, resource_id, scalable_dimension
):
self,
scheduled_action_names: str,
service_namespace: str,
resource_id: str,
scalable_dimension: str,
) -> List["FakeScheduledAction"]:
"""
Pagination is not yet implemented
"""
@ -261,16 +298,16 @@ class ApplicationAutoscalingBackend(BaseBackend):
def put_scheduled_action(
self,
service_namespace,
schedule,
timezone,
scheduled_action_name,
resource_id,
scalable_dimension,
start_time,
end_time,
scalable_target_action,
):
service_namespace: str,
schedule: str,
timezone: str,
scheduled_action_name: str,
resource_id: str,
scalable_dimension: str,
start_time: str,
end_time: str,
scalable_target_action: str,
) -> None:
existing_action = next(
(
a
@ -307,7 +344,7 @@ class ApplicationAutoscalingBackend(BaseBackend):
self.scheduled_actions.append(action)
def _target_params_are_valid(namespace, r_id, dimension):
def _target_params_are_valid(namespace: str, r_id: str, dimension: str) -> bool:
"""Check whether namespace, resource_id and dimension are valid and consistent with each other."""
is_valid = True
valid_namespaces = [n.value for n in ServiceNamespaceValueSet]
@ -337,7 +374,7 @@ def _target_params_are_valid(namespace, r_id, dimension):
return is_valid
def _get_resource_type_from_resource_id(resource_id):
def _get_resource_type_from_resource_id(resource_id: str) -> str:
# AWS Application Autoscaling resource_ids are multi-component (path-like) identifiers that vary in format,
# depending on the type of resource it identifies. resource_type is one of its components.
# resource_id format variations are described in
@ -366,38 +403,51 @@ def _get_resource_type_from_resource_id(resource_id):
class FakeScalableTarget(BaseModel):
def __init__(
self, backend, service_namespace, resource_id, scalable_dimension, **kwargs
):
self,
backend: ApplicationAutoscalingBackend,
service_namespace: str,
resource_id: str,
scalable_dimension: str,
min_capacity: Optional[int],
max_capacity: Optional[int],
role_arn: str,
suspended_state: str,
) -> None:
self.applicationautoscaling_backend = backend
self.service_namespace = service_namespace
self.resource_id = resource_id
self.scalable_dimension = scalable_dimension
self.min_capacity = kwargs["min_capacity"]
self.max_capacity = kwargs["max_capacity"]
self.role_arn = kwargs["role_arn"]
self.suspended_state = kwargs["suspended_state"]
self.min_capacity = min_capacity
self.max_capacity = max_capacity
self.role_arn = role_arn
self.suspended_state = suspended_state
self.creation_time = time.time()
def update(self, **kwargs):
if kwargs["min_capacity"] is not None:
self.min_capacity = kwargs["min_capacity"]
if kwargs["max_capacity"] is not None:
self.max_capacity = kwargs["max_capacity"]
if kwargs["suspended_state"] is not None:
self.suspended_state = kwargs["suspended_state"]
def update(
self,
min_capacity: Optional[int],
max_capacity: Optional[int],
suspended_state: str,
) -> None:
if min_capacity is not None:
self.min_capacity = min_capacity
if max_capacity is not None:
self.max_capacity = max_capacity
if suspended_state is not None:
self.suspended_state = suspended_state
class FakeApplicationAutoscalingPolicy(BaseModel):
def __init__(
self,
region_name,
policy_name,
service_namespace,
resource_id,
scalable_dimension,
policy_type,
policy_body,
):
region_name: str,
policy_name: str,
service_namespace: str,
resource_id: str,
scalable_dimension: str,
policy_type: Optional[str],
policy_body: str,
) -> None:
self.step_scaling_policy_configuration = None
self.target_tracking_scaling_policy_configuration = None
@ -429,7 +479,12 @@ class FakeApplicationAutoscalingPolicy(BaseModel):
self.creation_time = time.time()
@staticmethod
def formulate_key(service_namespace, resource_id, scalable_dimension, policy_name):
def formulate_key(
service_namespace: str,
resource_id: str,
scalable_dimension: str,
policy_name: str,
) -> str:
return "{}\t{}\t{}\t{}".format(
service_namespace, resource_id, scalable_dimension, policy_name
)
@ -438,18 +493,18 @@ class FakeApplicationAutoscalingPolicy(BaseModel):
class FakeScheduledAction(BaseModel):
def __init__(
self,
service_namespace,
schedule,
timezone,
scheduled_action_name,
resource_id,
scalable_dimension,
start_time,
end_time,
scalable_target_action,
account_id,
region,
):
service_namespace: str,
schedule: str,
timezone: str,
scheduled_action_name: str,
resource_id: str,
scalable_dimension: str,
start_time: str,
end_time: str,
scalable_target_action: str,
account_id: str,
region: str,
) -> None:
self.arn = f"arn:aws:autoscaling:{region}:{account_id}:scheduledAction:{service_namespace}:scheduledActionName/{scheduled_action_name}"
self.service_namespace = service_namespace
self.schedule = schedule
@ -464,13 +519,13 @@ class FakeScheduledAction(BaseModel):
def update(
self,
schedule,
timezone,
scheduled_action_name,
start_time,
end_time,
scalable_target_action,
):
schedule: str,
timezone: str,
scheduled_action_name: str,
start_time: str,
end_time: str,
scalable_target_action: str,
) -> None:
if scheduled_action_name:
self.scheduled_action_name = scheduled_action_name
if schedule:

View File

@ -1,27 +1,32 @@
from moto.core.responses import BaseResponse
from typing import Any, Dict
import json
from .models import (
applicationautoscaling_backends,
ScalableDimensionValueSet,
ServiceNamespaceValueSet,
ApplicationAutoscalingBackend,
FakeScalableTarget,
FakeApplicationAutoscalingPolicy,
FakeScheduledAction,
)
from .exceptions import AWSValidationException
class ApplicationAutoScalingResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="application-autoscaling")
@property
def applicationautoscaling_backend(self):
def applicationautoscaling_backend(self) -> ApplicationAutoscalingBackend:
return applicationautoscaling_backends[self.current_account][self.region]
def describe_scalable_targets(self):
def describe_scalable_targets(self) -> str:
self._validate_params()
service_namespace = self._get_param("ServiceNamespace")
resource_ids = self._get_param("ResourceIds")
scalable_dimension = self._get_param("ScalableDimension")
max_results = self._get_int_param("MaxResults", 50)
max_results = self._get_int_param("MaxResults") or 50
marker = self._get_param("NextToken")
all_scalable_targets = (
self.applicationautoscaling_backend.describe_scalable_targets(
@ -36,7 +41,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
targets = [_build_target(t) for t in scalable_targets_resp]
return json.dumps({"ScalableTargets": targets, "NextToken": next_token})
def register_scalable_target(self):
def register_scalable_target(self) -> str:
"""Registers or updates a scalable target."""
self._validate_params()
self.applicationautoscaling_backend.register_scalable_target(
@ -50,7 +55,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
)
return json.dumps({})
def deregister_scalable_target(self):
def deregister_scalable_target(self) -> str:
"""Deregisters a scalable target."""
self._validate_params()
self.applicationautoscaling_backend.deregister_scalable_target(
@ -60,7 +65,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
)
return json.dumps({})
def put_scaling_policy(self):
def put_scaling_policy(self) -> str:
policy = self.applicationautoscaling_backend.put_scaling_policy(
policy_name=self._get_param("PolicyName"),
service_namespace=self._get_param("ServiceNamespace"),
@ -74,7 +79,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
)
return json.dumps({"PolicyARN": policy.policy_arn, "Alarms": []}) # ToDo
def describe_scaling_policies(self):
def describe_scaling_policies(self) -> str:
(
next_token,
policy_page,
@ -82,15 +87,16 @@ class ApplicationAutoScalingResponse(BaseResponse):
service_namespace=self._get_param("ServiceNamespace"),
resource_id=self._get_param("ResourceId"),
scalable_dimension=self._get_param("ScalableDimension"),
max_results=self._get_param("MaxResults"),
max_results=self._get_int_param("MaxResults"),
next_token=self._get_param("NextToken"),
)
response_obj = {"ScalingPolicies": [_build_policy(p) for p in policy_page]}
if next_token:
response_obj["NextToken"] = next_token
response_obj = {
"ScalingPolicies": [_build_policy(p) for p in policy_page],
"NextToken": next_token,
}
return json.dumps(response_obj)
def delete_scaling_policy(self):
def delete_scaling_policy(self) -> str:
self.applicationautoscaling_backend.delete_scaling_policy(
policy_name=self._get_param("PolicyName"),
service_namespace=self._get_param("ServiceNamespace"),
@ -99,7 +105,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
)
return json.dumps({})
def _validate_params(self):
def _validate_params(self) -> None:
"""Validate parameters.
TODO Integrate this validation with the validation in models.py
"""
@ -130,7 +136,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
if message:
raise AWSValidationException(message)
def delete_scheduled_action(self):
def delete_scheduled_action(self) -> str:
params = json.loads(self.body)
service_namespace = params.get("ServiceNamespace")
scheduled_action_name = params.get("ScheduledActionName")
@ -144,7 +150,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
)
return json.dumps(dict())
def put_scheduled_action(self):
def put_scheduled_action(self) -> str:
params = json.loads(self.body)
service_namespace = params.get("ServiceNamespace")
schedule = params.get("Schedule")
@ -168,7 +174,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
)
return json.dumps(dict())
def describe_scheduled_actions(self):
def describe_scheduled_actions(self) -> str:
params = json.loads(self.body)
scheduled_action_names = params.get("ScheduledActionNames")
service_namespace = params.get("ServiceNamespace")
@ -188,7 +194,7 @@ class ApplicationAutoScalingResponse(BaseResponse):
return json.dumps(response_obj)
def _build_target(t):
def _build_target(t: FakeScalableTarget) -> Dict[str, Any]:
return {
"CreationTime": t.creation_time,
"ServiceNamespace": t.service_namespace,
@ -201,7 +207,7 @@ def _build_target(t):
}
def _build_policy(p):
def _build_policy(p: FakeApplicationAutoscalingPolicy) -> Dict[str, Any]:
response = {
"PolicyARN": p.policy_arn,
"PolicyName": p.policy_name,
@ -220,7 +226,7 @@ def _build_policy(p):
return response
def _build_scheduled_action(a):
def _build_scheduled_action(a: FakeScheduledAction) -> Dict[str, Any]:
response = {
"ScheduledActionName": a.scheduled_action_name,
"ScheduledActionARN": a.arn,

View File

@ -1,7 +1,7 @@
from urllib.parse import urlparse
def region_from_applicationautoscaling_url(url):
def region_from_applicationautoscaling_url(url: str) -> str:
domain = urlparse(url).netloc
if "." in domain:

View File

@ -1,6 +1,7 @@
import re
import string
from collections import defaultdict
from typing import List, Dict
from .utils import convert_regex_to_flask_path
@ -21,7 +22,7 @@ class InstanceTrackerMeta(type):
class BaseBackend:
def __init__(self, region_name, account_id=None):
def __init__(self, region_name, account_id=None) -> None:
self.region_name = region_name
self.account_id = account_id
@ -127,7 +128,7 @@ class BaseBackend:
special_service_name="",
policy_supported=True,
base_endpoint_dns_names=None,
): # pylint: disable=too-many-arguments
) -> List[Dict[str, str]]: # pylint: disable=too-many-arguments
"""List of dicts representing default VPC endpoints for this service."""
if special_service_name:
service_name = f"com.amazonaws.{service_region}.{special_service_name}"

View File

@ -79,14 +79,14 @@ class DryRunClientError(RESTError):
class JsonRESTError(RESTError):
def __init__(self, error_type, message, template="error_json", **kwargs):
def __init__(self, error_type, message, template="error_json", **kwargs) -> None:
super().__init__(error_type, message, template, **kwargs)
self.description = json.dumps(
{"__type": self.error_type, "message": self.message}
)
self.content_type = "application/json"
def get_body(self, *args, **kwargs):
def get_body(self, *args, **kwargs) -> str:
return self.description

View File

@ -14,6 +14,7 @@ from botocore.config import Config
from botocore.handlers import BUILTIN_HANDLERS
from moto import settings
from moto.core.utils import BackendDict
from .botocore_stubber import BotocoreStubber
from .custom_responses_mock import (
get_response_mock,
@ -390,7 +391,7 @@ class ServerModeMockAWS(BaseMockAWS):
class base_decorator:
mock_backend = MockAWS
def __init__(self, backends):
def __init__(self, backends: BackendDict):
self.backends = backends
def __call__(self, func=None):

View File

@ -1,28 +1,23 @@
import boto3
import functools
from collections import defaultdict
import datetime
import json
import logging
import os
import re
import requests
import pytz
from moto.core.exceptions import DryRunClientError
from jinja2 import Environment, DictLoader
from urllib.parse import parse_qs, parse_qsl, urlparse
import xmltodict
from werkzeug.exceptions import HTTPException
import boto3
from collections import OrderedDict
from collections import defaultdict, OrderedDict
from moto import settings
from moto.core.exceptions import DryRunClientError
from moto.core.utils import camelcase_to_underscores, method_names_from_class
from moto.utilities.utils import load_resource
from moto import settings
from jinja2 import Environment, DictLoader
from typing import Dict, List, Union, Any, Optional
from urllib.parse import parse_qs, parse_qsl, urlparse
from werkzeug.exceptions import HTTPException
log = logging.getLogger(__name__)
@ -208,7 +203,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
)
aws_service_spec = None
def __init__(self, service_name=None):
def __init__(self, service_name=None) -> None:
super().__init__()
self.service_name = service_name
@ -216,7 +211,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
def dispatch(cls, *args, **kwargs):
return cls()._dispatch(*args, **kwargs)
def setup_class(self, request, full_url, headers, use_raw_body=False):
def setup_class(self, request, full_url, headers, use_raw_body=False) -> None:
"""
use_raw_body: Use incoming bytes if True, encode to string otherwise
"""
@ -473,7 +468,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
headers["status"] = str(headers["status"])
return status, headers, body
def _get_param(self, param_name, if_none=None):
def _get_param(self, param_name, if_none=None) -> Any:
val = self.querystring.get(param_name)
if val is not None:
return val[0]
@ -495,13 +490,13 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
pass
return if_none
def _get_int_param(self, param_name, if_none=None):
def _get_int_param(self, param_name, if_none: int = None) -> Optional[int]:
val = self._get_param(param_name)
if val is not None:
return int(val)
return if_none
def _get_bool_param(self, param_name, if_none=None):
def _get_bool_param(self, param_name, if_none: bool = None) -> Optional[bool]:
val = self._get_param(param_name)
if val is not None:
val = str(val)
@ -511,7 +506,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return False
return if_none
def _get_multi_param_dict(self, param_prefix):
def _get_multi_param_dict(self, param_prefix) -> Dict:
return self._get_multi_param_helper(param_prefix, skip_result_conversion=True)
def _get_multi_param_helper(
@ -582,7 +577,9 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return value_dict
def _get_multi_param(self, param_prefix, skip_result_conversion=False):
def _get_multi_param(
self, param_prefix, skip_result_conversion=False
) -> List[Union[str, Dict]]:
"""
Given a querystring of ?LaunchConfigurationNames.member.1=my-test-1&LaunchConfigurationNames.member.2=my-test-2
this will return ['my-test-1', 'my-test-2']
@ -605,7 +602,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return values
def _get_dict_param(self, param_prefix):
def _get_dict_param(self, param_prefix) -> Dict:
"""
Given a parameter dict of
{
@ -627,7 +624,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
]
return params
def _get_params(self):
def _get_params(self) -> Dict[str, Union[str, List, Dict]]:
"""
Given a querystring of
{

View File

@ -7,6 +7,7 @@ from botocore.exceptions import ClientError
from boto3 import Session
from moto.settings import allow_unknown_region
from threading import RLock
from typing import Any, Optional, List
from urllib.parse import urlparse
from uuid import uuid4
@ -382,7 +383,11 @@ class BackendDict(dict):
"""
def __init__(
self, backend, service_name, use_boto3_regions=True, additional_regions=None
self,
backend: Any,
service_name: str,
use_boto3_regions: bool = True,
additional_regions: Optional[List[str]] = None,
):
self.backend = backend
self.service_name = service_name

View File

@ -1,6 +1,6 @@
from random import Random
import string
import uuid
from uuid import UUID
HEX_CHARS = list(range(10)) + ["a", "b", "c", "d", "e", "f"]
@ -13,16 +13,18 @@ class MotoRandom(Random):
This Singleton can be seeded to make identifiers deterministic.
"""
def uuid1(self):
return uuid.UUID(int=self.getrandbits(128), version=1)
def uuid1(self) -> UUID:
return UUID(int=self.getrandbits(128), version=1)
def uuid4(self):
return uuid.UUID(int=self.getrandbits(128), version=4)
def uuid4(self) -> UUID:
return UUID(int=self.getrandbits(128), version=4)
def get_random_hex(self, length=8):
def get_random_hex(self, length: int = 8) -> str:
return "".join(str(self.choice(HEX_CHARS)) for _ in range(length))
def get_random_string(self, length=20, include_digits=True, lower_case=False):
def get_random_string(
self, length: int = 20, include_digits: bool = True, lower_case: bool = False
) -> str:
pool = string.ascii_letters
if include_digits:
pool += string.digits

View File

@ -6,5 +6,6 @@ flake8==4.0.1
click
inflection
lxml
mypy
packaging
prompt_toolkit

View File

@ -16,3 +16,34 @@ ignore-paths=moto/packages
disable = W,C,R,E
# future sensible checks = super-init-not-called, unspecified-encoding, undefined-loop-variable
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
exclude = tests
show_column_numbers=True
show_error_codes = True
disallow_any_unimported=False
disallow_any_expr=False
disallow_any_decorated=True
disallow_any_explicit=False
disallow_any_generics=True
disallow_subclassing_any=True
disallow_untyped_calls=True
disallow_untyped_defs=True
disallow_incomplete_defs=True
check_untyped_defs=True
disallow_untyped_decorators=True
no_implicit_optional=True
strict_optional=True
warn_redundant_casts=True
warn_unused_ignores=True
warn_no_return=True
warn_return_any=False
warn_unreachable=False
strict_equality=True
ignore_missing_imports=True
follow_imports=silent