Techdebt: MyPy Databrew (#5709)

This commit is contained in:
Bert Blommers 2022-11-25 14:45:39 -01:00 committed by GitHub
parent e5a1115834
commit cdc8b4f0e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 201 additions and 159 deletions

View File

@ -6,48 +6,48 @@ class DataBrewClientError(JsonRESTError):
class AlreadyExistsException(DataBrewClientError): class AlreadyExistsException(DataBrewClientError):
def __init__(self, typ): def __init__(self, typ: str):
super().__init__("AlreadyExistsException", f"{typ} already exists.") super().__init__("AlreadyExistsException", f"{typ} already exists.")
class ConflictException(DataBrewClientError): class ConflictException(DataBrewClientError):
code = 409 code = 409
def __init__(self, message, **kwargs): def __init__(self, message: str):
super().__init__("ConflictException", message, **kwargs) super().__init__("ConflictException", message)
class ValidationException(DataBrewClientError): class ValidationException(DataBrewClientError):
def __init__(self, message, **kwargs): def __init__(self, message: str):
super().__init__("ValidationException", message, **kwargs) super().__init__("ValidationException", message)
class RulesetAlreadyExistsException(AlreadyExistsException): class RulesetAlreadyExistsException(AlreadyExistsException):
def __init__(self): def __init__(self) -> None:
super().__init__("Ruleset") super().__init__("Ruleset")
class EntityNotFoundException(DataBrewClientError): class EntityNotFoundException(DataBrewClientError):
def __init__(self, msg): def __init__(self, msg: str):
super().__init__("EntityNotFoundException", msg) super().__init__("EntityNotFoundException", msg)
class ResourceNotFoundException(DataBrewClientError): class ResourceNotFoundException(DataBrewClientError):
code = 404 code = 404
def __init__(self, message, **kwargs): def __init__(self, message: str):
super().__init__("ResourceNotFoundException", message, **kwargs) super().__init__("ResourceNotFoundException", message)
class RulesetNotFoundException(EntityNotFoundException): class RulesetNotFoundException(EntityNotFoundException):
def __init__(self, recipe_name): def __init__(self, recipe_name: str):
super().__init__(f"Ruleset {recipe_name} not found.") super().__init__(f"Ruleset {recipe_name} not found.")
class ServiceQuotaExceededException(JsonRESTError): class ServiceQuotaExceededException(JsonRESTError):
code = 402 code = 402
def __init__(self): def __init__(self) -> None:
super().__init__( super().__init__(
"ServiceQuotaExceededException", "A service quota is exceeded." "ServiceQuotaExceededException", "A service quota is exceeded."
) )

View File

@ -5,6 +5,7 @@ from collections import OrderedDict
from copy import deepcopy from copy import deepcopy
import math import math
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional
from moto.core import BaseBackend, BackendDict, BaseModel from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import underscores_to_camelcase from moto.core.utils import underscores_to_camelcase
@ -56,22 +57,28 @@ class DataBrewBackend(BaseBackend):
}, },
} }
def __init__(self, region_name, account_id): def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id) super().__init__(region_name, account_id)
self.recipes = OrderedDict() self.recipes: Dict[str, FakeRecipe] = OrderedDict()
self.rulesets = OrderedDict() self.rulesets: Dict[str, FakeRuleset] = OrderedDict()
self.datasets = OrderedDict() self.datasets: Dict[str, FakeDataset] = OrderedDict()
self.jobs = OrderedDict() self.jobs: Dict[str, FakeJob] = OrderedDict()
@staticmethod @staticmethod
def validate_length(param, param_name, max_length): def validate_length(param: str, param_name: str, max_length: int) -> None:
if len(param) > max_length: if len(param) > max_length:
raise ValidationException( raise ValidationException(
f"1 validation error detected: Value '{param}' at '{param_name}' failed to " f"1 validation error detected: Value '{param}' at '{param_name}' failed to "
f"satisfy constraint: Member must have length less than or equal to {max_length}" f"satisfy constraint: Member must have length less than or equal to {max_length}"
) )
def create_recipe(self, recipe_name, recipe_description, recipe_steps, tags): def create_recipe(
self,
recipe_name: str,
recipe_description: str,
recipe_steps: List[Dict[str, Any]],
tags: Dict[str, str],
) -> "FakeRecipeVersion":
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipe.html # https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipe.html
if recipe_name in self.recipes: if recipe_name in self.recipes:
raise ConflictException(f"The recipe {recipe_name} already exists") raise ConflictException(f"The recipe {recipe_name} already exists")
@ -82,7 +89,7 @@ class DataBrewBackend(BaseBackend):
self.recipes[recipe_name] = recipe self.recipes[recipe_name] = recipe
return recipe.latest_working return recipe.latest_working
def delete_recipe_version(self, recipe_name, recipe_version): def delete_recipe_version(self, recipe_name: str, recipe_version: str) -> None:
if not FakeRecipe.version_is_valid(recipe_version, latest_published=False): if not FakeRecipe.version_is_valid(recipe_version, latest_published=False):
raise ValidationException( raise ValidationException(
f"Recipe {recipe_name} version {recipe_version} is invalid." f"Recipe {recipe_name} version {recipe_version} is invalid."
@ -115,17 +122,20 @@ class DataBrewBackend(BaseBackend):
else: else:
recipe.delete_published_version(recipe_version) recipe.delete_published_version(recipe_version)
def update_recipe(self, recipe_name, recipe_description, recipe_steps): def update_recipe(
self,
recipe_name: str,
recipe_description: str,
recipe_steps: List[Dict[str, Any]],
) -> None:
if recipe_name not in self.recipes: if recipe_name not in self.recipes:
raise ResourceNotFoundException(f"The recipe {recipe_name} wasn't found") raise ResourceNotFoundException(f"The recipe {recipe_name} wasn't found")
recipe = self.recipes[recipe_name] recipe = self.recipes[recipe_name]
recipe.update(recipe_description, recipe_steps) recipe.update(recipe_description, recipe_steps)
return recipe.latest_working
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def list_recipes(self, recipe_version=None): def list_recipes(self, recipe_version: Optional[str] = None) -> List["FakeRecipeVersion"]: # type: ignore[misc]
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipes.html # https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipes.html
if recipe_version == FakeRecipe.LATEST_WORKING: if recipe_version == FakeRecipe.LATEST_WORKING:
version = "latest_working" version = "latest_working"
@ -140,7 +150,7 @@ class DataBrewBackend(BaseBackend):
return [r for r in recipes if r is not None] return [r for r in recipes if r is not None]
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def list_recipe_versions(self, recipe_name): def list_recipe_versions(self, recipe_name: str) -> List["FakeRecipeVersion"]: # type: ignore[misc]
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipeVersions.html # https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipeVersions.html
self.validate_length(recipe_name, "name", 255) self.validate_length(recipe_name, "name", 255)
@ -157,7 +167,9 @@ class DataBrewBackend(BaseBackend):
] ]
return [r for r in recipe_versions if r is not None] return [r for r in recipe_versions if r is not None]
def describe_recipe(self, recipe_name, recipe_version=None): def describe_recipe(
self, recipe_name: str, recipe_version: Optional[str] = None
) -> "FakeRecipeVersion":
# https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeRecipe.html # https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeRecipe.html
self.validate_length(recipe_name, "name", 255) self.validate_length(recipe_name, "name", 255)
@ -184,7 +196,9 @@ class DataBrewBackend(BaseBackend):
) )
return recipe return recipe
def publish_recipe(self, recipe_name, description=None): def publish_recipe(
self, recipe_name: str, description: Optional[str] = None
) -> None:
# https://docs.aws.amazon.com/databrew/latest/dg/API_PublishRecipe.html # https://docs.aws.amazon.com/databrew/latest/dg/API_PublishRecipe.html
self.validate_length(recipe_name, "name", 255) self.validate_length(recipe_name, "name", 255)
try: try:
@ -193,8 +207,13 @@ class DataBrewBackend(BaseBackend):
raise ResourceNotFoundException(f"Recipe {recipe_name} wasn't found") raise ResourceNotFoundException(f"Recipe {recipe_name} wasn't found")
def create_ruleset( def create_ruleset(
self, ruleset_name, ruleset_description, ruleset_rules, ruleset_target_arn, tags self,
): ruleset_name: str,
ruleset_description: str,
ruleset_rules: List[Dict[str, Any]],
ruleset_target_arn: str,
tags: Dict[str, str],
) -> "FakeRuleset":
if ruleset_name in self.rulesets: if ruleset_name in self.rulesets:
raise RulesetAlreadyExistsException() raise RulesetAlreadyExistsException()
@ -209,7 +228,13 @@ class DataBrewBackend(BaseBackend):
self.rulesets[ruleset_name] = ruleset self.rulesets[ruleset_name] = ruleset
return ruleset return ruleset
def update_ruleset(self, ruleset_name, ruleset_description, ruleset_rules, tags): def update_ruleset(
self,
ruleset_name: str,
ruleset_description: str,
ruleset_rules: List[Dict[str, Any]],
tags: Dict[str, str],
) -> "FakeRuleset":
if ruleset_name not in self.rulesets: if ruleset_name not in self.rulesets:
raise RulesetNotFoundException(ruleset_name) raise RulesetNotFoundException(ruleset_name)
@ -223,16 +248,16 @@ class DataBrewBackend(BaseBackend):
return ruleset return ruleset
def describe_ruleset(self, ruleset_name): def describe_ruleset(self, ruleset_name: str) -> "FakeRuleset":
if ruleset_name not in self.rulesets: if ruleset_name not in self.rulesets:
raise RulesetNotFoundException(ruleset_name) raise RulesetNotFoundException(ruleset_name)
return self.rulesets[ruleset_name] return self.rulesets[ruleset_name]
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def list_rulesets(self): def list_rulesets(self) -> List["FakeRuleset"]: # type: ignore[misc]
return list(self.rulesets.values()) return list(self.rulesets.values())
def delete_ruleset(self, ruleset_name): def delete_ruleset(self, ruleset_name: str) -> None:
if ruleset_name not in self.rulesets: if ruleset_name not in self.rulesets:
raise RulesetNotFoundException(ruleset_name) raise RulesetNotFoundException(ruleset_name)
@ -240,13 +265,13 @@ class DataBrewBackend(BaseBackend):
def create_dataset( def create_dataset(
self, self,
dataset_name, dataset_name: str,
dataset_format, dataset_format: str,
dataset_format_options, dataset_format_options: Dict[str, Any],
dataset_input, dataset_input: Dict[str, Any],
dataset_path_options, dataset_path_options: Dict[str, Any],
tags, tags: Dict[str, str],
): ) -> "FakeDataset":
if dataset_name in self.datasets: if dataset_name in self.datasets:
raise AlreadyExistsException(dataset_name) raise AlreadyExistsException(dataset_name)
@ -264,18 +289,18 @@ class DataBrewBackend(BaseBackend):
return dataset return dataset
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def list_datasets(self): def list_datasets(self) -> List["FakeDataset"]: # type: ignore[misc]
return list(self.datasets.values()) return list(self.datasets.values())
def update_dataset( def update_dataset(
self, self,
dataset_name, dataset_name: str,
dataset_format, dataset_format: str,
dataset_format_options, dataset_format_options: Dict[str, Any],
dataset_input, dataset_input: Dict[str, Any],
dataset_path_options, dataset_path_options: Dict[str, Any],
tags, tags: Dict[str, str],
): ) -> "FakeDataset":
if dataset_name not in self.datasets: if dataset_name not in self.datasets:
raise ResourceNotFoundException("One or more resources can't be found.") raise ResourceNotFoundException("One or more resources can't be found.")
@ -295,19 +320,19 @@ class DataBrewBackend(BaseBackend):
return dataset return dataset
def delete_dataset(self, dataset_name): def delete_dataset(self, dataset_name: str) -> None:
if dataset_name not in self.datasets: if dataset_name not in self.datasets:
raise ResourceNotFoundException("One or more resources can't be found.") raise ResourceNotFoundException("One or more resources can't be found.")
del self.datasets[dataset_name] del self.datasets[dataset_name]
def describe_dataset(self, dataset_name): def describe_dataset(self, dataset_name: str) -> "FakeDataset":
if dataset_name not in self.datasets: if dataset_name not in self.datasets:
raise ResourceNotFoundException("One or more resources can't be found.") raise ResourceNotFoundException("One or more resources can't be found.")
return self.datasets[dataset_name] return self.datasets[dataset_name]
def describe_job(self, job_name): def describe_job(self, job_name: str) -> "FakeJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeJob.html
self.validate_length(job_name, "name", 240) self.validate_length(job_name, "name", 240)
@ -316,7 +341,7 @@ class DataBrewBackend(BaseBackend):
return self.jobs[job_name] return self.jobs[job_name]
def delete_job(self, job_name): def delete_job(self, job_name: str) -> None:
# https://docs.aws.amazon.com/databrew/latest/dg/API_DeleteJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_DeleteJob.html
self.validate_length(job_name, "name", 240) self.validate_length(job_name, "name", 240)
@ -325,7 +350,7 @@ class DataBrewBackend(BaseBackend):
del self.jobs[job_name] del self.jobs[job_name]
def create_profile_job(self, **kwargs): def create_profile_job(self, **kwargs: Any) -> "FakeProfileJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateProfileJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_CreateProfileJob.html
job_name = kwargs["name"] job_name = kwargs["name"]
self.validate_length(job_name, "name", 240) self.validate_length(job_name, "name", 240)
@ -342,7 +367,7 @@ class DataBrewBackend(BaseBackend):
self.jobs[job_name] = job self.jobs[job_name] = job
return job return job
def create_recipe_job(self, **kwargs): def create_recipe_job(self, **kwargs: Any) -> "FakeRecipeJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipeJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipeJob.html
job_name = kwargs["name"] job_name = kwargs["name"]
self.validate_length(job_name, "name", 240) self.validate_length(job_name, "name", 240)
@ -359,7 +384,7 @@ class DataBrewBackend(BaseBackend):
self.jobs[job_name] = job self.jobs[job_name] = job
return job return job
def update_job(self, **kwargs): def update_job(self, **kwargs: Any) -> "FakeJob":
job_name = kwargs["name"] job_name = kwargs["name"]
self.validate_length(job_name, "name", 240) self.validate_length(job_name, "name", 240)
@ -372,23 +397,23 @@ class DataBrewBackend(BaseBackend):
setattr(job, param, value) setattr(job, param, value)
return job return job
def update_recipe_job(self, **kwargs): def update_recipe_job(self, **kwargs: Any) -> "FakeJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateRecipeJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateRecipeJob.html
return self.update_job(**kwargs) return self.update_job(**kwargs)
def update_profile_job(self, **kwargs): def update_profile_job(self, **kwargs: Any) -> "FakeJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateProfileJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateProfileJob.html
return self.update_job(**kwargs) return self.update_job(**kwargs)
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def list_jobs(self, dataset_name=None, project_name=None): def list_jobs(self, dataset_name: Optional[str] = None, project_name: Optional[str] = None) -> List["FakeJob"]: # type: ignore[misc]
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListJobs.html # https://docs.aws.amazon.com/databrew/latest/dg/API_ListJobs.html
if dataset_name is not None: if dataset_name is not None:
self.validate_length(dataset_name, "datasetName", 255) self.validate_length(dataset_name, "datasetName", 255)
if project_name is not None: if project_name is not None:
self.validate_length(project_name, "projectName", 255) self.validate_length(project_name, "projectName", 255)
def filter_jobs(job): def filter_jobs(job: FakeJob) -> bool:
if dataset_name is not None and job.dataset_name != dataset_name: if dataset_name is not None and job.dataset_name != dataset_name:
return False return False
if ( if (
@ -407,14 +432,16 @@ class FakeRecipe(BaseModel):
LATEST_PUBLISHED = "LATEST_PUBLISHED" LATEST_PUBLISHED = "LATEST_PUBLISHED"
@classmethod @classmethod
def version_is_valid(cls, version, latest_working=True, latest_published=True): def version_is_valid(
cls, version: str, latest_working: bool = True, latest_published: bool = True
) -> bool:
validity = True validity = True
if len(version) < 1 or len(version) > 16: if len(version) < 1 or len(version) > 16:
validity = False validity = False
else: else:
try: try:
version = float(version) float(version)
except ValueError: except ValueError:
if not ( if not (
(version == cls.LATEST_WORKING and latest_working) (version == cls.LATEST_WORKING and latest_working)
@ -424,9 +451,14 @@ class FakeRecipe(BaseModel):
return validity return validity
def __init__( def __init__(
self, region_name, recipe_name, recipe_description, recipe_steps, tags self,
region_name: str,
recipe_name: str,
recipe_description: str,
recipe_steps: List[Dict[str, Any]],
tags: Dict[str, str],
): ):
self.versions = OrderedDict() self.versions: Dict[float, FakeRecipeVersion] = OrderedDict()
self.versions[self.INITIAL_VERSION] = FakeRecipeVersion( self.versions[self.INITIAL_VERSION] = FakeRecipeVersion(
region_name, region_name,
recipe_name, recipe_name,
@ -436,28 +468,30 @@ class FakeRecipe(BaseModel):
version=self.INITIAL_VERSION, version=self.INITIAL_VERSION,
) )
self.latest_working = self.versions[self.INITIAL_VERSION] self.latest_working = self.versions[self.INITIAL_VERSION]
self.latest_published = None self.latest_published: Optional[FakeRecipeVersion] = None
def publish(self, description=None): def publish(self, description: Optional[str] = None) -> None:
self.latest_published = self.latest_working self.latest_published = self.latest_working
self.latest_working = deepcopy(self.latest_working) self.latest_working = deepcopy(self.latest_working)
self.latest_published.publish(description) self.latest_published.publish(description) # type: ignore[attr-defined]
del self.versions[self.latest_working.version] del self.versions[self.latest_working.version]
self.versions[self.latest_published.version] = self.latest_published self.versions[self.latest_published.version] = self.latest_published
self.latest_working.version = self.latest_published.version + 0.1 self.latest_working.version = self.latest_published.version + 0.1
self.versions[self.latest_working.version] = self.latest_working self.versions[self.latest_working.version] = self.latest_working
def update(self, description, steps): def update(
self, description: Optional[str], steps: Optional[List[Dict[str, Any]]]
) -> None:
if description is not None: if description is not None:
self.latest_working.description = description self.latest_working.description = description
if steps is not None: if steps is not None:
self.latest_working.steps = steps self.latest_working.steps = steps
def delete_published_version(self, version): def delete_published_version(self, version: str) -> None:
version = float(version) float_version = float(version)
assert version.is_integer() assert float_version.is_integer()
if version == self.latest_published.version: if float_version == self.latest_published.version: # type: ignore[union-attr]
prev_version = version - 1.0 prev_version = float_version - 1.0
# Iterate back through the published versions until we find one that exists # Iterate back through the published versions until we find one that exists
while prev_version >= 1.0: while prev_version >= 1.0:
if prev_version in self.versions: if prev_version in self.versions:
@ -467,12 +501,18 @@ class FakeRecipe(BaseModel):
else: else:
# Didn't find an earlier published version # Didn't find an earlier published version
self.latest_published = None self.latest_published = None
del self.versions[version] del self.versions[float_version]
class FakeRecipeVersion(BaseModel): class FakeRecipeVersion(BaseModel):
def __init__( def __init__(
self, region_name, recipe_name, recipe_description, recipe_steps, tags, version self,
region_name: str,
recipe_name: str,
recipe_description: str,
recipe_steps: List[Dict[str, Any]],
tags: Dict[str, str],
version: float,
): ):
self.region_name = region_name self.region_name = region_name
self.name = recipe_name self.name = recipe_name
@ -480,10 +520,10 @@ class FakeRecipeVersion(BaseModel):
self.steps = recipe_steps self.steps = recipe_steps
self.created_time = datetime.now() self.created_time = datetime.now()
self.tags = tags self.tags = tags
self.published_date = None self.published_date: Optional[datetime] = None
self.version = version self.version = version
def as_dict(self): def as_dict(self) -> Dict[str, Any]:
dict_recipe = { dict_recipe = {
"Name": self.name, "Name": self.name,
"Steps": self.steps, "Steps": self.steps,
@ -497,7 +537,7 @@ class FakeRecipeVersion(BaseModel):
return dict_recipe return dict_recipe
def publish(self, description): def publish(self, description: Optional[str]) -> None:
self.version = float(math.ceil(self.version)) self.version = float(math.ceil(self.version))
self.published_date = datetime.now() self.published_date = datetime.now()
if description is not None: if description is not None:
@ -507,12 +547,12 @@ class FakeRecipeVersion(BaseModel):
class FakeRuleset(BaseModel): class FakeRuleset(BaseModel):
def __init__( def __init__(
self, self,
region_name, region_name: str,
ruleset_name, ruleset_name: str,
ruleset_description, ruleset_description: str,
ruleset_rules, ruleset_rules: List[Dict[str, Any]],
ruleset_target_arn, ruleset_target_arn: str,
tags, tags: Dict[str, str],
): ):
self.region_name = region_name self.region_name = region_name
self.name = ruleset_name self.name = ruleset_name
@ -523,7 +563,7 @@ class FakeRuleset(BaseModel):
self.tags = tags self.tags = tags
def as_dict(self): def as_dict(self) -> Dict[str, Any]:
return { return {
"Name": self.name, "Name": self.name,
"Rules": self.rules, "Rules": self.rules,
@ -537,14 +577,14 @@ class FakeRuleset(BaseModel):
class FakeDataset(BaseModel): class FakeDataset(BaseModel):
def __init__( def __init__(
self, self,
region_name, region_name: str,
account_id, account_id: str,
dataset_name, dataset_name: str,
dataset_format, dataset_format: str,
dataset_format_options, dataset_format_options: Dict[str, Any],
dataset_input, dataset_input: Dict[str, Any],
dataset_path_options, dataset_path_options: Dict[str, Any],
tags, tags: Dict[str, str],
): ):
self.region_name = region_name self.region_name = region_name
self.account_id = account_id self.account_id = account_id
@ -557,12 +597,12 @@ class FakeDataset(BaseModel):
self.tags = tags self.tags = tags
@property @property
def resource_arn(self): def resource_arn(self) -> str:
return ( return (
f"arn:aws:databrew:{self.region_name}:{self.account_id}:dataset/{self.name}" f"arn:aws:databrew:{self.region_name}:{self.account_id}:dataset/{self.name}"
) )
def as_dict(self): def as_dict(self) -> Dict[str, Any]:
return { return {
"Name": self.name, "Name": self.name,
"Format": self.format, "Format": self.format,
@ -575,21 +615,21 @@ class FakeDataset(BaseModel):
} }
class BaseModelABCMeta(ABCMeta, type(BaseModel)): class BaseModelABCMeta(ABCMeta, type(BaseModel)): # type: ignore[misc]
pass pass
class FakeJob(BaseModel, metaclass=BaseModelABCMeta): class FakeJob(BaseModel, metaclass=BaseModelABCMeta): # type: ignore[misc]
ENCRYPTION_MODES = ("SSE-S3", "SSE-KMS") ENCRYPTION_MODES = ("SSE-S3", "SSE-KMS")
LOG_SUBSCRIPTION_VALUES = ("ENABLE", "DISABLE") LOG_SUBSCRIPTION_VALUES = ("ENABLE", "DISABLE")
@property @property
@abstractmethod @abstractmethod
def local_attrs(self) -> tuple: def local_attrs(self) -> List[str]: # type: ignore[misc]
raise NotImplementedError raise NotImplementedError
def __init__(self, account_id, region_name, **kwargs): def __init__(self, account_id: str, region_name: str, **kwargs: Any):
self.account_id = account_id self.account_id = account_id
self.region_name = region_name self.region_name = region_name
self.name = kwargs.get("name") self.name = kwargs.get("name")
@ -606,7 +646,7 @@ class FakeJob(BaseModel, metaclass=BaseModelABCMeta):
for k in self.local_attrs: for k in self.local_attrs:
setattr(self, k, kwargs.get(k)) setattr(self, k, kwargs.get(k))
def validate(self): def validate(self) -> None:
if self.encryption_mode is not None: if self.encryption_mode is not None:
if self.encryption_mode not in FakeJob.ENCRYPTION_MODES: if self.encryption_mode not in FakeJob.ENCRYPTION_MODES:
raise ValidationException( raise ValidationException(
@ -624,10 +664,10 @@ class FakeJob(BaseModel, metaclass=BaseModelABCMeta):
pass pass
@property @property
def resource_arn(self): def resource_arn(self) -> str:
return f"arn:aws:databrew:{self.region_name}:{self.account_id}:job/{self.name}" return f"arn:aws:databrew:{self.region_name}:{self.account_id}:job/{self.name}"
def as_dict(self): def as_dict(self) -> Dict[str, Any]:
rtn_dict = { rtn_dict = {
"Name": self.name, "Name": self.name,
"AccountId": self.account_id, "AccountId": self.account_id,
@ -655,19 +695,19 @@ class FakeJob(BaseModel, metaclass=BaseModelABCMeta):
return rtn_dict return rtn_dict
class FakeProfileJob(FakeJob): class FakeProfileJob(FakeJob): # type: ignore[misc]
job_type = "PROFILE" job_type = "PROFILE"
local_attrs = ("output_location", "configuration", "validation_configurations") local_attrs = ["output_location", "configuration", "validation_configurations"]
class FakeRecipeJob(FakeJob): class FakeRecipeJob(FakeJob): # type: ignore[misc]
local_attrs = ( local_attrs = [
"database_outputs", "database_outputs",
"data_catalog_outputs", "data_catalog_outputs",
"outputs", "outputs",
"project_name", "project_name",
"recipe_reference", "recipe_reference",
) ]
job_type = "RECIPE" job_type = "RECIPE"

View File

@ -1,27 +1,29 @@
import json import json
from typing import Any, Dict, Union
from urllib.parse import urlparse from urllib.parse import urlparse
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.utilities.aws_headers import amzn_request_id from moto.utilities.aws_headers import amzn_request_id
from .models import databrew_backends from .models import databrew_backends, DataBrewBackend
class DataBrewResponse(BaseResponse): class DataBrewResponse(BaseResponse):
def __init__(self): def __init__(self) -> None:
super().__init__(service_name="databrew") super().__init__(service_name="databrew")
@property @property
def databrew_backend(self): def databrew_backend(self) -> DataBrewBackend:
"""Return backend instance specific for this region.""" """Return backend instance specific for this region."""
return databrew_backends[self.current_account][self.region] return databrew_backends[self.current_account][self.region]
# region Recipes # region Recipes
@property @property
def parameters(self): def parameters(self) -> Dict[str, Any]: # type: ignore[misc]
return json.loads(self.body) return json.loads(self.body)
@amzn_request_id @amzn_request_id
def create_recipe(self): def create_recipe(self) -> str:
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipe.html # https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipe.html
recipe_description = self.parameters.get("Description") recipe_description = self.parameters.get("Description")
recipe_steps = self.parameters.get("Steps") recipe_steps = self.parameters.get("Steps")
@ -29,12 +31,12 @@ class DataBrewResponse(BaseResponse):
tags = self.parameters.get("Tags") tags = self.parameters.get("Tags")
return json.dumps( return json.dumps(
self.databrew_backend.create_recipe( self.databrew_backend.create_recipe(
recipe_name, recipe_description, recipe_steps, tags recipe_name, recipe_description, recipe_steps, tags # type: ignore[arg-type]
).as_dict() ).as_dict()
) )
@amzn_request_id @amzn_request_id
def delete_recipe_version(self, request, full_url, headers): def delete_recipe_version(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return,misc]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
# https://docs.aws.amazon.com/databrew/latest/dg/API_DeleteRecipeVersion.html # https://docs.aws.amazon.com/databrew/latest/dg/API_DeleteRecipeVersion.html
if request.method == "DELETE": if request.method == "DELETE":
@ -50,7 +52,7 @@ class DataBrewResponse(BaseResponse):
) )
@amzn_request_id @amzn_request_id
def list_recipes(self): def list_recipes(self) -> str:
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipes.html # https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipes.html
next_token = self._get_param("NextToken", self._get_param("nextToken")) next_token = self._get_param("NextToken", self._get_param("nextToken"))
max_results = self._get_int_param( max_results = self._get_int_param(
@ -74,7 +76,7 @@ class DataBrewResponse(BaseResponse):
) )
@amzn_request_id @amzn_request_id
def list_recipe_versions(self, request, full_url, headers): def list_recipe_versions(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[return,misc]
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipeVersions.html # https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipeVersions.html
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
recipe_name = self._get_param("Name", self._get_param("name")) recipe_name = self._get_param("Name", self._get_param("name"))
@ -95,7 +97,7 @@ class DataBrewResponse(BaseResponse):
) )
@amzn_request_id @amzn_request_id
def publish_recipe(self, request, full_url, headers): def publish_recipe(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return,misc]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
if request.method == "POST": if request.method == "POST":
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)
@ -104,16 +106,16 @@ class DataBrewResponse(BaseResponse):
self.databrew_backend.publish_recipe(recipe_name, recipe_description) self.databrew_backend.publish_recipe(recipe_name, recipe_description)
return 200, {}, json.dumps({"Name": recipe_name}) return 200, {}, json.dumps({"Name": recipe_name})
def put_recipe_response(self, recipe_name): def put_recipe_response(self, recipe_name: str) -> TYPE_RESPONSE:
recipe_description = self.parameters.get("Description") recipe_description = self.parameters.get("Description")
recipe_steps = self.parameters.get("Steps") recipe_steps = self.parameters.get("Steps")
self.databrew_backend.update_recipe( self.databrew_backend.update_recipe(
recipe_name, recipe_description, recipe_steps recipe_name, recipe_description, recipe_steps # type: ignore[arg-type]
) )
return 200, {}, json.dumps({"Name": recipe_name}) return 200, {}, json.dumps({"Name": recipe_name})
def get_recipe_response(self, recipe_name): def get_recipe_response(self, recipe_name: str) -> TYPE_RESPONSE:
# https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeRecipe.html # https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeRecipe.html
recipe_version = self._get_param( recipe_version = self._get_param(
"RecipeVersion", self._get_param("recipeVersion") "RecipeVersion", self._get_param("recipeVersion")
@ -124,7 +126,7 @@ class DataBrewResponse(BaseResponse):
return 200, {}, json.dumps(recipe.as_dict()) return 200, {}, json.dumps(recipe.as_dict())
@amzn_request_id @amzn_request_id
def recipe_response(self, request, full_url, headers): def recipe_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[misc,return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)
@ -140,7 +142,7 @@ class DataBrewResponse(BaseResponse):
# region Rulesets # region Rulesets
@amzn_request_id @amzn_request_id
def create_ruleset(self): def create_ruleset(self) -> str:
ruleset_description = self.parameters.get("Description") ruleset_description = self.parameters.get("Description")
ruleset_rules = self.parameters.get("Rules") ruleset_rules = self.parameters.get("Rules")
ruleset_name = self.parameters.get("Name") ruleset_name = self.parameters.get("Name")
@ -149,34 +151,34 @@ class DataBrewResponse(BaseResponse):
return json.dumps( return json.dumps(
self.databrew_backend.create_ruleset( self.databrew_backend.create_ruleset(
ruleset_name, ruleset_name, # type: ignore[arg-type]
ruleset_description, ruleset_description, # type: ignore[arg-type]
ruleset_rules, ruleset_rules, # type: ignore[arg-type]
ruleset_target_arn, ruleset_target_arn, # type: ignore[arg-type]
tags, tags, # type: ignore[arg-type]
).as_dict() ).as_dict()
) )
def put_ruleset_response(self, ruleset_name): def put_ruleset_response(self, ruleset_name: str) -> TYPE_RESPONSE:
ruleset_description = self.parameters.get("Description") ruleset_description = self.parameters.get("Description")
ruleset_rules = self.parameters.get("Rules") ruleset_rules = self.parameters.get("Rules")
tags = self.parameters.get("Tags") tags = self.parameters.get("Tags")
ruleset = self.databrew_backend.update_ruleset( ruleset = self.databrew_backend.update_ruleset(
ruleset_name, ruleset_description, ruleset_rules, tags ruleset_name, ruleset_description, ruleset_rules, tags # type: ignore[arg-type]
) )
return 200, {}, json.dumps(ruleset.as_dict()) return 200, {}, json.dumps(ruleset.as_dict())
def get_ruleset_response(self, ruleset_name): def get_ruleset_response(self, ruleset_name: str) -> TYPE_RESPONSE:
ruleset = self.databrew_backend.describe_ruleset(ruleset_name) ruleset = self.databrew_backend.describe_ruleset(ruleset_name)
return 200, {}, json.dumps(ruleset.as_dict()) return 200, {}, json.dumps(ruleset.as_dict())
def delete_ruleset_response(self, ruleset_name): def delete_ruleset_response(self, ruleset_name: str) -> TYPE_RESPONSE:
self.databrew_backend.delete_ruleset(ruleset_name) self.databrew_backend.delete_ruleset(ruleset_name)
return 200, {}, json.dumps({"Name": ruleset_name}) return 200, {}, json.dumps({"Name": ruleset_name})
@amzn_request_id @amzn_request_id
def ruleset_response(self, request, full_url, headers): def ruleset_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[misc,return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)
@ -191,7 +193,7 @@ class DataBrewResponse(BaseResponse):
return self.delete_ruleset_response(ruleset_name) return self.delete_ruleset_response(ruleset_name)
@amzn_request_id @amzn_request_id
def list_rulesets(self): def list_rulesets(self) -> str:
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListRulesets.html # https://docs.aws.amazon.com/databrew/latest/dg/API_ListRulesets.html
next_token = self._get_param("NextToken", self._get_param("nextToken")) next_token = self._get_param("NextToken", self._get_param("nextToken"))
max_results = self._get_int_param( max_results = self._get_int_param(
@ -214,7 +216,7 @@ class DataBrewResponse(BaseResponse):
# region Datasets # region Datasets
@amzn_request_id @amzn_request_id
def create_dataset(self): def create_dataset(self) -> str:
dataset_name = self.parameters.get("Name") dataset_name = self.parameters.get("Name")
dataset_format = self.parameters.get("Format") dataset_format = self.parameters.get("Format")
dataset_format_options = self.parameters.get("FormatOptions") dataset_format_options = self.parameters.get("FormatOptions")
@ -224,17 +226,17 @@ class DataBrewResponse(BaseResponse):
return json.dumps( return json.dumps(
self.databrew_backend.create_dataset( self.databrew_backend.create_dataset(
dataset_name, dataset_name, # type: ignore[arg-type]
dataset_format, dataset_format, # type: ignore[arg-type]
dataset_format_options, dataset_format_options, # type: ignore[arg-type]
dataset_input, dataset_input, # type: ignore[arg-type]
dataset_path_otions, dataset_path_otions, # type: ignore[arg-type]
dataset_tags, dataset_tags, # type: ignore[arg-type]
).as_dict() ).as_dict()
) )
@amzn_request_id @amzn_request_id
def list_datasets(self): def list_datasets(self) -> str:
next_token = self._get_param("NextToken", self._get_param("nextToken")) next_token = self._get_param("NextToken", self._get_param("nextToken"))
max_results = self._get_int_param( max_results = self._get_int_param(
"MaxResults", self._get_int_param("maxResults") "MaxResults", self._get_int_param("maxResults")
@ -253,7 +255,7 @@ class DataBrewResponse(BaseResponse):
) )
@amzn_request_id @amzn_request_id
def update_dataset(self, dataset_name): def update_dataset(self, dataset_name: str) -> TYPE_RESPONSE:
dataset_format = self.parameters.get("Format") dataset_format = self.parameters.get("Format")
dataset_format_options = self.parameters.get("FormatOptions") dataset_format_options = self.parameters.get("FormatOptions")
dataset_input = self.parameters.get("Input") dataset_input = self.parameters.get("Input")
@ -262,26 +264,26 @@ class DataBrewResponse(BaseResponse):
dataset = self.databrew_backend.update_dataset( dataset = self.databrew_backend.update_dataset(
dataset_name, dataset_name,
dataset_format, dataset_format, # type: ignore[arg-type]
dataset_format_options, dataset_format_options, # type: ignore[arg-type]
dataset_input, dataset_input, # type: ignore[arg-type]
dataset_path_otions, dataset_path_otions, # type: ignore[arg-type]
dataset_tags, dataset_tags, # type: ignore[arg-type]
) )
return 200, {}, json.dumps(dataset.as_dict()) return 200, {}, json.dumps(dataset.as_dict())
@amzn_request_id @amzn_request_id
def delete_dataset(self, dataset_name): def delete_dataset(self, dataset_name: str) -> TYPE_RESPONSE:
self.databrew_backend.delete_dataset(dataset_name) self.databrew_backend.delete_dataset(dataset_name)
return 200, {}, json.dumps({"Name": dataset_name}) return 200, {}, json.dumps({"Name": dataset_name})
@amzn_request_id @amzn_request_id
def describe_dataset(self, dataset_name): def describe_dataset(self, dataset_name: str) -> TYPE_RESPONSE:
dataset = self.databrew_backend.describe_dataset(dataset_name) dataset = self.databrew_backend.describe_dataset(dataset_name)
return 200, {}, json.dumps(dataset.as_dict()) return 200, {}, json.dumps(dataset.as_dict())
@amzn_request_id @amzn_request_id
def dataset_response(self, request, full_url, headers): def dataset_response(self, request: Any, full_url: str, headers: Any) -> Union[str, TYPE_RESPONSE]: # type: ignore[misc,return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)
@ -302,7 +304,7 @@ class DataBrewResponse(BaseResponse):
# region Jobs # region Jobs
@amzn_request_id @amzn_request_id
def list_jobs(self, request, full_url, headers): def list_jobs(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[misc,return]
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListJobs.html # https://docs.aws.amazon.com/databrew/latest/dg/API_ListJobs.html
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
dataset_name = self._get_param("datasetName") dataset_name = self._get_param("datasetName")
@ -326,16 +328,16 @@ class DataBrewResponse(BaseResponse):
} }
) )
def get_job_response(self, job_name): def get_job_response(self, job_name: str) -> TYPE_RESPONSE:
job = self.databrew_backend.describe_job(job_name) job = self.databrew_backend.describe_job(job_name)
return 200, {}, json.dumps(job.as_dict()) return 200, {}, json.dumps(job.as_dict())
def delete_job_response(self, job_name): def delete_job_response(self, job_name: str) -> TYPE_RESPONSE:
self.databrew_backend.delete_job(job_name) self.databrew_backend.delete_job(job_name)
return 200, {}, json.dumps({"Name": job_name}) return 200, {}, json.dumps({"Name": job_name})
@amzn_request_id @amzn_request_id
def job_response(self, request, full_url, headers): def job_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[misc,return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)
@ -347,7 +349,7 @@ class DataBrewResponse(BaseResponse):
return self.delete_job_response(job_name) return self.delete_job_response(job_name)
@amzn_request_id @amzn_request_id
def create_profile_job(self): def create_profile_job(self) -> str:
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateProfileJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_CreateProfileJob.html
kwargs = { kwargs = {
"dataset_name": self._get_param("DatasetName"), "dataset_name": self._get_param("DatasetName"),
@ -367,7 +369,7 @@ class DataBrewResponse(BaseResponse):
} }
return json.dumps(self.databrew_backend.create_profile_job(**kwargs).as_dict()) return json.dumps(self.databrew_backend.create_profile_job(**kwargs).as_dict())
def update_profile_job_response(self, name): def update_profile_job_response(self, name: str) -> str:
# https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateProfileJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateProfileJob.html
kwargs = { kwargs = {
"name": name, "name": name,
@ -386,7 +388,7 @@ class DataBrewResponse(BaseResponse):
return json.dumps(self.databrew_backend.update_profile_job(**kwargs).as_dict()) return json.dumps(self.databrew_backend.update_profile_job(**kwargs).as_dict())
@amzn_request_id @amzn_request_id
def create_recipe_job(self): def create_recipe_job(self) -> str:
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipeJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipeJob.html
kwargs = { kwargs = {
"name": self._get_param("Name"), "name": self._get_param("Name"),
@ -408,7 +410,7 @@ class DataBrewResponse(BaseResponse):
return json.dumps(self.databrew_backend.create_recipe_job(**kwargs).as_dict()) return json.dumps(self.databrew_backend.create_recipe_job(**kwargs).as_dict())
@amzn_request_id @amzn_request_id
def update_recipe_job_response(self, name): def update_recipe_job_response(self, name: str) -> str:
# https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateRecipeJob.html # https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateRecipeJob.html
kwargs = { kwargs = {
"name": name, "name": name,
@ -426,7 +428,7 @@ class DataBrewResponse(BaseResponse):
return json.dumps(self.databrew_backend.update_recipe_job(**kwargs).as_dict()) return json.dumps(self.databrew_backend.update_recipe_job(**kwargs).as_dict())
@amzn_request_id @amzn_request_id
def profile_job_response(self, request, full_url, headers): def profile_job_response(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[misc,return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)
@ -436,7 +438,7 @@ class DataBrewResponse(BaseResponse):
return self.update_profile_job_response(job_name) return self.update_profile_job_response(job_name)
@amzn_request_id @amzn_request_id
def recipe_job_response(self, request, full_url, headers): def recipe_job_response(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[misc,return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)

View File

@ -18,7 +18,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy] [mypy]
files= moto/a*,moto/b*,moto/c*,moto/moto_api files= moto/a*,moto/b*,moto/c*,moto/databrew,moto/moto_api
show_column_numbers=True show_column_numbers=True
show_error_codes = True show_error_codes = True
disable_error_code=abstract disable_error_code=abstract