diff --git a/moto/datapipeline/models.py b/moto/datapipeline/models.py index 8b737be9e..ee03740f8 100644 --- a/moto/datapipeline/models.py +++ b/moto/datapipeline/models.py @@ -3,37 +3,38 @@ import datetime from collections import OrderedDict from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel from .utils import get_random_pipeline_id, remove_capitalization_of_dict_keys +from typing import Any, Dict, Iterable, List class PipelineObject(BaseModel): - def __init__(self, object_id, name, fields): + def __init__(self, object_id: str, name: str, fields: Any): self.object_id = object_id self.name = name self.fields = fields - def to_json(self): + def to_json(self) -> Dict[str, Any]: return {"fields": self.fields, "id": self.object_id, "name": self.name} class Pipeline(CloudFormationModel): - def __init__(self, name, unique_id, **kwargs): + def __init__(self, name: str, unique_id: str, **kwargs: Any): self.name = name self.unique_id = unique_id self.description = kwargs.get("description", "") self.pipeline_id = get_random_pipeline_id() self.creation_time = datetime.datetime.utcnow() - self.objects = [] + self.objects: List[Any] = [] self.status = "PENDING" self.tags = kwargs.get("tags", []) @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.pipeline_id - def to_meta_json(self): + def to_meta_json(self) -> Dict[str, str]: return {"id": self.pipeline_id, "name": self.name} - def to_json(self): + def to_json(self) -> Dict[str, Any]: return { "description": self.description, "fields": [ @@ -58,7 +59,7 @@ class Pipeline(CloudFormationModel): "tags": self.tags, } - def set_pipeline_objects(self, pipeline_objects): + def set_pipeline_objects(self, pipeline_objects: Any) -> None: self.objects = [ PipelineObject( pipeline_object["id"], @@ -68,22 +69,27 @@ class Pipeline(CloudFormationModel): for pipeline_object in remove_capitalization_of_dict_keys(pipeline_objects) ] - def activate(self): + def activate(self) -> None: self.status = "SCHEDULED" @staticmethod - def cloudformation_name_type(): + def cloudformation_name_type() -> str: return "Name" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-datapipeline-pipeline.html return "AWS::DataPipeline::Pipeline" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + **kwargs: Any + ) -> "Pipeline": datapipeline_backend = datapipeline_backends[account_id][region_name] properties = cloudformation_json["Properties"] @@ -101,19 +107,19 @@ class Pipeline(CloudFormationModel): class DataPipelineBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.pipelines = OrderedDict() + self.pipelines: Dict[str, Pipeline] = OrderedDict() - def create_pipeline(self, name, unique_id, **kwargs): + def create_pipeline(self, name: str, unique_id: str, **kwargs: Any) -> Pipeline: pipeline = Pipeline(name, unique_id, **kwargs) self.pipelines[pipeline.pipeline_id] = pipeline return pipeline - def list_pipelines(self): + def list_pipelines(self) -> Iterable[Pipeline]: return self.pipelines.values() - def describe_pipelines(self, pipeline_ids): + def describe_pipelines(self, pipeline_ids: List[str]) -> List[Pipeline]: pipelines = [ pipeline for pipeline in self.pipelines.values() @@ -121,21 +127,21 @@ class DataPipelineBackend(BaseBackend): ] return pipelines - def get_pipeline(self, pipeline_id): + def get_pipeline(self, pipeline_id: str) -> Pipeline: return self.pipelines[pipeline_id] - def delete_pipeline(self, pipeline_id): + def delete_pipeline(self, pipeline_id: str) -> None: self.pipelines.pop(pipeline_id, None) - def put_pipeline_definition(self, pipeline_id, pipeline_objects): + def put_pipeline_definition(self, pipeline_id: str, pipeline_objects: Any) -> None: pipeline = self.get_pipeline(pipeline_id) pipeline.set_pipeline_objects(pipeline_objects) - def get_pipeline_definition(self, pipeline_id): + def get_pipeline_definition(self, pipeline_id: str) -> Any: pipeline = self.get_pipeline(pipeline_id) return pipeline.objects - def describe_objects(self, object_ids, pipeline_id): + def describe_objects(self, object_ids: List[str], pipeline_id: str) -> List[Any]: pipeline = self.get_pipeline(pipeline_id) pipeline_objects = [ pipeline_object @@ -144,7 +150,7 @@ class DataPipelineBackend(BaseBackend): ] return pipeline_objects - def activate_pipeline(self, pipeline_id): + def activate_pipeline(self, pipeline_id: str) -> None: pipeline = self.get_pipeline(pipeline_id) pipeline.activate() diff --git a/moto/datapipeline/responses.py b/moto/datapipeline/responses.py index f9fa3f27c..ce7bd4fdc 100644 --- a/moto/datapipeline/responses.py +++ b/moto/datapipeline/responses.py @@ -1,18 +1,18 @@ import json from moto.core.responses import BaseResponse -from .models import datapipeline_backends +from .models import datapipeline_backends, DataPipelineBackend class DataPipelineResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="datapipeline") @property - def datapipeline_backend(self): + def datapipeline_backend(self) -> DataPipelineBackend: return datapipeline_backends[self.current_account][self.region] - def create_pipeline(self): + def create_pipeline(self) -> str: name = self._get_param("name") unique_id = self._get_param("uniqueId") description = self._get_param("description", "") @@ -22,7 +22,7 @@ class DataPipelineResponse(BaseResponse): ) return json.dumps({"pipelineId": pipeline.pipeline_id}) - def list_pipelines(self): + def list_pipelines(self) -> str: pipelines = list(self.datapipeline_backend.list_pipelines()) pipeline_ids = [pipeline.pipeline_id for pipeline in pipelines] max_pipelines = 50 @@ -47,7 +47,7 @@ class DataPipelineResponse(BaseResponse): } ) - def describe_pipelines(self): + def describe_pipelines(self) -> str: pipeline_ids = self._get_param("pipelineIds") pipelines = self.datapipeline_backend.describe_pipelines(pipeline_ids) @@ -55,19 +55,19 @@ class DataPipelineResponse(BaseResponse): {"pipelineDescriptionList": [pipeline.to_json() for pipeline in pipelines]} ) - def delete_pipeline(self): + def delete_pipeline(self) -> str: pipeline_id = self._get_param("pipelineId") self.datapipeline_backend.delete_pipeline(pipeline_id) return json.dumps({}) - def put_pipeline_definition(self): + def put_pipeline_definition(self) -> str: pipeline_id = self._get_param("pipelineId") pipeline_objects = self._get_param("pipelineObjects") self.datapipeline_backend.put_pipeline_definition(pipeline_id, pipeline_objects) return json.dumps({"errored": False}) - def get_pipeline_definition(self): + def get_pipeline_definition(self) -> str: pipeline_id = self._get_param("pipelineId") pipeline_definition = self.datapipeline_backend.get_pipeline_definition( pipeline_id @@ -80,7 +80,7 @@ class DataPipelineResponse(BaseResponse): } ) - def describe_objects(self): + def describe_objects(self) -> str: pipeline_id = self._get_param("pipelineId") object_ids = self._get_param("objectIds") @@ -97,7 +97,7 @@ class DataPipelineResponse(BaseResponse): } ) - def activate_pipeline(self): + def activate_pipeline(self) -> str: pipeline_id = self._get_param("pipelineId") self.datapipeline_backend.activate_pipeline(pipeline_id) return json.dumps({}) diff --git a/moto/datapipeline/utils.py b/moto/datapipeline/utils.py index b7041864e..99e552582 100644 --- a/moto/datapipeline/utils.py +++ b/moto/datapipeline/utils.py @@ -1,22 +1,23 @@ import collections.abc as collections_abc from moto.moto_api._internal import mock_random +from typing import Any -def get_random_pipeline_id(): +def get_random_pipeline_id() -> str: return f"df-{mock_random.get_random_hex(length=19)}" -def remove_capitalization_of_dict_keys(obj): +def remove_capitalization_of_dict_keys(obj: Any) -> Any: if isinstance(obj, collections_abc.Mapping): result = obj.__class__() for key, value in obj.items(): normalized_key = key[:1].lower() + key[1:] - result[normalized_key] = remove_capitalization_of_dict_keys(value) + result[normalized_key] = remove_capitalization_of_dict_keys(value) # type: ignore[index] return result elif isinstance(obj, collections_abc.Iterable) and not isinstance(obj, str): - result = obj.__class__() + result = obj.__class__() # type: ignore[assignment] for item in obj: - result += (remove_capitalization_of_dict_keys(item),) + result += (remove_capitalization_of_dict_keys(item),) # type: ignore[operator] return result else: return obj