Techdebt: MyPy DataPipeline (#5754)

This commit is contained in:
Bert Blommers 2022-12-11 09:48:12 -01:00 committed by GitHub
parent 6654f6ee9a
commit 56335e2d93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 41 deletions

View File

@ -3,37 +3,38 @@ import datetime
from collections import OrderedDict from collections import OrderedDict
from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel
from .utils import get_random_pipeline_id, remove_capitalization_of_dict_keys from .utils import get_random_pipeline_id, remove_capitalization_of_dict_keys
from typing import Any, Dict, Iterable, List
class PipelineObject(BaseModel): class PipelineObject(BaseModel):
def __init__(self, object_id, name, fields): def __init__(self, object_id: str, name: str, fields: Any):
self.object_id = object_id self.object_id = object_id
self.name = name self.name = name
self.fields = fields self.fields = fields
def to_json(self): def to_json(self) -> Dict[str, Any]:
return {"fields": self.fields, "id": self.object_id, "name": self.name} return {"fields": self.fields, "id": self.object_id, "name": self.name}
class Pipeline(CloudFormationModel): class Pipeline(CloudFormationModel):
def __init__(self, name, unique_id, **kwargs): def __init__(self, name: str, unique_id: str, **kwargs: Any):
self.name = name self.name = name
self.unique_id = unique_id self.unique_id = unique_id
self.description = kwargs.get("description", "") self.description = kwargs.get("description", "")
self.pipeline_id = get_random_pipeline_id() self.pipeline_id = get_random_pipeline_id()
self.creation_time = datetime.datetime.utcnow() self.creation_time = datetime.datetime.utcnow()
self.objects = [] self.objects: List[Any] = []
self.status = "PENDING" self.status = "PENDING"
self.tags = kwargs.get("tags", []) self.tags = kwargs.get("tags", [])
@property @property
def physical_resource_id(self): def physical_resource_id(self) -> str:
return self.pipeline_id return self.pipeline_id
def to_meta_json(self): def to_meta_json(self) -> Dict[str, str]:
return {"id": self.pipeline_id, "name": self.name} return {"id": self.pipeline_id, "name": self.name}
def to_json(self): def to_json(self) -> Dict[str, Any]:
return { return {
"description": self.description, "description": self.description,
"fields": [ "fields": [
@ -58,7 +59,7 @@ class Pipeline(CloudFormationModel):
"tags": self.tags, "tags": self.tags,
} }
def set_pipeline_objects(self, pipeline_objects): def set_pipeline_objects(self, pipeline_objects: Any) -> None:
self.objects = [ self.objects = [
PipelineObject( PipelineObject(
pipeline_object["id"], pipeline_object["id"],
@ -68,22 +69,27 @@ class Pipeline(CloudFormationModel):
for pipeline_object in remove_capitalization_of_dict_keys(pipeline_objects) for pipeline_object in remove_capitalization_of_dict_keys(pipeline_objects)
] ]
def activate(self): def activate(self) -> None:
self.status = "SCHEDULED" self.status = "SCHEDULED"
@staticmethod @staticmethod
def cloudformation_name_type(): def cloudformation_name_type() -> str:
return "Name" return "Name"
@staticmethod @staticmethod
def cloudformation_type(): def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-datapipeline-pipeline.html # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-datapipeline-pipeline.html
return "AWS::DataPipeline::Pipeline" return "AWS::DataPipeline::Pipeline"
@classmethod @classmethod
def create_from_cloudformation_json( def create_from_cloudformation_json( # type: ignore[misc]
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs cls,
): resource_name: str,
cloudformation_json: Dict[str, Any],
account_id: str,
region_name: str,
**kwargs: Any
) -> "Pipeline":
datapipeline_backend = datapipeline_backends[account_id][region_name] datapipeline_backend = datapipeline_backends[account_id][region_name]
properties = cloudformation_json["Properties"] properties = cloudformation_json["Properties"]
@ -101,19 +107,19 @@ class Pipeline(CloudFormationModel):
class DataPipelineBackend(BaseBackend): class DataPipelineBackend(BaseBackend):
def __init__(self, region_name, account_id): def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id) super().__init__(region_name, account_id)
self.pipelines = OrderedDict() self.pipelines: Dict[str, Pipeline] = OrderedDict()
def create_pipeline(self, name, unique_id, **kwargs): def create_pipeline(self, name: str, unique_id: str, **kwargs: Any) -> Pipeline:
pipeline = Pipeline(name, unique_id, **kwargs) pipeline = Pipeline(name, unique_id, **kwargs)
self.pipelines[pipeline.pipeline_id] = pipeline self.pipelines[pipeline.pipeline_id] = pipeline
return pipeline return pipeline
def list_pipelines(self): def list_pipelines(self) -> Iterable[Pipeline]:
return self.pipelines.values() return self.pipelines.values()
def describe_pipelines(self, pipeline_ids): def describe_pipelines(self, pipeline_ids: List[str]) -> List[Pipeline]:
pipelines = [ pipelines = [
pipeline pipeline
for pipeline in self.pipelines.values() for pipeline in self.pipelines.values()
@ -121,21 +127,21 @@ class DataPipelineBackend(BaseBackend):
] ]
return pipelines return pipelines
def get_pipeline(self, pipeline_id): def get_pipeline(self, pipeline_id: str) -> Pipeline:
return self.pipelines[pipeline_id] return self.pipelines[pipeline_id]
def delete_pipeline(self, pipeline_id): def delete_pipeline(self, pipeline_id: str) -> None:
self.pipelines.pop(pipeline_id, None) self.pipelines.pop(pipeline_id, None)
def put_pipeline_definition(self, pipeline_id, pipeline_objects): def put_pipeline_definition(self, pipeline_id: str, pipeline_objects: Any) -> None:
pipeline = self.get_pipeline(pipeline_id) pipeline = self.get_pipeline(pipeline_id)
pipeline.set_pipeline_objects(pipeline_objects) pipeline.set_pipeline_objects(pipeline_objects)
def get_pipeline_definition(self, pipeline_id): def get_pipeline_definition(self, pipeline_id: str) -> Any:
pipeline = self.get_pipeline(pipeline_id) pipeline = self.get_pipeline(pipeline_id)
return pipeline.objects return pipeline.objects
def describe_objects(self, object_ids, pipeline_id): def describe_objects(self, object_ids: List[str], pipeline_id: str) -> List[Any]:
pipeline = self.get_pipeline(pipeline_id) pipeline = self.get_pipeline(pipeline_id)
pipeline_objects = [ pipeline_objects = [
pipeline_object pipeline_object
@ -144,7 +150,7 @@ class DataPipelineBackend(BaseBackend):
] ]
return pipeline_objects return pipeline_objects
def activate_pipeline(self, pipeline_id): def activate_pipeline(self, pipeline_id: str) -> None:
pipeline = self.get_pipeline(pipeline_id) pipeline = self.get_pipeline(pipeline_id)
pipeline.activate() pipeline.activate()

View File

@ -1,18 +1,18 @@
import json import json
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import datapipeline_backends from .models import datapipeline_backends, DataPipelineBackend
class DataPipelineResponse(BaseResponse): class DataPipelineResponse(BaseResponse):
def __init__(self): def __init__(self) -> None:
super().__init__(service_name="datapipeline") super().__init__(service_name="datapipeline")
@property @property
def datapipeline_backend(self): def datapipeline_backend(self) -> DataPipelineBackend:
return datapipeline_backends[self.current_account][self.region] return datapipeline_backends[self.current_account][self.region]
def create_pipeline(self): def create_pipeline(self) -> str:
name = self._get_param("name") name = self._get_param("name")
unique_id = self._get_param("uniqueId") unique_id = self._get_param("uniqueId")
description = self._get_param("description", "") description = self._get_param("description", "")
@ -22,7 +22,7 @@ class DataPipelineResponse(BaseResponse):
) )
return json.dumps({"pipelineId": pipeline.pipeline_id}) return json.dumps({"pipelineId": pipeline.pipeline_id})
def list_pipelines(self): def list_pipelines(self) -> str:
pipelines = list(self.datapipeline_backend.list_pipelines()) pipelines = list(self.datapipeline_backend.list_pipelines())
pipeline_ids = [pipeline.pipeline_id for pipeline in pipelines] pipeline_ids = [pipeline.pipeline_id for pipeline in pipelines]
max_pipelines = 50 max_pipelines = 50
@ -47,7 +47,7 @@ class DataPipelineResponse(BaseResponse):
} }
) )
def describe_pipelines(self): def describe_pipelines(self) -> str:
pipeline_ids = self._get_param("pipelineIds") pipeline_ids = self._get_param("pipelineIds")
pipelines = self.datapipeline_backend.describe_pipelines(pipeline_ids) pipelines = self.datapipeline_backend.describe_pipelines(pipeline_ids)
@ -55,19 +55,19 @@ class DataPipelineResponse(BaseResponse):
{"pipelineDescriptionList": [pipeline.to_json() for pipeline in pipelines]} {"pipelineDescriptionList": [pipeline.to_json() for pipeline in pipelines]}
) )
def delete_pipeline(self): def delete_pipeline(self) -> str:
pipeline_id = self._get_param("pipelineId") pipeline_id = self._get_param("pipelineId")
self.datapipeline_backend.delete_pipeline(pipeline_id) self.datapipeline_backend.delete_pipeline(pipeline_id)
return json.dumps({}) return json.dumps({})
def put_pipeline_definition(self): def put_pipeline_definition(self) -> str:
pipeline_id = self._get_param("pipelineId") pipeline_id = self._get_param("pipelineId")
pipeline_objects = self._get_param("pipelineObjects") pipeline_objects = self._get_param("pipelineObjects")
self.datapipeline_backend.put_pipeline_definition(pipeline_id, pipeline_objects) self.datapipeline_backend.put_pipeline_definition(pipeline_id, pipeline_objects)
return json.dumps({"errored": False}) return json.dumps({"errored": False})
def get_pipeline_definition(self): def get_pipeline_definition(self) -> str:
pipeline_id = self._get_param("pipelineId") pipeline_id = self._get_param("pipelineId")
pipeline_definition = self.datapipeline_backend.get_pipeline_definition( pipeline_definition = self.datapipeline_backend.get_pipeline_definition(
pipeline_id pipeline_id
@ -80,7 +80,7 @@ class DataPipelineResponse(BaseResponse):
} }
) )
def describe_objects(self): def describe_objects(self) -> str:
pipeline_id = self._get_param("pipelineId") pipeline_id = self._get_param("pipelineId")
object_ids = self._get_param("objectIds") object_ids = self._get_param("objectIds")
@ -97,7 +97,7 @@ class DataPipelineResponse(BaseResponse):
} }
) )
def activate_pipeline(self): def activate_pipeline(self) -> str:
pipeline_id = self._get_param("pipelineId") pipeline_id = self._get_param("pipelineId")
self.datapipeline_backend.activate_pipeline(pipeline_id) self.datapipeline_backend.activate_pipeline(pipeline_id)
return json.dumps({}) return json.dumps({})

