Techdebt: MyPy ElasticTranscoder (#5981)

This commit is contained in:
Bert Blommers 2023-02-25 10:19:51 -01:00 committed by GitHub
parent 5d7d096bed
commit c4a3644902
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 40 deletions

View File

@ -1,5 +1,5 @@
from typing import Dict, Tuple, TypeVar, Union
from typing import Any, Dict, Tuple, TypeVar, Union
TYPE_RESPONSE = Tuple[int, Dict[str, str], Union[str, bytes]]
TYPE_RESPONSE = Tuple[int, Dict[str, Any], Union[str, bytes]]
TYPE_IF_NONE = TypeVar("TYPE_IF_NONE")

View File

@ -1,3 +1,4 @@
from typing import Any, Dict, List, Tuple
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.moto_api._internal import mock_random as random
import string
@ -6,14 +7,14 @@ import string
class Pipeline(BaseModel):
def __init__(
self,
account_id,
region,
name,
input_bucket,
output_bucket,
role,
content_config,
thumbnail_config,
account_id: str,
region: str,
name: str,
input_bucket: str,
output_bucket: str,
role: str,
content_config: Dict[str, Any],
thumbnail_config: Dict[str, Any],
):
a = "".join(random.choice(string.digits) for _ in range(13))
b = "".join(random.choice(string.ascii_lowercase) for _ in range(6))
@ -31,7 +32,7 @@ class Pipeline(BaseModel):
if "Permissions" not in self.thumbnail_config:
self.thumbnail_config["Permissions"] = []
def update(self, name, input_bucket, role):
def update(self, name: str, input_bucket: str, role: str) -> None:
if name:
self.name = name
if input_bucket:
@ -39,7 +40,7 @@ class Pipeline(BaseModel):
if role:
self.role = role
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
return {
"Id": self.id,
"Name": self.name,
@ -60,19 +61,19 @@ class Pipeline(BaseModel):
class ElasticTranscoderBackend(BaseBackend):
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.pipelines = {}
self.pipelines: Dict[str, Pipeline] = {}
def create_pipeline(
self,
name,
input_bucket,
output_bucket,
role,
content_config,
thumbnail_config,
):
name: str,
input_bucket: str,
output_bucket: str,
role: str,
content_config: Dict[str, Any],
thumbnail_config: Dict[str, Any],
) -> Tuple[Pipeline, List[str]]:
"""
The following parameters are not yet implemented:
AWSKMSKeyArn, Notifications
@ -88,26 +89,28 @@ class ElasticTranscoderBackend(BaseBackend):
thumbnail_config,
)
self.pipelines[pipeline.id] = pipeline
warnings = []
warnings: List[str] = []
return pipeline, warnings
def list_pipelines(self):
def list_pipelines(self) -> List[Dict[str, Any]]:
return [p.to_dict() for _, p in self.pipelines.items()]
def read_pipeline(self, pipeline_id):
def read_pipeline(self, pipeline_id: str) -> Pipeline:
return self.pipelines[pipeline_id]
def update_pipeline(self, pipeline_id, name, input_bucket, role):
def update_pipeline(
self, pipeline_id: str, name: str, input_bucket: str, role: str
) -> Tuple[Pipeline, List[str]]:
"""
The following parameters are not yet implemented:
AWSKMSKeyArn, Notifications, ContentConfig, ThumbnailConfig
"""
pipeline = self.read_pipeline(pipeline_id)
pipeline.update(name, input_bucket, role)
warnings = []
warnings: List[str] = []
return pipeline, warnings
def delete_pipeline(self, pipeline_id):
def delete_pipeline(self, pipeline_id: str) -> None:
self.pipelines.pop(pipeline_id)

View File

@ -1,18 +1,20 @@
from typing import Any, Optional
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import elastictranscoder_backends
from .models import elastictranscoder_backends, ElasticTranscoderBackend
import json
import re
class ElasticTranscoderResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="elastictranscoder")
@property
def elastictranscoder_backend(self):
def elastictranscoder_backend(self) -> ElasticTranscoderBackend:
return elastictranscoder_backends[self.current_account][self.region]
def pipelines(self, request, full_url, headers):
def pipelines(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.create_pipeline()
@ -21,7 +23,9 @@ class ElasticTranscoderResponse(BaseResponse):
else:
return self.update_pipeline()
def individual_pipeline(self, request, full_url, headers):
def individual_pipeline(
self, request: Any, full_url: str, headers: Any
) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.read_pipeline()
@ -30,7 +34,7 @@ class ElasticTranscoderResponse(BaseResponse):
else:
return self.update_pipeline()
def create_pipeline(self):
def create_pipeline(self) -> TYPE_RESPONSE:
name = self._get_param("Name")
input_bucket = self._get_param("InputBucket")
output_bucket = self._get_param("OutputBucket")
@ -65,14 +69,14 @@ class ElasticTranscoderResponse(BaseResponse):
json.dumps({"Pipeline": pipeline.to_dict(), "Warnings": warnings}),
)
def list_pipelines(self):
def list_pipelines(self) -> TYPE_RESPONSE:
return (
200,
{},
json.dumps({"Pipelines": self.elastictranscoder_backend.list_pipelines()}),
)
def validate_pipeline_id(self, pipeline_id):
def validate_pipeline_id(self, pipeline_id: str) -> Optional[TYPE_RESPONSE]:
r = "^\\d{13}-\\w{6}$"
if not re.match(r, pipeline_id):
msg = f"1 validation error detected: Value '{pipeline_id}' at 'id' failed to satisfy constraint: Member must satisfy regular expression pattern: {r}"
@ -88,7 +92,7 @@ class ElasticTranscoderResponse(BaseResponse):
json.dumps({"message": msg}),
)
def read_pipeline(self):
def read_pipeline(self) -> TYPE_RESPONSE:
_id = self.path.rsplit("/", 1)[-1]
err = self.validate_pipeline_id(_id)
if err:
@ -96,7 +100,7 @@ class ElasticTranscoderResponse(BaseResponse):
pipeline = self.elastictranscoder_backend.read_pipeline(_id)
return 200, {}, json.dumps({"Pipeline": pipeline.to_dict()})
def update_pipeline(self):
def update_pipeline(self) -> TYPE_RESPONSE:
_id = self.path.rsplit("/", 1)[-1]
name = self._get_param("Name")
input_bucket = self._get_param("InputBucket")
@ -114,12 +118,12 @@ class ElasticTranscoderResponse(BaseResponse):
json.dumps({"Pipeline": pipeline.to_dict(), "Warnings": warnings}),
)
def delete_pipeline(self):
def delete_pipeline(self) -> TYPE_RESPONSE:
_id = self.path.rsplit("/", 1)[-1]
self.elastictranscoder_backend.delete_pipeline(pipeline_id=_id)
return 200, {}, "{}"
def err(self, msg):
def err(self, msg: str) -> TYPE_RESPONSE:
return (
400,
{"status": 400, "x-amzn-ErrorType": "ValidationException"},

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/es,moto/moto_api
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/elastictranscoder,moto/es,moto/moto_api
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract