diff --git a/moto/core/common_types.py b/moto/core/common_types.py index 3f04dfbf6..0e1741721 100644 --- a/moto/core/common_types.py +++ b/moto/core/common_types.py @@ -1,5 +1,5 @@ -from typing import Dict, Tuple, TypeVar, Union +from typing import Any, Dict, Tuple, TypeVar, Union -TYPE_RESPONSE = Tuple[int, Dict[str, str], Union[str, bytes]] +TYPE_RESPONSE = Tuple[int, Dict[str, Any], Union[str, bytes]] TYPE_IF_NONE = TypeVar("TYPE_IF_NONE") diff --git a/moto/elastictranscoder/models.py b/moto/elastictranscoder/models.py index 53bb42526..d836c4c98 100644 --- a/moto/elastictranscoder/models.py +++ b/moto/elastictranscoder/models.py @@ -1,3 +1,4 @@ +from typing import Any, Dict, List, Tuple from moto.core import BaseBackend, BackendDict, BaseModel from moto.moto_api._internal import mock_random as random import string @@ -6,14 +7,14 @@ import string class Pipeline(BaseModel): def __init__( self, - account_id, - region, - name, - input_bucket, - output_bucket, - role, - content_config, - thumbnail_config, + account_id: str, + region: str, + name: str, + input_bucket: str, + output_bucket: str, + role: str, + content_config: Dict[str, Any], + thumbnail_config: Dict[str, Any], ): a = "".join(random.choice(string.digits) for _ in range(13)) b = "".join(random.choice(string.ascii_lowercase) for _ in range(6)) @@ -31,7 +32,7 @@ class Pipeline(BaseModel): if "Permissions" not in self.thumbnail_config: self.thumbnail_config["Permissions"] = [] - def update(self, name, input_bucket, role): + def update(self, name: str, input_bucket: str, role: str) -> None: if name: self.name = name if input_bucket: @@ -39,7 +40,7 @@ class Pipeline(BaseModel): if role: self.role = role - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: return { "Id": self.id, "Name": self.name, @@ -60,19 +61,19 @@ class Pipeline(BaseModel): class ElasticTranscoderBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.pipelines = {} + self.pipelines: Dict[str, Pipeline] = {} def create_pipeline( self, - name, - input_bucket, - output_bucket, - role, - content_config, - thumbnail_config, - ): + name: str, + input_bucket: str, + output_bucket: str, + role: str, + content_config: Dict[str, Any], + thumbnail_config: Dict[str, Any], + ) -> Tuple[Pipeline, List[str]]: """ The following parameters are not yet implemented: AWSKMSKeyArn, Notifications @@ -88,26 +89,28 @@ class ElasticTranscoderBackend(BaseBackend): thumbnail_config, ) self.pipelines[pipeline.id] = pipeline - warnings = [] + warnings: List[str] = [] return pipeline, warnings - def list_pipelines(self): + def list_pipelines(self) -> List[Dict[str, Any]]: return [p.to_dict() for _, p in self.pipelines.items()] - def read_pipeline(self, pipeline_id): + def read_pipeline(self, pipeline_id: str) -> Pipeline: return self.pipelines[pipeline_id] - def update_pipeline(self, pipeline_id, name, input_bucket, role): + def update_pipeline( + self, pipeline_id: str, name: str, input_bucket: str, role: str + ) -> Tuple[Pipeline, List[str]]: """ The following parameters are not yet implemented: AWSKMSKeyArn, Notifications, ContentConfig, ThumbnailConfig """ pipeline = self.read_pipeline(pipeline_id) pipeline.update(name, input_bucket, role) - warnings = [] + warnings: List[str] = [] return pipeline, warnings - def delete_pipeline(self, pipeline_id): + def delete_pipeline(self, pipeline_id: str) -> None: self.pipelines.pop(pipeline_id) diff --git a/moto/elastictranscoder/responses.py b/moto/elastictranscoder/responses.py index 476f1eef8..f2aa81091 100644 --- a/moto/elastictranscoder/responses.py +++ b/moto/elastictranscoder/responses.py @@ -1,18 +1,20 @@ +from typing import Any, Optional +from moto.core.common_types import TYPE_RESPONSE from moto.core.responses import BaseResponse -from .models import elastictranscoder_backends +from .models import elastictranscoder_backends, ElasticTranscoderBackend import json import re class ElasticTranscoderResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="elastictranscoder") @property - def elastictranscoder_backend(self): + def elastictranscoder_backend(self) -> ElasticTranscoderBackend: return elastictranscoder_backends[self.current_account][self.region] - def pipelines(self, request, full_url, headers): + def pipelines(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "POST": return self.create_pipeline() @@ -21,7 +23,9 @@ class ElasticTranscoderResponse(BaseResponse): else: return self.update_pipeline() - def individual_pipeline(self, request, full_url, headers): + def individual_pipeline( + self, request: Any, full_url: str, headers: Any + ) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "GET": return self.read_pipeline() @@ -30,7 +34,7 @@ class ElasticTranscoderResponse(BaseResponse): else: return self.update_pipeline() - def create_pipeline(self): + def create_pipeline(self) -> TYPE_RESPONSE: name = self._get_param("Name") input_bucket = self._get_param("InputBucket") output_bucket = self._get_param("OutputBucket") @@ -65,14 +69,14 @@ class ElasticTranscoderResponse(BaseResponse): json.dumps({"Pipeline": pipeline.to_dict(), "Warnings": warnings}), ) - def list_pipelines(self): + def list_pipelines(self) -> TYPE_RESPONSE: return ( 200, {}, json.dumps({"Pipelines": self.elastictranscoder_backend.list_pipelines()}), ) - def validate_pipeline_id(self, pipeline_id): + def validate_pipeline_id(self, pipeline_id: str) -> Optional[TYPE_RESPONSE]: r = "^\\d{13}-\\w{6}$" if not re.match(r, pipeline_id): msg = f"1 validation error detected: Value '{pipeline_id}' at 'id' failed to satisfy constraint: Member must satisfy regular expression pattern: {r}" @@ -88,7 +92,7 @@ class ElasticTranscoderResponse(BaseResponse): json.dumps({"message": msg}), ) - def read_pipeline(self): + def read_pipeline(self) -> TYPE_RESPONSE: _id = self.path.rsplit("/", 1)[-1] err = self.validate_pipeline_id(_id) if err: @@ -96,7 +100,7 @@ class ElasticTranscoderResponse(BaseResponse): pipeline = self.elastictranscoder_backend.read_pipeline(_id) return 200, {}, json.dumps({"Pipeline": pipeline.to_dict()}) - def update_pipeline(self): + def update_pipeline(self) -> TYPE_RESPONSE: _id = self.path.rsplit("/", 1)[-1] name = self._get_param("Name") input_bucket = self._get_param("InputBucket") @@ -114,12 +118,12 @@ class ElasticTranscoderResponse(BaseResponse): json.dumps({"Pipeline": pipeline.to_dict(), "Warnings": warnings}), ) - def delete_pipeline(self): + def delete_pipeline(self) -> TYPE_RESPONSE: _id = self.path.rsplit("/", 1)[-1] self.elastictranscoder_backend.delete_pipeline(pipeline_id=_id) return 200, {}, "{}" - def err(self, msg): + def err(self, msg: str) -> TYPE_RESPONSE: return ( 400, {"status": 400, "x-amzn-ErrorType": "ValidationException"}, diff --git a/setup.cfg b/setup.cfg index 9140bce3f..9cec5d4fa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -229,7 +229,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/es,moto/moto_api +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/elastictranscoder,moto/es,moto/moto_api show_column_numbers=True show_error_codes = True disable_error_code=abstract