DataPipeline - duplicate deprecated tests (#4333)

This commit is contained in:
Bert Blommers 2021-09-23 13:28:27 +00:00 committed by GitHub
parent 8f8ea45d32
commit 682c7350b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,8 +1,10 @@
from __future__ import unicode_literals
import boto.datapipeline
import boto3
import sure # noqa
from moto import mock_datapipeline
from moto import mock_datapipeline_deprecated
from moto.datapipeline.utils import remove_capitalization_of_dict_keys
@ -13,6 +15,7 @@ def get_value_from_fields(key, fields):
return field["stringValue"]
# Has boto3 equivalent
@mock_datapipeline_deprecated
def test_create_pipeline():
conn = boto.datapipeline.connect_to_region("us-west-2")
@ -34,6 +37,27 @@ def test_create_pipeline():
get_value_from_fields("uniqueId", fields).should.equal("some-unique-id")
@mock_datapipeline
def test_create_pipeline_boto3():
conn = boto3.client("datapipeline", region_name="us-west-2")
res = conn.create_pipeline(name="mypipeline", uniqueId="some-unique-id")
pipeline_id = res["pipelineId"]
pipeline_descriptions = conn.describe_pipelines(pipelineIds=[pipeline_id])[
"pipelineDescriptionList"
]
pipeline_descriptions.should.have.length_of(1)
pipeline_description = pipeline_descriptions[0]
pipeline_description["name"].should.equal("mypipeline")
pipeline_description["pipelineId"].should.equal(pipeline_id)
fields = pipeline_description["fields"]
get_value_from_fields("@pipelineState", fields).should.equal("PENDING")
get_value_from_fields("uniqueId", fields).should.equal("some-unique-id")
PIPELINE_OBJECTS = [
{
"id": "Default",
@ -63,6 +87,7 @@ PIPELINE_OBJECTS = [
]
# Has boto3 equivalent
@mock_datapipeline_deprecated
def test_creating_pipeline_definition():
conn = boto.datapipeline.connect_to_region("us-west-2")
@ -81,6 +106,27 @@ def test_creating_pipeline_definition():
)
@mock_datapipeline
def test_creating_pipeline_definition_boto3():
conn = boto3.client("datapipeline", region_name="us-west-2")
res = conn.create_pipeline(name="mypipeline", uniqueId="some-unique-id")
pipeline_id = res["pipelineId"]
conn.put_pipeline_definition(
pipelineId=pipeline_id, pipelineObjects=PIPELINE_OBJECTS
)
pipeline_definition = conn.get_pipeline_definition(pipelineId=pipeline_id)
pipeline_definition["pipelineObjects"].should.have.length_of(3)
default_object = pipeline_definition["pipelineObjects"][0]
default_object["name"].should.equal("Default")
default_object["id"].should.equal("Default")
default_object["fields"].should.equal(
[{"key": "workerGroup", "stringValue": "workerGroup"}]
)
# Has boto3 equivalent
@mock_datapipeline_deprecated
def test_describing_pipeline_objects():
conn = boto.datapipeline.connect_to_region("us-west-2")
@ -101,6 +147,29 @@ def test_describing_pipeline_objects():
)
@mock_datapipeline
def test_describing_pipeline_objects_boto3():
conn = boto3.client("datapipeline", region_name="us-west-2")
res = conn.create_pipeline(name="mypipeline", uniqueId="some-unique-id")
pipeline_id = res["pipelineId"]
conn.put_pipeline_definition(
pipelineId=pipeline_id, pipelineObjects=PIPELINE_OBJECTS
)
objects = conn.describe_objects(
pipelineId=pipeline_id, objectIds=["Schedule", "Default"]
)["pipelineObjects"]
objects.should.have.length_of(2)
default_object = [x for x in objects if x["id"] == "Default"][0]
default_object["name"].should.equal("Default")
default_object["fields"].should.equal(
[{"key": "workerGroup", "stringValue": "workerGroup"}]
)
# Has boto3 equivalent
@mock_datapipeline_deprecated
def test_activate_pipeline():
conn = boto.datapipeline.connect_to_region("us-west-2")
@ -120,6 +189,25 @@ def test_activate_pipeline():
get_value_from_fields("@pipelineState", fields).should.equal("SCHEDULED")
@mock_datapipeline
def test_activate_pipeline_boto3():
conn = boto3.client("datapipeline", region_name="us-west-2")
res = conn.create_pipeline(name="mypipeline", uniqueId="some-unique-id")
pipeline_id = res["pipelineId"]
conn.activate_pipeline(pipelineId=pipeline_id)
pipeline_descriptions = conn.describe_pipelines(pipelineIds=[pipeline_id])[
"pipelineDescriptionList"
]
pipeline_descriptions.should.have.length_of(1)
pipeline_description = pipeline_descriptions[0]
fields = pipeline_description["fields"]
get_value_from_fields("@pipelineState", fields).should.equal("SCHEDULED")
# Has boto3 equivalent
@mock_datapipeline_deprecated
def test_delete_pipeline():
conn = boto.datapipeline.connect_to_region("us-west-2")
@ -133,6 +221,20 @@ def test_delete_pipeline():
response["pipelineIdList"].should.have.length_of(0)
@mock_datapipeline
def test_delete_pipeline_boto3():
conn = boto3.client("datapipeline", region_name="us-west-2")
res = conn.create_pipeline(name="mypipeline", uniqueId="some-unique-id")
pipeline_id = res["pipelineId"]
conn.delete_pipeline(pipelineId=pipeline_id)
response = conn.list_pipelines()
response["pipelineIdList"].should.have.length_of(0)
# Has boto3 equivalent
@mock_datapipeline_deprecated
def test_listing_pipelines():
conn = boto.datapipeline.connect_to_region("us-west-2")
@ -152,6 +254,26 @@ def test_listing_pipelines():
)
@mock_datapipeline
def test_listing_pipelines_boto3():
conn = boto3.client("datapipeline", region_name="us-west-2")
res1 = conn.create_pipeline(name="mypipeline1", uniqueId="some-unique-id1")
res2 = conn.create_pipeline(name="mypipeline2", uniqueId="some-unique-id2")
response = conn.list_pipelines()
response["hasMoreResults"].should.be(False)
response.shouldnt.have.key("marker")
response["pipelineIdList"].should.have.length_of(2)
response["pipelineIdList"].should.contain(
{"id": res1["pipelineId"], "name": "mypipeline1"}
)
response["pipelineIdList"].should.contain(
{"id": res2["pipelineId"], "name": "mypipeline2"}
)
# Has boto3 equivalent
@mock_datapipeline_deprecated
def test_listing_paginated_pipelines():
conn = boto.datapipeline.connect_to_region("us-west-2")
@ -165,6 +287,19 @@ def test_listing_paginated_pipelines():
response["pipelineIdList"].should.have.length_of(50)
@mock_datapipeline
def test_listing_paginated_pipelines_boto3():
conn = boto3.client("datapipeline", region_name="us-west-2")
for i in range(100):
conn.create_pipeline(name="mypipeline%d" % i, uniqueId="some-unique-id%d" % i)
response = conn.list_pipelines()
response["hasMoreResults"].should.be(True)
response["marker"].should.equal(response["pipelineIdList"][-1]["id"])
response["pipelineIdList"].should.have.length_of(50)
# testing a helper function
def test_remove_capitalization_of_dict_keys():
result = remove_capitalization_of_dict_keys(