moto/tests/test_swf/test_workflow_executions.py

103 lines
3.6 KiB
Python

import boto
from sure import expect
from moto import mock_swf
from moto.swf.exceptions import (
SWFWorkflowExecutionAlreadyStartedFault,
SWFTypeDeprecatedFault,
SWFUnknownResourceFault,
)
# Utils
@mock_swf
def setup_swf_environment():
    """Return a boto SWF connection with the shared test fixtures registered.

    Registers the "test-domain" domain, the "test-workflow" (v1.0) workflow
    type and the "test-activity" (v1.1) activity type, then hands back the
    connection that was used to register them.
    """
    swf = boto.connect_swf("the_key", "the_secret")
    swf.register_domain("test-domain", "60", description="A test domain")

    # Defaults attached to the workflow type at registration time.
    workflow_type_defaults = dict(
        task_list="queue",
        default_child_policy="TERMINATE",
        default_execution_start_to_close_timeout="300",
        default_task_start_to_close_timeout="300",
    )
    swf.register_workflow_type(
        "test-domain", "test-workflow", "v1.0", **workflow_type_defaults
    )

    swf.register_activity_type("test-domain", "test-activity", "v1.1")
    return swf
# StartWorkflowExecution endpoint
@mock_swf
def test_start_workflow_execution():
    """Starting a workflow execution yields a response containing "runId"."""
    swf = setup_swf_environment()
    response = swf.start_workflow_execution(
        "test-domain", "uid-abcd1234", "test-workflow", "v1.0"
    )
    response.should.contain("runId")
@mock_swf
def test_start_already_started_workflow_execution():
    """Starting the same workflow id a second time raises the already-started fault."""
    swf = setup_swf_environment()
    start_args = ("test-domain", "uid-abcd1234", "test-workflow", "v1.0")

    # First start succeeds; the identical second start is the one under test.
    swf.start_workflow_execution(*start_args)
    second_start = swf.start_workflow_execution.when.called_with(*start_args)
    second_start.should.throw(SWFWorkflowExecutionAlreadyStartedFault)
@mock_swf
def test_start_workflow_execution_on_deprecated_type():
    """Starting an execution of a deprecated workflow type raises SWFTypeDeprecatedFault."""
    swf = setup_swf_environment()
    swf.deprecate_workflow_type("test-domain", "test-workflow", "v1.0")

    start_attempt = swf.start_workflow_execution.when.called_with(
        "test-domain", "uid-abcd1234", "test-workflow", "v1.0"
    )
    start_attempt.should.throw(SWFTypeDeprecatedFault)
# DescribeWorkflowExecution endpoint
@mock_swf
def test_describe_workflow_execution():
    """Describing a freshly started execution reports its workflow id and OPEN status."""
    swf = setup_swf_environment()
    started = swf.start_workflow_execution(
        "test-domain", "uid-abcd1234", "test-workflow", "v1.0"
    )

    description = swf.describe_workflow_execution(
        "test-domain", started["runId"], "uid-abcd1234"
    )
    execution_info = description["executionInfo"]
    execution_info["execution"]["workflowId"].should.equal("uid-abcd1234")
    execution_info["executionStatus"].should.equal("OPEN")
@mock_swf
def test_describe_non_existent_workflow_execution():
    """Describing an execution with unknown run/workflow ids raises SWFUnknownResourceFault."""
    swf = setup_swf_environment()
    describe_attempt = swf.describe_workflow_execution.when.called_with(
        "test-domain", "wrong-run-id", "wrong-workflow-id"
    )
    describe_attempt.should.throw(SWFUnknownResourceFault)
# GetWorkflowExecutionHistory endpoint
@mock_swf
def test_get_workflow_execution_history():
    """A freshly started execution's history lists the start event, then the scheduled decision task."""
    swf = setup_swf_environment()
    started = swf.start_workflow_execution(
        "test-domain", "uid-abcd1234", "test-workflow", "v1.0"
    )

    history = swf.get_workflow_execution_history(
        "test-domain", started["runId"], "uid-abcd1234"
    )
    event_types = [event["eventType"] for event in history["events"]]

    expected_types = ["WorkflowExecutionStarted", "DecisionTaskScheduled"]
    event_types.should.equal(expected_types)
@mock_swf
def test_get_workflow_execution_history_with_reverse_order():
    """With reverse_order=True the history events come back in reverse order."""
    swf = setup_swf_environment()
    started = swf.start_workflow_execution(
        "test-domain", "uid-abcd1234", "test-workflow", "v1.0"
    )

    history = swf.get_workflow_execution_history(
        "test-domain", started["runId"], "uid-abcd1234", reverse_order=True
    )
    event_types = [event["eventType"] for event in history["events"]]

    expected_types = ["DecisionTaskScheduled", "WorkflowExecutionStarted"]
    event_types.should.equal(expected_types)
@mock_swf
def test_get_workflow_execution_history_on_non_existent_workflow_execution():
    """Requesting history for unknown run/workflow ids raises SWFUnknownResourceFault."""
    swf = setup_swf_environment()
    history_attempt = swf.get_workflow_execution_history.when.called_with(
        "test-domain", "wrong-run-id", "wrong-workflow-id"
    )
    history_attempt.should.throw(SWFUnknownResourceFault)