Merge pull request #419 from spulec/data-pipeline-improvements

Data pipeline improvements: Add support to ListPipelines & change CloudFormation to support Data Pipelines
This commit is contained in:
Steve Pulec 2015-09-16 18:08:45 -04:00
commit a7e23e45fd
7 changed files with 172 additions and 2 deletions

View File

@ -4,6 +4,7 @@ import functools
import logging
from moto.autoscaling import models as autoscaling_models
from moto.datapipeline import models as datapipeline_models
from moto.ec2 import models as ec2_models
from moto.elb import models as elb_models
from moto.iam import models as iam_models
@ -36,6 +37,7 @@ MODEL_MAP = {
"AWS::EC2::VPCGatewayAttachment": ec2_models.VPCGatewayAttachment,
"AWS::EC2::VPCPeeringConnection": ec2_models.VPCPeeringConnection,
"AWS::ElasticLoadBalancing::LoadBalancer": elb_models.FakeLoadBalancer,
"AWS::DataPipeline::Pipeline": datapipeline_models.Pipeline,
"AWS::IAM::InstanceProfile": iam_models.InstanceProfile,
"AWS::IAM::Role": iam_models.Role,
"AWS::RDS::DBInstance": rds_models.Database,

View File

@ -3,7 +3,7 @@ from __future__ import unicode_literals
import datetime
import boto.datapipeline
from moto.core import BaseBackend
from .utils import get_random_pipeline_id
from .utils import get_random_pipeline_id, remove_capitalization_of_dict_keys
class PipelineObject(object):
@ -73,12 +73,25 @@ class Pipeline(object):
def set_pipeline_objects(self, pipeline_objects):
self.objects = [
PipelineObject(pipeline_object['id'], pipeline_object['name'], pipeline_object['fields'])
for pipeline_object in pipeline_objects
for pipeline_object in remove_capitalization_of_dict_keys(pipeline_objects)
]
def activate(self):
self.status = "SCHEDULED"
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
datapipeline_backend = datapipeline_backends[region_name]
properties = cloudformation_json["Properties"]
cloudformation_unique_id = "cf-" + properties["Name"]
pipeline = datapipeline_backend.create_pipeline(properties["Name"], cloudformation_unique_id)
datapipeline_backend.put_pipeline_definition(pipeline.pipeline_id, properties["PipelineObjects"])
if properties["Activate"]:
pipeline.activate()
return pipeline
class DataPipelineBackend(BaseBackend):
@ -90,6 +103,9 @@ class DataPipelineBackend(BaseBackend):
self.pipelines[pipeline.pipeline_id] = pipeline
return pipeline
def list_pipelines(self):
return self.pipelines.values()
def describe_pipelines(self, pipeline_ids):
pipelines = [pipeline for pipeline in self.pipelines.values() if pipeline.pipeline_id in pipeline_ids]
return pipelines

View File

@ -28,6 +28,16 @@ class DataPipelineResponse(BaseResponse):
"pipelineId": pipeline.pipeline_id,
})
def list_pipelines(self):
pipelines = self.datapipeline_backend.list_pipelines()
return json.dumps({
"HasMoreResults": False,
"Marker": None,
"PipelineIdList": [
{"Id": pipeline.pipeline_id, "Name": pipeline.name} for pipeline in pipelines
]
})
def describe_pipelines(self):
pipeline_ids = self.parameters["pipelineIds"]
pipelines = self.datapipeline_backend.describe_pipelines(pipeline_ids)

View File

@ -1,5 +1,23 @@
import collections
import six
from moto.core.utils import get_random_hex
def get_random_pipeline_id():
return "df-{0}".format(get_random_hex(length=19))
def remove_capitalization_of_dict_keys(obj):
if isinstance(obj, collections.Mapping):
result = obj.__class__()
for key, value in obj.items():
normalized_key = key[:1].lower() + key[1:]
result[normalized_key] = remove_capitalization_of_dict_keys(value)
return result
elif isinstance(obj, collections.Iterable) and not isinstance(obj, six.string_types):
result = obj.__class__()
for item in obj:
result += (remove_capitalization_of_dict_keys(item),)
return result
else:
return obj

View File

@ -6,3 +6,4 @@ coverage
freezegun
flask
boto3
six

View File

@ -3,6 +3,7 @@ import json
import boto
import boto.cloudformation
import boto.datapipeline
import boto.ec2
import boto.ec2.autoscale
import boto.ec2.elb
@ -17,6 +18,7 @@ import sure # noqa
from moto import (
mock_autoscaling,
mock_cloudformation,
mock_datapipeline,
mock_ec2,
mock_elb,
mock_iam,
@ -1395,3 +1397,79 @@ def test_subnets_should_be_created_with_availability_zone():
)
subnet = vpc_conn.get_all_subnets(filters={'cidrBlock': '10.0.0.0/24'})[0]
subnet.availability_zone.should.equal('us-west-1b')
@mock_cloudformation
@mock_datapipeline
def test_datapipeline():
dp_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"dataPipeline": {
"Properties": {
"Activate": "true",
"Name": "testDataPipeline",
"PipelineObjects": [
{
"Fields": [
{
"Key": "failureAndRerunMode",
"StringValue": "CASCADE"
},
{
"Key": "scheduleType",
"StringValue": "cron"
},
{
"Key": "schedule",
"RefValue": "DefaultSchedule"
},
{
"Key": "pipelineLogUri",
"StringValue": "s3://bucket/logs"
},
{
"Key": "type",
"StringValue": "Default"
},
],
"Id": "Default",
"Name": "Default"
},
{
"Fields": [
{
"Key": "startDateTime",
"StringValue": "1970-01-01T01:00:00"
},
{
"Key": "period",
"StringValue": "1 Day"
},
{
"Key": "type",
"StringValue": "Schedule"
}
],
"Id": "DefaultSchedule",
"Name": "RunOnce"
}
],
"PipelineTags": []
},
"Type": "AWS::DataPipeline::Pipeline"
}
}
}
cf_conn = boto.cloudformation.connect_to_region("us-east-1")
template_json = json.dumps(dp_template)
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
dp_conn = boto.datapipeline.connect_to_region('us-east-1')
data_pipelines = dp_conn.list_pipelines()
data_pipelines['PipelineIdList'].should.have.length_of(1)
data_pipelines['PipelineIdList'][0]['Name'].should.equal('testDataPipeline')

View File

@ -4,6 +4,7 @@ import boto.datapipeline
import sure # noqa
from moto import mock_datapipeline
from moto.datapipeline.utils import remove_capitalization_of_dict_keys
def get_value_from_fields(key, fields):
@ -130,3 +131,47 @@ def test_activate_pipeline():
fields = pipeline_description['Fields']
get_value_from_fields('@pipelineState', fields).should.equal("SCHEDULED")
@mock_datapipeline
def test_listing_pipelines():
conn = boto.datapipeline.connect_to_region("us-west-2")
res1 = conn.create_pipeline("mypipeline1", "some-unique-id1")
res2 = conn.create_pipeline("mypipeline2", "some-unique-id2")
pipeline_id1 = res1["pipelineId"]
pipeline_id2 = res2["pipelineId"]
response = conn.list_pipelines()
response["HasMoreResults"].should.be(False)
response["Marker"].should.be.none
response["PipelineIdList"].should.have.length_of(2)
response["PipelineIdList"].should.contain({
"Id": res1["pipelineId"],
"Name": "mypipeline1",
})
response["PipelineIdList"].should.contain({
"Id": res2["pipelineId"],
"Name": "mypipeline2"
})
# testing a helper function
def test_remove_capitalization_of_dict_keys():
result = remove_capitalization_of_dict_keys(
{
"Id": "IdValue",
"Fields": [{
"Key": "KeyValue",
"StringValue": "StringValueValue"
}]
}
)
result.should.equal({
"id": "IdValue",
"fields": [{
"key": "KeyValue",
"stringValue": "StringValueValue"
}],
})