Add a Domain to WorkflowExecution objects

This will be needed later for finding an activity type for instance.
This commit is contained in:
Jean-Baptiste Barth 2015-10-25 11:30:11 +01:00
parent fa4608be98
commit 53630dc061
4 changed files with 50 additions and 29 deletions

View File

@ -148,7 +148,7 @@ class SWFBackend(BaseBackend):
wf_type = domain.get_type("workflow", workflow_name, workflow_version)
if wf_type.status == "DEPRECATED":
raise SWFTypeDeprecatedFault(wf_type)
wfe = WorkflowExecution(wf_type, workflow_id,
wfe = WorkflowExecution(domain, wf_type, workflow_id,
tag_list=tag_list, **kwargs)
domain.add_workflow_execution(wfe)
wfe.start()

View File

@ -39,7 +39,8 @@ class WorkflowExecution(object):
"CancelWorkflowExecution"
]
def __init__(self, workflow_type, workflow_id, **kwargs):
def __init__(self, domain, workflow_type, workflow_id, **kwargs):
self.domain = domain
self.workflow_id = workflow_id
self.run_id = uuid.uuid4().hex
# WorkflowExecutionInfo

View File

@ -14,7 +14,18 @@ from moto.swf.exceptions import (
SWFDefaultUndefinedFault,
)
from .utils import get_basic_workflow_type
from .utils import (
get_basic_domain,
get_basic_workflow_type,
)
# Some utility methods
# TODO: move them in utils
def make_workflow_execution(**kwargs):
domain = get_basic_domain()
wft = get_basic_workflow_type()
return WorkflowExecution(domain, wft, "ab1234", **kwargs)
# Domain
@ -87,13 +98,19 @@ def test_type_string_representation():
# WorkflowExecution
def test_workflow_execution_creation():
domain = get_basic_domain()
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234", child_policy="TERMINATE")
wfe = WorkflowExecution(domain, wft, "ab1234", child_policy="TERMINATE")
wfe.domain.should.equal(domain)
wfe.workflow_type.should.equal(wft)
wfe.child_policy.should.equal("TERMINATE")
def test_workflow_execution_creation_child_policy_logic():
domain = get_basic_domain()
WorkflowExecution(
domain,
WorkflowType(
"test-workflow", "v1.0",
task_list="queue", default_child_policy="ABANDON",
@ -104,6 +121,7 @@ def test_workflow_execution_creation_child_policy_logic():
).child_policy.should.equal("ABANDON")
WorkflowExecution(
domain,
WorkflowType(
"test-workflow", "v1.0", task_list="queue",
default_execution_start_to_close_timeout="300",
@ -114,41 +132,44 @@ def test_workflow_execution_creation_child_policy_logic():
).child_policy.should.equal("REQUEST_CANCEL")
WorkflowExecution.when.called_with(
domain,
WorkflowType("test-workflow", "v1.0"), "ab1234"
).should.throw(SWFDefaultUndefinedFault)
def test_workflow_execution_string_representation():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234", child_policy="TERMINATE")
wfe = make_workflow_execution(child_policy="TERMINATE")
str(wfe).should.match(r"^WorkflowExecution\(run_id: .*\)")
def test_workflow_execution_generates_a_random_run_id():
domain = get_basic_domain()
wft = get_basic_workflow_type()
wfe1 = WorkflowExecution(wft, "ab1234", child_policy="TERMINATE")
wfe2 = WorkflowExecution(wft, "ab1235", child_policy="TERMINATE")
wfe1 = WorkflowExecution(domain, wft, "ab1234", child_policy="TERMINATE")
wfe2 = WorkflowExecution(domain, wft, "ab1235", child_policy="TERMINATE")
wfe1.run_id.should_not.equal(wfe2.run_id)
def test_workflow_execution_short_dict_representation():
domain = get_basic_domain()
wf_type = WorkflowType(
"test-workflow", "v1.0",
task_list="queue", default_child_policy="ABANDON",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
)
wfe = WorkflowExecution(wf_type, "ab1234")
wfe = WorkflowExecution(domain, wf_type, "ab1234")
sd = wfe.to_short_dict()
sd["workflowId"].should.equal("ab1234")
sd.should.contain("runId")
def test_workflow_execution_medium_dict_representation():
domain = get_basic_domain()
wf_type = WorkflowType(
"test-workflow", "v1.0",
task_list="queue", default_child_policy="ABANDON",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
)
wfe = WorkflowExecution(wf_type, "ab1234")
wfe = WorkflowExecution(domain, wf_type, "ab1234")
md = wfe.to_medium_dict()
md["execution"].should.equal(wfe.to_short_dict())
@ -163,13 +184,14 @@ def test_workflow_execution_medium_dict_representation():
md["tagList"].should.equal(["foo", "bar", "baz"])
def test_workflow_execution_full_dict_representation():
domain = get_basic_domain()
wf_type = WorkflowType(
"test-workflow", "v1.0",
task_list="queue", default_child_policy="ABANDON",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
)
wfe = WorkflowExecution(wf_type, "ab1234")
wfe = WorkflowExecution(domain, wf_type, "ab1234")
fd = wfe.to_full_dict()
fd["executionInfo"].should.equal(wfe.to_medium_dict())
@ -184,15 +206,13 @@ def test_workflow_execution_full_dict_representation():
})
def test_workflow_execution_schedule_decision_task():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
wfe.open_counts["openDecisionTasks"].should.equal(0)
wfe.schedule_decision_task()
wfe.open_counts["openDecisionTasks"].should.equal(1)
def test_workflow_execution_start_decision_task():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
wfe.schedule_decision_task()
dt = wfe.decision_tasks[0]
wfe.start_decision_task(dt.task_token, identity="srv01")
@ -202,8 +222,7 @@ def test_workflow_execution_start_decision_task():
wfe.events()[-1].identity.should.equal("srv01")
def test_workflow_execution_history_events_ids():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
wfe._add_event("WorkflowExecutionStarted", workflow_execution=wfe)
wfe._add_event("DecisionTaskScheduled", workflow_execution=wfe)
wfe._add_event("DecisionTaskStarted", workflow_execution=wfe, scheduled_event_id=2)
@ -212,8 +231,7 @@ def test_workflow_execution_history_events_ids():
@freeze_time("2015-01-01 12:00:00")
def test_workflow_execution_start():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
wfe.events().should.equal([])
wfe.start()
@ -224,8 +242,7 @@ def test_workflow_execution_start():
@freeze_time("2015-01-02 12:00:00")
def test_workflow_execution_complete():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
wfe.complete(123, result="foo")
wfe.execution_status.should.equal("CLOSED")
@ -237,8 +254,7 @@ def test_workflow_execution_complete():
@freeze_time("2015-01-02 12:00:00")
def test_workflow_execution_fail():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
wfe.fail(123, details="some details", reason="my rules")
wfe.execution_status.should.equal("CLOSED")
@ -278,8 +294,7 @@ def test_history_event_breaks_on_initialization_if_not_implemented():
# DecisionTask
def test_decision_task_creation():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
dt = DecisionTask(wfe, 123)
dt.workflow_execution.should.equal(wfe)
dt.state.should.equal("SCHEDULED")
@ -287,8 +302,8 @@ def test_decision_task_creation():
dt.started_event_id.should.be.none
def test_decision_task_full_dict_representation():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
wft = wfe.workflow_type
dt = DecisionTask(wfe, 123)
fd = dt.to_full_dict()
@ -306,8 +321,7 @@ def test_decision_task_full_dict_representation():
# ActivityTask
def test_activity_task_creation():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe = make_workflow_execution()
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",

View File

@ -1,8 +1,14 @@
from moto.swf.models import (
Domain,
WorkflowType,
)
# A test Domain
def get_basic_domain():
return Domain("test-domain", "90")
# A generic test WorkflowType
def _generic_workflow_type_attributes():
return [