SWF: Schedule a single task if another is started (#5671)
This commit is contained in:
parent
e410347520
commit
fe64a02851
@ -263,6 +263,18 @@ class WorkflowExecution(BaseModel):
|
|||||||
self.schedule_decision_task()
|
self.schedule_decision_task()
|
||||||
|
|
||||||
def _schedule_decision_task(self):
|
def _schedule_decision_task(self):
|
||||||
|
has_scheduled_task = False
|
||||||
|
has_started_task = False
|
||||||
|
for task in self.decision_tasks:
|
||||||
|
if task.state == "STARTED":
|
||||||
|
has_started_task = True
|
||||||
|
elif task.state == "SCHEDULED":
|
||||||
|
has_scheduled_task = True
|
||||||
|
# If a decision task is already running, we cannot schedule more than one additional task
|
||||||
|
# See https://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-dev-deciders.html#swf-dg-deciders-launch
|
||||||
|
if has_started_task and has_scheduled_task:
|
||||||
|
return
|
||||||
|
|
||||||
evt = self._add_event(
|
evt = self._add_event(
|
||||||
"DecisionTaskScheduled",
|
"DecisionTaskScheduled",
|
||||||
start_to_close_timeout=self.task_start_to_close_timeout,
|
start_to_close_timeout=self.task_start_to_close_timeout,
|
||||||
|
@ -220,6 +220,33 @@ def test_workflow_execution_schedule_decision_task():
|
|||||||
wfe.open_counts["openDecisionTasks"].should.equal(1)
|
wfe.open_counts["openDecisionTasks"].should.equal(1)
|
||||||
|
|
||||||
|
|
||||||
|
def test_workflow_execution_dont_schedule_decision_if_existing_started_and_other_scheduled():
|
||||||
|
wfe = make_workflow_execution()
|
||||||
|
wfe.open_counts["openDecisionTasks"].should.equal(0)
|
||||||
|
|
||||||
|
wfe.schedule_decision_task()
|
||||||
|
wfe.open_counts["openDecisionTasks"].should.equal(1)
|
||||||
|
|
||||||
|
wfe.decision_tasks[0].start("evt_id")
|
||||||
|
|
||||||
|
wfe.schedule_decision_task()
|
||||||
|
wfe.schedule_decision_task()
|
||||||
|
wfe.open_counts["openDecisionTasks"].should.equal(2)
|
||||||
|
|
||||||
|
|
||||||
|
def test_workflow_execution_schedule_decision_if_existing_started_and_no_other_scheduled():
|
||||||
|
wfe = make_workflow_execution()
|
||||||
|
wfe.open_counts["openDecisionTasks"].should.equal(0)
|
||||||
|
|
||||||
|
wfe.schedule_decision_task()
|
||||||
|
wfe.open_counts["openDecisionTasks"].should.equal(1)
|
||||||
|
|
||||||
|
wfe.decision_tasks[0].start("evt_id")
|
||||||
|
|
||||||
|
wfe.schedule_decision_task()
|
||||||
|
wfe.open_counts["openDecisionTasks"].should.equal(2)
|
||||||
|
|
||||||
|
|
||||||
def test_workflow_execution_start_decision_task():
|
def test_workflow_execution_start_decision_task():
|
||||||
wfe = make_workflow_execution()
|
wfe = make_workflow_execution()
|
||||||
wfe.schedule_decision_task()
|
wfe.schedule_decision_task()
|
||||||
|
Loading…
Reference in New Issue
Block a user