View File

@ -1,22 +1,23 @@
import collections.abc as collections_abc import collections.abc as collections_abc
from moto.moto_api._internal import mock_random from moto.moto_api._internal import mock_random
from typing import Any
def get_random_pipeline_id(): def get_random_pipeline_id() -> str:
return f"df-{mock_random.get_random_hex(length=19)}" return f"df-{mock_random.get_random_hex(length=19)}"
def remove_capitalization_of_dict_keys(obj): def remove_capitalization_of_dict_keys(obj: Any) -> Any:
if isinstance(obj, collections_abc.Mapping): if isinstance(obj, collections_abc.Mapping):
result = obj.__class__() result = obj.__class__()
for key, value in obj.items(): for key, value in obj.items():
normalized_key = key[:1].lower() + key[1:] normalized_key = key[:1].lower() + key[1:]
result[normalized_key] = remove_capitalization_of_dict_keys(value) result[normalized_key] = remove_capitalization_of_dict_keys(value) # type: ignore[index]
return result return result
elif isinstance(obj, collections_abc.Iterable) and not isinstance(obj, str): elif isinstance(obj, collections_abc.Iterable) and not isinstance(obj, str):
result = obj.__class__() result = obj.__class__() # type: ignore[assignment]
for item in obj: for item in obj:
result += (remove_capitalization_of_dict_keys(item),) result += (remove_capitalization_of_dict_keys(item),) # type: ignore[operator]
return result return result
else: else:
return obj return obj