Simplify decision task handling in SWF backend
This commit is contained in:
parent
a0e484fa6d
commit
d650f71d9c
@ -180,16 +180,16 @@ class SWFBackend(BaseBackend):
|
|||||||
# aren't distributed.
|
# aren't distributed.
|
||||||
#
|
#
|
||||||
# TODO: handle long polling (case 2) for decision tasks
|
# TODO: handle long polling (case 2) for decision tasks
|
||||||
decision_candidates = []
|
candidates = []
|
||||||
for _, wfe in domain.workflow_executions.iteritems():
|
for _task_list, tasks in domain.decision_task_lists.iteritems():
|
||||||
if wfe.task_list == task_list:
|
if _task_list == task_list:
|
||||||
decision_candidates += wfe.scheduled_decision_tasks
|
candidates += filter(lambda t: t.state == "SCHEDULED", tasks)
|
||||||
if any(decision_candidates):
|
if any(candidates):
|
||||||
# TODO: handle task priorities (but not supported by boto for now)
|
# TODO: handle task priorities (but not supported by boto for now)
|
||||||
decision = min(decision_candidates, key=lambda d: d.scheduled_at)
|
task = min(candidates, key=lambda d: d.scheduled_at)
|
||||||
wfe = decision.workflow_execution
|
wfe = task.workflow_execution
|
||||||
wfe.start_decision_task(decision.task_token, identity=identity)
|
wfe.start_decision_task(task.task_token, identity=identity)
|
||||||
return decision
|
return task
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -182,13 +182,6 @@ class WorkflowExecution(object):
|
|||||||
self.domain.decision_tasks
|
self.domain.decision_tasks
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
|
||||||
def scheduled_decision_tasks(self):
|
|
||||||
return filter(
|
|
||||||
lambda t: t.state == "SCHEDULED",
|
|
||||||
self.decision_tasks
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def activity_tasks(self):
|
def activity_tasks(self):
|
||||||
return filter(
|
return filter(
|
||||||
|
Loading…
Reference in New Issue
Block a user