From 95169c6011e4a887f86b8e5c4c6df492ace5c1a9 Mon Sep 17 00:00:00 2001
From: Steve Pulec <spulec@gmail.com>
Date: Wed, 16 Sep 2015 10:00:38 -0400
Subject: [PATCH] First version of datapipelines.

Adds a mock DataPipeline backend (CreatePipeline, DescribePipelines,
PutPipelineDefinition, GetPipelineDefinition, DescribeObjects,
ActivatePipeline), one backend instance per boto region, plus the
moto wiring (decorator, BACKENDS entry, URL dispatch) and tests.
---
 moto/__init__.py                             |   1 +
 moto/backends.py                             |   2 +
 moto/datapipeline/__init__.py                |  12 ++
 moto/datapipeline/models.py                  | 122 ++++++++++++++++++
 moto/datapipeline/responses.py               |  68 ++++++++++
 moto/datapipeline/urls.py                    |  10 ++
 moto/datapipeline/utils.py                   |   5 +
 tests/test_datapipeline/test_datapipeline.py | 126 +++++++++++++++++++
 tests/test_datapipeline/test_server.py       |  25 ++++
 9 files changed, 371 insertions(+)
 create mode 100644 moto/datapipeline/__init__.py
 create mode 100644 moto/datapipeline/models.py
 create mode 100644 moto/datapipeline/responses.py
 create mode 100644 moto/datapipeline/urls.py
 create mode 100644 moto/datapipeline/utils.py
 create mode 100644 tests/test_datapipeline/test_datapipeline.py
 create mode 100644 tests/test_datapipeline/test_server.py

diff --git a/moto/__init__.py b/moto/__init__.py
index e5a94fd80..2dac1f683 100644
--- a/moto/__init__.py
+++ b/moto/__init__.py
@@ -8,6 +8,7 @@ __version__ = '0.4.12'
 from .autoscaling import mock_autoscaling  # flake8: noqa
 from .cloudformation import mock_cloudformation  # flake8: noqa
 from .cloudwatch import mock_cloudwatch  # flake8: noqa
+from .datapipeline import mock_datapipeline  # flake8: noqa
 from .dynamodb import mock_dynamodb  # flake8: noqa
 from .dynamodb2 import mock_dynamodb2  # flake8: noqa
 from .ec2 import mock_ec2  # flake8: noqa
diff --git a/moto/backends.py b/moto/backends.py
index b46d56c06..cb040ab93 100644
--- a/moto/backends.py
+++ b/moto/backends.py
@@ -2,6 +2,7 @@ from __future__ import unicode_literals
 from moto.autoscaling import autoscaling_backend
 from moto.cloudwatch import cloudwatch_backend
 from moto.cloudformation import cloudformation_backend
+from moto.datapipeline import datapipeline_backend
 from moto.dynamodb import dynamodb_backend
 from moto.dynamodb2 import dynamodb_backend2
 from moto.ec2 import ec2_backend
