Improve workflow selection before raising a WorkflowExecutionAlreadyStartedFault
This commit is contained in:
parent
96d6bb056b
commit
804d2e91b5
@ -71,33 +71,32 @@ class Domain(object):
|
|||||||
|
|
||||||
def add_workflow_execution(self, workflow_execution):
|
def add_workflow_execution(self, workflow_execution):
|
||||||
_id = workflow_execution.workflow_id
|
_id = workflow_execution.workflow_id
|
||||||
# TODO: handle this better: this should raise ONLY if there's an OPEN wfe with this ID
|
if self.get_workflow_execution(_id, raise_if_none=False):
|
||||||
if any(wfe.workflow_id == _id for wfe in self.workflow_executions):
|
|
||||||
raise SWFWorkflowExecutionAlreadyStartedFault()
|
raise SWFWorkflowExecutionAlreadyStartedFault()
|
||||||
self.workflow_executions.append(workflow_execution)
|
self.workflow_executions.append(workflow_execution)
|
||||||
|
|
||||||
def get_workflow_execution(self, workflow_id, run_id=None, raise_if_closed=False):
|
def get_workflow_execution(self, workflow_id, run_id=None,
|
||||||
|
raise_if_none=True, raise_if_closed=False):
|
||||||
|
# query
|
||||||
if run_id:
|
if run_id:
|
||||||
_all = [w for w in self.workflow_executions
|
_all = [w for w in self.workflow_executions
|
||||||
if w.workflow_id == workflow_id and w.run_id == run_id]
|
if w.workflow_id == workflow_id and w.run_id == run_id]
|
||||||
else:
|
else:
|
||||||
_all = [w for w in self.workflow_executions
|
_all = [w for w in self.workflow_executions
|
||||||
if w.workflow_id == workflow_id and w.execution_status == "OPEN"]
|
if w.workflow_id == workflow_id and w.execution_status == "OPEN"]
|
||||||
|
# reduce
|
||||||
wfe = _all[0] if _all else None
|
wfe = _all[0] if _all else None
|
||||||
|
# raise if closed / none
|
||||||
if raise_if_closed and wfe and wfe.execution_status == "CLOSED":
|
if raise_if_closed and wfe and wfe.execution_status == "CLOSED":
|
||||||
wfe = None
|
wfe = None
|
||||||
if run_id:
|
if not wfe and raise_if_none:
|
||||||
if not wfe or wfe.run_id != run_id:
|
if run_id:
|
||||||
raise SWFUnknownResourceFault(
|
args = ["execution", "WorkflowExecution=[workflowId={}, runId={}]".format(
|
||||||
"execution",
|
workflow_id, run_id)]
|
||||||
"WorkflowExecution=[workflowId={}, runId={}]".format(
|
else:
|
||||||
workflow_id, run_id
|
args = ["execution, workflowId = {}".format(workflow_id)]
|
||||||
)
|
raise SWFUnknownResourceFault(*args)
|
||||||
)
|
# at last return workflow execution
|
||||||
elif not wfe:
|
|
||||||
raise SWFUnknownResourceFault(
|
|
||||||
"execution, workflowId = {}".format(workflow_id)
|
|
||||||
)
|
|
||||||
return wfe
|
return wfe
|
||||||
|
|
||||||
def add_to_activity_task_list(self, task_list, obj):
|
def add_to_activity_task_list(self, task_list, obj):
|
||||||
|
@ -97,3 +97,6 @@ def test_domain_get_workflow_execution():
|
|||||||
SWFUnknownResourceFault,
|
SWFUnknownResourceFault,
|
||||||
"Unknown execution: WorkflowExecution=[workflowId=wf-id-3, runId=run-id-4]"
|
"Unknown execution: WorkflowExecution=[workflowId=wf-id-3, runId=run-id-4]"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# raise_if_none attribute
|
||||||
|
domain.get_workflow_execution("foo", raise_if_none=False).should.be.none
|
||||||
|
Loading…
x
Reference in New Issue
Block a user