Databrew: Add datasets support (#5095)

Steven Church 2022-05-13 11:48:04 +01:00 committed by GitHub
parent 957b3148e0
commit babbd21814
8 changed files with 499 additions and 14 deletions

View File

@@ -1069,30 +1069,30 @@
## databrew
<details>
<summary>27% implemented</summary>

- [ ] batch_delete_recipe_version
- [X] create_dataset
- [ ] create_profile_job
- [ ] create_project
- [X] create_recipe
- [ ] create_recipe_job
- [X] create_ruleset
- [ ] create_schedule
- [X] delete_dataset
- [ ] delete_job
- [ ] delete_project
- [X] delete_recipe_version
- [X] delete_ruleset
- [ ] delete_schedule
- [X] describe_dataset
- [ ] describe_job
- [ ] describe_job_run
- [ ] describe_project
- [ ] describe_recipe
- [ ] describe_ruleset
- [ ] describe_schedule
- [X] list_datasets
- [ ] list_job_runs
- [ ] list_jobs
- [ ] list_projects
@@ -1108,7 +1108,7 @@
- [ ] stop_job_run
- [ ] tag_resource
- [ ] untag_resource
- [X] update_dataset
- [ ] update_profile_job
- [ ] update_project
- [X] update_recipe

View File

@@ -26,27 +26,27 @@ databrew
|start-h3| Implemented features for this service |end-h3|

- [ ] batch_delete_recipe_version
- [X] create_dataset
- [ ] create_profile_job
- [ ] create_project
- [X] create_recipe
- [ ] create_recipe_job
- [X] create_ruleset
- [ ] create_schedule
- [X] delete_dataset
- [ ] delete_job
- [ ] delete_project
- [X] delete_recipe_version
- [X] delete_ruleset
- [ ] delete_schedule
- [X] describe_dataset
- [ ] describe_job
- [ ] describe_job_run
- [ ] describe_project
- [ ] describe_recipe
- [ ] describe_ruleset
- [ ] describe_schedule
- [X] list_datasets
- [ ] list_job_runs
- [ ] list_jobs
- [ ] list_projects
@@ -62,7 +62,7 @@ databrew
- [ ] stop_job_run
- [ ] tag_resource
- [ ] untag_resource
- [X] update_dataset
- [ ] update_profile_job
- [ ] update_project
- [X] update_recipe

View File

@@ -1,4 +1,4 @@
# autogenerated by ./scripts/update_backend_index.py
import re

backend_url_patterns = [

View File

@@ -42,3 +42,12 @@ class ResourceNotFoundException(DataBrewClientError):
class RulesetNotFoundException(EntityNotFoundException):
    def __init__(self, recipe_name):
        super().__init__("Ruleset %s not found." % recipe_name)


class ServiceQuotaExceededException(JsonRESTError):
    code = 402

    def __init__(self):
        super().__init__(
            "ServiceQuotaExceededException", "A service quota is exceeded."
        )
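
The new exception follows the `JsonRESTError` pattern used elsewhere in moto, and 402 matches the HTTP status DataBrew documents for `ServiceQuotaExceededException`. Note the commit only defines the class; a minimal sketch of how a backend method might eventually raise it (the quota constant is hypothetical, not enforced by this commit):

```python
# Hypothetical quota check -- MAX_DATASETS is illustrative only;
# this commit adds the exception class but does not wire it up.
MAX_DATASETS = 100

def create_dataset(self, dataset_name, *args, **kwargs):
    if len(self.datasets) >= MAX_DATASETS:
        raise ServiceQuotaExceededException()
    ...
```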

View File

@@ -6,12 +6,15 @@ from datetime import datetime
from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict
from moto.utilities.paginator import paginate

from .exceptions import (
    AlreadyExistsException,
    ConflictException,
    ResourceNotFoundException,
    ValidationException,
    RulesetAlreadyExistsException,
    RulesetNotFoundException,
)


class DataBrewBackend(BaseBackend):
@@ -34,12 +37,19 @@ class DataBrewBackend(BaseBackend):
            "limit_default": 100,
            "unique_attribute": "name",
        },
        "list_datasets": {
            "input_token": "next_token",
            "limit_key": "max_results",
            "limit_default": 100,
            "unique_attribute": "name",
        },
    }

    def __init__(self, region_name):
        self.region_name = region_name
        self.recipes = OrderedDict()
        self.rulesets = OrderedDict()
        self.datasets = OrderedDict()

    def reset(self):
        """Re-initialize all attributes for this instance."""
@@ -221,6 +231,74 @@ class DataBrewBackend(BaseBackend):
        del self.rulesets[ruleset_name]

    def create_dataset(
        self,
        dataset_name,
        dataset_format,
        dataset_format_options,
        dataset_input,
        dataset_path_options,
        tags,
    ):
        if dataset_name in self.datasets:
            raise AlreadyExistsException(dataset_name)

        dataset = FakeDataset(
            self.region_name,
            dataset_name,
            dataset_format,
            dataset_format_options,
            dataset_input,
            dataset_path_options,
            tags,
        )
        self.datasets[dataset_name] = dataset
        return dataset

    @paginate(pagination_model=PAGINATION_MODEL)
    def list_datasets(self):
        return list(self.datasets.values())

    def update_dataset(
        self,
        dataset_name,
        dataset_format,
        dataset_format_options,
        dataset_input,
        dataset_path_options,
        tags,
    ):
        if dataset_name not in self.datasets:
            raise ResourceNotFoundException("One or more resources can't be found.")

        dataset = self.datasets[dataset_name]

        if dataset_format is not None:
            dataset.format = dataset_format
        if dataset_format_options is not None:
            dataset.format_options = dataset_format_options
        if dataset_input is not None:
            dataset.input = dataset_input
        if dataset_path_options is not None:
            dataset.path_options = dataset_path_options
        if tags is not None:
            dataset.tags = tags

        return dataset

    def delete_dataset(self, dataset_name):
        if dataset_name not in self.datasets:
            raise ResourceNotFoundException("One or more resources can't be found.")

        del self.datasets[dataset_name]

    def describe_dataset(self, dataset_name):
        if dataset_name not in self.datasets:
            raise ResourceNotFoundException("One or more resources can't be found.")

        return self.datasets[dataset_name]
class FakeRecipe(BaseModel):
    INITIAL_VERSION = 0.1

@@ -355,4 +433,36 @@ class FakeRuleset(BaseModel):
        }


class FakeDataset(BaseModel):
    def __init__(
        self,
        region_name,
        dataset_name,
        dataset_format,
        dataset_format_options,
        dataset_input,
        dataset_path_options,
        tags,
    ):
        self.region_name = region_name
        self.name = dataset_name
        self.format = dataset_format
        self.format_options = dataset_format_options
        self.input = dataset_input
        self.path_options = dataset_path_options
        self.created_time = datetime.now()
        self.tags = tags

    def as_dict(self):
        return {
            "Name": self.name,
            "Format": self.format,
            "FormatOptions": self.format_options,
            "Input": self.input,
            "PathOptions": self.path_options,
            "CreateTime": self.created_time.isoformat(),
            "Tags": self.tags or dict(),
        }


databrew_backends = BackendDict(DataBrewBackend, "databrew")
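
The `list_datasets` entry added to `PAGINATION_MODEL` lets moto's generic `@paginate` decorator handle token-based paging for the backend method. A minimal sketch of the resulting behaviour when the backend is driven directly (illustrative values; empty dicts stand in for the dataset parameters):

```python
# Illustrative only: the decorator accepts next_token/max_results kwargs
# per the "list_datasets" pagination model and returns (results, next_token).
backend = DataBrewBackend("us-east-1")
for i in range(5):
    backend.create_dataset(f"dataset-{i}", "JSON", {}, {}, {}, {})

page, token = backend.list_datasets(max_results=2)
assert len(page) == 2 and token is not None

remainder, _ = backend.list_datasets(next_token=token, max_results=10)
assert len(remainder) == 3
```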

View File

@@ -14,6 +14,7 @@ class DataBrewResponse(BaseResponse):
        """Return backend instance specific for this region."""
        return databrew_backends[self.region]

    # region Recipes
    @property
    def parameters(self):
        return json.loads(self.body)
@@ -133,6 +134,10 @@ class DataBrewResponse(BaseResponse):
        elif request.method == "GET":
            return self.get_recipe_response(recipe_name)

    # endregion

    # region Rulesets
    @amzn_request_id
    def create_ruleset(self):
        ruleset_description = self.parameters.get("Description")
@@ -202,3 +207,94 @@ class DataBrewResponse(BaseResponse):
                "NextToken": next_token,
            }
        )

    # endregion

    # region Datasets
    @amzn_request_id
    def create_dataset(self):
        dataset_name = self.parameters.get("Name")
        dataset_format = self.parameters.get("Format")
        dataset_format_options = self.parameters.get("FormatOptions")
        dataset_input = self.parameters.get("Input")
        dataset_path_options = self.parameters.get("PathOptions")
        dataset_tags = self.parameters.get("Tags")

        return json.dumps(
            self.databrew_backend.create_dataset(
                dataset_name,
                dataset_format,
                dataset_format_options,
                dataset_input,
                dataset_path_options,
                dataset_tags,
            ).as_dict()
        )

    @amzn_request_id
    def list_datasets(self):
        next_token = self._get_param("NextToken", self._get_param("nextToken"))
        max_results = self._get_int_param(
            "MaxResults", self._get_int_param("maxResults")
        )

        # pylint: disable=unexpected-keyword-arg, unbalanced-tuple-unpacking
        dataset_list, next_token = self.databrew_backend.list_datasets(
            next_token=next_token, max_results=max_results
        )

        return json.dumps(
            {
                "Datasets": [dataset.as_dict() for dataset in dataset_list],
                "NextToken": next_token,
            }
        )

    @amzn_request_id
    def update_dataset(self, dataset_name):
        dataset_format = self.parameters.get("Format")
        dataset_format_options = self.parameters.get("FormatOptions")
        dataset_input = self.parameters.get("Input")
        dataset_path_options = self.parameters.get("PathOptions")
        dataset_tags = self.parameters.get("Tags")

        dataset = self.databrew_backend.update_dataset(
            dataset_name,
            dataset_format,
            dataset_format_options,
            dataset_input,
            dataset_path_options,
            dataset_tags,
        )
        return 200, {}, json.dumps(dataset.as_dict())

    @amzn_request_id
    def delete_dataset(self, dataset_name):
        self.databrew_backend.delete_dataset(dataset_name)
        return 200, {}, json.dumps({"Name": dataset_name})

    @amzn_request_id
    def describe_dataset(self, dataset_name):
        dataset = self.databrew_backend.describe_dataset(dataset_name)
        return 200, {}, json.dumps(dataset.as_dict())

    @amzn_request_id
    def dataset_response(self, request, full_url, headers):
        self.setup_class(request, full_url, headers)
        parsed_url = urlparse(full_url)
        dataset_name = parsed_url.path.split("/")[-1]

        if request.method == "POST":
            return self.create_dataset()
        elif request.method == "GET" and dataset_name:
            return self.describe_dataset(dataset_name)
        elif request.method == "GET":
            return self.list_datasets()
        elif request.method == "DELETE":
            return self.delete_dataset(dataset_name)
        elif request.method == "PUT":
            return self.update_dataset(dataset_name)
    # endregion

View File

@@ -10,4 +10,6 @@ url_paths = {
    "{0}/recipes/(?P<recipe_name>[^/]+)/publishRecipe$": DataBrewResponse().publish_recipe,
    "{0}/rulesets$": DataBrewResponse.dispatch,
    "{0}/rulesets/(?P<ruleset_name>[^/]+)$": DataBrewResponse().ruleset_response,
    "{0}/datasets$": DataBrewResponse.dispatch,
    "{0}/datasets/(?P<dataset_name>[^/]+)$": DataBrewResponse().dataset_response,
}
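
The two new patterns split the work: the bare collection URL goes through moto's generic `dispatch`, while the per-dataset URL routes to the hand-rolled `dataset_response` shown above. A minimal sketch exercising each new route through boto3 (the dataset and bucket names are placeholders; moto does not validate them here):

```python
import boto3
from moto import mock_databrew

@mock_databrew
def demo_dataset_routes():
    client = boto3.client("databrew", region_name="us-east-1")
    definition = {"S3InputDefinition": {"Bucket": "placeholder-bucket"}}

    client.create_dataset(                                # POST /datasets
        Name="demo",
        Format="JSON",
        FormatOptions={"Json": {"MultiLine": True}},
        Input=definition,
    )
    client.list_datasets()                                # GET /datasets
    client.describe_dataset(Name="demo")                  # GET /datasets/demo
    client.update_dataset(Name="demo", Input=definition)  # PUT /datasets/demo
    client.delete_dataset(Name="demo")                    # DELETE /datasets/demo

demo_dataset_routes()
```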

View File

@@ -0,0 +1,268 @@
import uuid

import boto3
import pytest
import sure  # noqa -- activates the .should assertion syntax used below
from botocore.exceptions import ClientError

from moto import mock_databrew


def _create_databrew_client():
    client = boto3.client("databrew", region_name="us-west-1")
    return client


def _create_test_dataset(
    client,
    tags=None,
    dataset_name=None,
    dataset_format="JSON",
    dataset_format_options=None,
):
    if dataset_name is None:
        dataset_name = str(uuid.uuid4())

    if not dataset_format_options:
        if dataset_format == "JSON":
            dataset_format_options = {"Json": {"MultiLine": True}}
        elif dataset_format == "CSV":
            dataset_format_options = {"Csv": {"Delimiter": ",", "HeaderRow": False}}
        elif dataset_format == "EXCEL":
            dataset_format_options = {
                "Excel": {
                    "SheetNames": [
                        "blaa",
                    ],
                    "SheetIndexes": [
                        123,
                    ],
                    "HeaderRow": True,
                }
            }

    return client.create_dataset(
        Name=dataset_name,
        Format=dataset_format,
        FormatOptions=dataset_format_options,
        Input={
            "S3InputDefinition": {
                "Bucket": "somerandombucketname",
            },
            "DataCatalogInputDefinition": {
                "DatabaseName": "somedbname",
                "TableName": "sometablename",
                "TempDirectory": {
                    "Bucket": "sometempbucketname",
                },
            },
            "DatabaseInputDefinition": {
                "GlueConnectionName": "someglueconnectionname",
                "TempDirectory": {
                    "Bucket": "sometempbucketname",
                },
            },
        },
        PathOptions={
            "LastModifiedDateCondition": {
                "Expression": "string",
                "ValuesMap": {"string": "string"},
            },
            "FilesLimit": {
                "MaxFiles": 123,
                "OrderedBy": "LAST_MODIFIED_DATE",
                "Order": "ASCENDING",
            },
            "Parameters": {
                "string": {
                    "Name": "string",
                    "Type": "string",
                    "CreateColumn": False,
                    "Filter": {
                        "Expression": "string",
                        "ValuesMap": {"string": "string"},
                    },
                }
            },
        },
        Tags=tags or {},
    )


def _create_test_datasets(client, count):
    for _ in range(count):
        _create_test_dataset(client)
@mock_databrew
def test_dataset_list_when_empty():
    client = _create_databrew_client()

    response = client.list_datasets()
    response.should.have.key("Datasets")
    response["Datasets"].should.have.length_of(0)


@mock_databrew
def test_list_datasets_with_max_results():
    client = _create_databrew_client()

    _create_test_datasets(client, 4)

    response = client.list_datasets(MaxResults=2)
    response["Datasets"].should.have.length_of(2)
    response.should.have.key("NextToken")


@mock_databrew
def test_list_datasets_from_next_token():
    client = _create_databrew_client()
    _create_test_datasets(client, 10)
    first_response = client.list_datasets(MaxResults=3)
    response = client.list_datasets(NextToken=first_response["NextToken"])
    response["Datasets"].should.have.length_of(7)


@mock_databrew
def test_list_datasets_with_max_results_greater_than_actual_results():
    client = _create_databrew_client()
    _create_test_datasets(client, 4)
    response = client.list_datasets(MaxResults=10)
    response["Datasets"].should.have.length_of(4)
@mock_databrew
def test_describe_dataset():
    client = _create_databrew_client()

    # region basic test
    response = _create_test_dataset(client)
    dataset = client.describe_dataset(Name=response["Name"])
    dataset["Name"].should.equal(response["Name"])
    # endregion

    # region CSV test
    response = _create_test_dataset(client, dataset_format="CSV")
    dataset = client.describe_dataset(Name=response["Name"])
    dataset["Format"].should.equal("CSV")
    # endregion
@mock_databrew
def test_describe_dataset_that_does_not_exist():
    client = _create_databrew_client()

    with pytest.raises(ClientError) as exc:
        client.describe_dataset(Name="DoesNotExist")
    err = exc.value.response["Error"]
    err["Code"].should.equal("ResourceNotFoundException")
    err["Message"].should.equal("One or more resources can't be found.")
@mock_databrew
def test_create_dataset_that_already_exists():
    client = _create_databrew_client()

    response = _create_test_dataset(client)

    with pytest.raises(ClientError) as exc:
        _create_test_dataset(client, dataset_name=response["Name"])
    err = exc.value.response["Error"]
    err["Code"].should.equal("AlreadyExistsException")
    err["Message"].should.equal(f"{response['Name']} already exists.")


@mock_databrew
def test_delete_dataset():
    client = _create_databrew_client()
    response = _create_test_dataset(client)

    # Check dataset exists
    dataset = client.describe_dataset(Name=response["Name"])
    dataset["Name"].should.equal(response["Name"])

    # Delete the dataset
    client.delete_dataset(Name=response["Name"])

    # Check it does not exist anymore
    with pytest.raises(ClientError) as exc:
        client.describe_dataset(Name=response["Name"])
    err = exc.value.response["Error"]
    err["Code"].should.equal("ResourceNotFoundException")
    err["Message"].should.equal("One or more resources can't be found.")

    # Check that a dataset that does not exist errors
    with pytest.raises(ClientError) as exc:
        client.delete_dataset(Name=response["Name"])
    err = exc.value.response["Error"]
    err["Code"].should.equal("ResourceNotFoundException")
    err["Message"].should.equal("One or more resources can't be found.")
@mock_databrew
def test_update_dataset():
    client = _create_databrew_client()
    response = _create_test_dataset(client)

    # Update the dataset and check response
    dataset = client.update_dataset(
        Name=response["Name"],
        Format="TEST",
        Input={
            "S3InputDefinition": {
                "Bucket": "somerandombucketname",
            },
            "DataCatalogInputDefinition": {
                "DatabaseName": "somedbname",
                "TableName": "sometablename",
                "TempDirectory": {
                    "Bucket": "sometempbucketname",
                },
            },
            "DatabaseInputDefinition": {
                "GlueConnectionName": "someglueconnectionname",
                "TempDirectory": {
                    "Bucket": "sometempbucketname",
                },
            },
        },
    )
    dataset["Name"].should.equal(response["Name"])

    # Describe the dataset and check the changes
    dataset = client.describe_dataset(Name=response["Name"])
    dataset["Name"].should.equal(response["Name"])
    dataset["Format"].should.equal("TEST")
@mock_databrew
def test_update_dataset_that_does_not_exist():
    client = _create_databrew_client()

    # Try to update a dataset that does not exist and check the error
    with pytest.raises(ClientError) as exc:
        client.update_dataset(
            Name="RANDOMNAME",
            Format="TEST",
            Input={
                "S3InputDefinition": {
                    "Bucket": "somerandombucketname",
                },
                "DataCatalogInputDefinition": {
                    "DatabaseName": "somedbname",
                    "TableName": "sometablename",
                    "TempDirectory": {
                        "Bucket": "sometempbucketname",
                    },
                },
                "DatabaseInputDefinition": {
                    "GlueConnectionName": "someglueconnectionname",
                    "TempDirectory": {
                        "Bucket": "sometempbucketname",
                    },
                },
            },
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("ResourceNotFoundException")
    err["Message"].should.equal("One or more resources can't be found.")