@@ -25,6 +26,7 @@ BACKENDS = {
     'autoscaling': autoscaling_backend,
     'cloudformation': cloudformation_backend,
     'cloudwatch': cloudwatch_backend,
+    'datapipeline': datapipeline_backend,
     'dynamodb': dynamodb_backend,
     'dynamodb2': dynamodb_backend2,
     'ec2': ec2_backend,
diff --git a/moto/datapipeline/__init__.py b/moto/datapipeline/__init__.py
new file mode 100644
index 000000000..dcfe2f427
--- /dev/null
+++ b/moto/datapipeline/__init__.py
@@ -0,0 +1,12 @@
+from __future__ import unicode_literals
+from .models import datapipeline_backends
+from ..core.models import MockAWS
+
+datapipeline_backend = datapipeline_backends['us-east-1']
+
+
+def mock_datapipeline(func=None):
+    if func:
+        return MockAWS(datapipeline_backends)(func)
+    else:
+        return MockAWS(datapipeline_backends)
diff --git a/moto/datapipeline/models.py b/moto/datapipeline/models.py
new file mode 100644
index 000000000..393218558
--- /dev/null
+++ b/moto/datapipeline/models.py
@@ -0,0 +1,122 @@
+from __future__ import unicode_literals
+
+import datetime
+import boto.datapipeline
+from moto.core import BaseBackend
+from .utils import get_random_pipeline_id
+
+
+class PipelineObject(object):
+    def __init__(self, object_id, name, fields):
+        self.object_id = object_id
+        self.name = name
+        self.fields = fields
+
+    def to_json(self):
+        return {
+            "Fields": self.fields,
+            "Id": self.object_id,
+            "Name": self.name,
+        }
+
+
+class Pipeline(object):
+    def __init__(self, name, unique_id):
+        self.name = name
+        self.unique_id = unique_id
+        self.description = ""
+        self.pipeline_id = get_random_pipeline_id()
+        self.creation_time = datetime.datetime.utcnow()
+        self.objects = []
+
+    def to_json(self):
+        return {
+            "Description": self.description,
+            "Fields": [{
+                "key": "@pipelineState",
+                "stringValue": "SCHEDULED"
+            }, {
+                "key": "description",
+                "stringValue": self.description
+            }, {
+                "key": "name",
+                "stringValue": self.name
+            }, {
+                "key": "@creationTime",
+                "stringValue": datetime.datetime.strftime(self.creation_time, '%Y-%m-%dT%H:%M:%S'),
+            }, {
+                "key": "@id",
+                "stringValue": self.pipeline_id,
+            }, {
+                "key": "@sphere",
+                "stringValue": "PIPELINE"
+            }, {
+                "key": "@version",
+                "stringValue": "1"
+            }, {
+                "key": "@userId",
+                "stringValue": "924374875933"
+            }, {
+                "key": "@accountId",
+                "stringValue": "924374875933"
+            }, {
+                "key": "uniqueId",
+                "stringValue": self.unique_id
+            }],
+            "Name": self.name,
+            "PipelineId": self.pipeline_id,
+            "Tags": [
+            ]
+        }
+
+    def set_pipeline_objects(self, pipeline_objects):
+        self.objects = [
+            PipelineObject(pipeline_object['id'], pipeline_object['name'], pipeline_object['fields'])
+            for pipeline_object in pipeline_objects
+        ]
+
+    def activate(self):
+        pass
+
+
+class DataPipelineBackend(BaseBackend):
+
+    def __init__(self):
+        self.pipelines = {}
+
+    def create_pipeline(self, name, unique_id):
+        pipeline = Pipeline(name, unique_id)
+        self.pipelines[pipeline.pipeline_id] = pipeline
+        return pipeline
+
+    def describe_pipelines(self, pipeline_ids):
+        pipelines = [pipeline for pipeline in self.pipelines.values() if pipeline.pipeline_id in pipeline_ids]
+        return pipelines
+
+    def get_pipeline(self, pipeline_id):
+        return self.pipelines[pipeline_id]
+
+    def put_pipeline_definition(self, pipeline_id, pipeline_objects):
+        pipeline = self.get_pipeline(pipeline_id)
+        pipeline.set_pipeline_objects(pipeline_objects)
+
+    def get_pipeline_definition(self, pipeline_id):
+        pipeline = self.get_pipeline(pipeline_id)
+        return pipeline.objects
+
+    def describe_objects(self, object_ids, pipeline_id):
+        pipeline = self.get_pipeline(pipeline_id)
+        pipeline_objects = [
+            pipeline_object for pipeline_object in pipeline.objects
+            if pipeline_object.object_id in object_ids
+        ]
+        return pipeline_objects
+
+    def activate_pipeline(self, pipeline_id):
+        pipeline = self.get_pipeline(pipeline_id)
+        pipeline.activate()
+
+
+datapipeline_backends = {}
+for region in boto.datapipeline.regions():
+    datapipeline_backends[region.name] = DataPipelineBackend()
diff --git a/moto/datapipeline/responses.py b/moto/datapipeline/responses.py
new file mode 100644
index 000000000..a61ec2514
--- /dev/null
+++ b/moto/datapipeline/responses.py
@@ -0,0 +1,68 @@
+from __future__ import unicode_literals
+
+import json
+
+from moto.core.responses import BaseResponse
+from .models import datapipeline_backends
+
+
+class DataPipelineResponse(BaseResponse):
+
+    @property
+    def parameters(self):
+        return json.loads(self.body.decode("utf-8"))
+
+    @property
+    def datapipeline_backend(self):
+        return datapipeline_backends[self.region]
+
+    def create_pipeline(self):
+        name = self.parameters['name']
+        unique_id = self.parameters['uniqueId']
+        pipeline = self.datapipeline_backend.create_pipeline(name, unique_id)
+        return json.dumps({
+            "pipelineId": pipeline.pipeline_id,
+        })
+
+    def describe_pipelines(self):
+        pipeline_ids = self.parameters["pipelineIds"]
+        pipelines = self.datapipeline_backend.describe_pipelines(pipeline_ids)
+
+        return json.dumps({
+            "PipelineDescriptionList": [
+                pipeline.to_json() for pipeline in pipelines
+            ]
+        })
+
+    def put_pipeline_definition(self):
+        pipeline_id = self.parameters["pipelineId"]
+        pipeline_objects = self.parameters["pipelineObjects"]
+
+        self.datapipeline_backend.put_pipeline_definition(pipeline_id, pipeline_objects)
+        return json.dumps({"errored": False})
+
+    def get_pipeline_definition(self):
+        pipeline_id = self.parameters["pipelineId"]
+        pipeline_definition = self.datapipeline_backend.get_pipeline_definition(pipeline_id)
+        return json.dumps({
+            "pipelineObjects": [pipeline_object.to_json() for pipeline_object in pipeline_definition]
+        })
+
+    def describe_objects(self):
+        pipeline_id = self.parameters["pipelineId"]
+        object_ids = self.parameters["objectIds"]
+
+        pipeline_objects = self.datapipeline_backend.describe_objects(object_ids, pipeline_id)
+
+        return json.dumps({
+            "HasMoreResults": False,
+            "Marker": None,
+            "PipelineObjects": [
+                pipeline_object.to_json() for pipeline_object in pipeline_objects
+            ]
+        })
+
+    def activate_pipeline(self):
+        pipeline_id = self.parameters["pipelineId"]
+        self.datapipeline_backend.activate_pipeline(pipeline_id)
+        return json.dumps({})
diff --git a/moto/datapipeline/urls.py b/moto/datapipeline/urls.py
new file mode 100644
index 000000000..40805874b
--- /dev/null
+++ b/moto/datapipeline/urls.py
@@ -0,0 +1,10 @@
+from __future__ import unicode_literals
+from .responses import DataPipelineResponse
+
+url_bases = [
+    "https?://datapipeline.(.+).amazonaws.com",
+]
+
+url_paths = {
+    '{0}/$': DataPipelineResponse.dispatch,
+}
diff --git a/moto/datapipeline/utils.py b/moto/datapipeline/utils.py
new file mode 100644
index 000000000..7de9d2732
--- /dev/null
+++ b/moto/datapipeline/utils.py
@@ -0,0 +1,5 @@
+from moto.core.utils import get_random_hex
+
+
+def get_random_pipeline_id():
+    return "df-{0}".format(get_random_hex(length=19))
diff --git a/tests/test_datapipeline/test_datapipeline.py b/tests/test_datapipeline/test_datapipeline.py
new file mode 100644
index 000000000..e48606780
--- /dev/null
+++ b/tests/test_datapipeline/test_datapipeline.py
@@ -0,0 +1,126 @@
+from __future__ import unicode_literals
+
+import boto.datapipeline
+import sure  # noqa
+
+from moto import mock_datapipeline
+
+
+@mock_datapipeline
+def test_create_pipeline():
+    conn = boto.datapipeline.connect_to_region("us-west-2")
+
+    res = conn.create_pipeline("mypipeline", "some-unique-id")
+
+    pipeline_id = res["pipelineId"]
+    pipeline_descriptions = conn.describe_pipelines([pipeline_id])["PipelineDescriptionList"]
+    pipeline_descriptions.should.have.length_of(1)
+
+    pipeline_description = pipeline_descriptions[0]
+    pipeline_description['Name'].should.equal("mypipeline")
+    pipeline_description["PipelineId"].should.equal(pipeline_id)
+    fields = pipeline_description['Fields']
+
+    def get_value_from_fields(key, fields):
+        for field in fields:
+            if field['key'] == key:
+                return field['stringValue']
+
+    get_value_from_fields('@pipelineState', fields).should.equal("SCHEDULED")
+    get_value_from_fields('uniqueId', fields).should.equal("some-unique-id")
+
+
+PIPELINE_OBJECTS = [
+    {
+        "id": "Default",
+        "name": "Default",
+        "fields": [{
+            "key": "workerGroup",
+            "stringValue": "workerGroup"
+        }]
+    },
+    {
+        "id": "Schedule",
+        "name": "Schedule",
+        "fields": [{
+            "key": "startDateTime",
+            "stringValue": "2012-12-12T00:00:00"
+        }, {
+            "key": "type",
+            "stringValue": "Schedule"
+        }, {
+            "key": "period",
+            "stringValue": "1 hour"
+        }, {
+            "key": "endDateTime",
+            "stringValue": "2012-12-21T18:00:00"
+        }]
+    },
+    {
+        "id": "SayHello",
+        "name": "SayHello",
+        "fields": [{
+            "key": "type",
+            "stringValue": "ShellCommandActivity"
+        }, {
+            "key": "command",
+            "stringValue": "echo hello"
+        }, {
+            "key": "parent",
+            "refValue": "Default"
+        }, {
+            "key": "schedule",
+            "refValue": "Schedule"
+        }]
+    }
+]
+
+
+@mock_datapipeline
+def test_creating_pipeline_definition():
+    conn = boto.datapipeline.connect_to_region("us-west-2")
+    res = conn.create_pipeline("mypipeline", "some-unique-id")
+    pipeline_id = res["pipelineId"]
+
+    conn.put_pipeline_definition(PIPELINE_OBJECTS, pipeline_id)
+
+    pipeline_definition = conn.get_pipeline_definition(pipeline_id)
+    pipeline_definition['pipelineObjects'].should.have.length_of(3)
+    default_object = pipeline_definition['pipelineObjects'][0]
+    default_object['Name'].should.equal("Default")
+    default_object['Id'].should.equal("Default")
+    default_object['Fields'].should.equal([{
+        "key": "workerGroup",
+        "stringValue": "workerGroup"
+    }])
+
+
+@mock_datapipeline
+def test_describing_pipeline_objects():
+    conn = boto.datapipeline.connect_to_region("us-west-2")
+    res = conn.create_pipeline("mypipeline", "some-unique-id")
+    pipeline_id = res["pipelineId"]
+
+    conn.put_pipeline_definition(PIPELINE_OBJECTS, pipeline_id)
+
+    objects = conn.describe_objects(["Schedule", "Default"], pipeline_id)['PipelineObjects']
+
+    objects.should.have.length_of(2)
+    default_object = [x for x in objects if x['Id'] == 'Default'][0]
+    default_object['Name'].should.equal("Default")
+    default_object['Fields'].should.equal([{
+        "key": "workerGroup",
+        "stringValue": "workerGroup"
+    }])
+
+
+@mock_datapipeline
+def test_activate_pipeline():
+    conn = boto.datapipeline.connect_to_region("us-west-2")
+
+    res = conn.create_pipeline("mypipeline", "some-unique-id")
+
+    pipeline_id = res["pipelineId"]
+    conn.activate_pipeline(pipeline_id)
+
+    # TODO what do we need to assert here. Change in pipeline status?
diff --git a/tests/test_datapipeline/test_server.py b/tests/test_datapipeline/test_server.py
new file mode 100644
index 000000000..0ff03aa8d
--- /dev/null
+++ b/tests/test_datapipeline/test_server.py
@@ -0,0 +1,25 @@
+from __future__ import unicode_literals
+
+import json
+import sure  # noqa
+
+import moto.server as server
+from moto import mock_datapipeline
+
+'''
+Test the different server responses
+'''
+
+
+@mock_datapipeline
+def test_creating_pipeline():
+    backend = server.create_backend_app("datapipeline")
+    test_client = backend.test_client()
+
+    res = test_client.post(
+        '/',
+        data=json.dumps({"name": "mypipeline", "uniqueId": "some-unique-id"}),
+        headers={"X-Amz-Target": "DataPipeline.CreatePipeline"})
+
+    json_data = json.loads(res.data.decode("utf-8"))
+    json_data["pipelineId"].should.contain("df-")