Improve SWF timeouts processing: now processed in order, one by one
This commit is contained in:
parent
7f2cbb79b0
commit
248975d4e6
@ -149,30 +149,66 @@ class WorkflowExecution(object):
|
|||||||
return hsh
|
return hsh
|
||||||
|
|
||||||
def _process_timeouts(self):
|
def _process_timeouts(self):
|
||||||
self.should_schedule_decision_next = False
|
"""
|
||||||
|
SWF timeouts can happen on different objects (workflow executions,
|
||||||
|
activity tasks, decision tasks) and should be processed in order.
|
||||||
|
|
||||||
|
A specific timeout can change the workflow execution state and have an
|
||||||
|
impact on other timeouts: for instance, if the workflow execution
|
||||||
|
timeouts, subsequent timeouts on activity or decision tasks are
|
||||||
|
irrelevant ; if an activity task timeouts, other timeouts on this task
|
||||||
|
are irrelevant, and a new decision is fired, which could well timeout
|
||||||
|
before the end of the workflow.
|
||||||
|
|
||||||
|
So the idea here is to find the earliest timeout that would have been
|
||||||
|
triggered, process it, then make the workflow state progress and repeat
|
||||||
|
the whole process.
|
||||||
|
"""
|
||||||
|
timeout_candidates = []
|
||||||
|
|
||||||
# workflow execution timeout
|
# workflow execution timeout
|
||||||
_timeout = self.first_timeout()
|
timeout_candidates.append(self.first_timeout())
|
||||||
if _timeout:
|
|
||||||
self.timeout(_timeout)
|
|
||||||
|
|
||||||
# decision tasks timeouts
|
# decision tasks timeouts
|
||||||
for task in self.decision_tasks:
|
for task in self.decision_tasks:
|
||||||
_timeout = task.first_timeout()
|
timeout_candidates.append(task.first_timeout())
|
||||||
if _timeout:
|
|
||||||
self.timeout_decision_task(_timeout)
|
|
||||||
|
|
||||||
# activity tasks timeouts
|
# activity tasks timeouts
|
||||||
for task in self.activity_tasks:
|
for task in self.activity_tasks:
|
||||||
_timeout = task.first_timeout()
|
timeout_candidates.append(task.first_timeout())
|
||||||
if _timeout:
|
|
||||||
self.timeout_activity_task(_timeout)
|
# remove blank values (foo.first_timeout() is a Timeout or None)
|
||||||
|
timeout_candidates = filter(None, timeout_candidates)
|
||||||
|
|
||||||
|
# now find the first timeout to process
|
||||||
|
first_timeout = None
|
||||||
|
if timeout_candidates:
|
||||||
|
first_timeout = min(
|
||||||
|
timeout_candidates,
|
||||||
|
key=lambda t: t.timestamp
|
||||||
|
)
|
||||||
|
|
||||||
|
if first_timeout:
|
||||||
|
should_schedule_decision_next = False
|
||||||
|
if isinstance(first_timeout.obj, WorkflowExecution):
|
||||||
|
self.timeout(first_timeout)
|
||||||
|
elif isinstance(first_timeout.obj, DecisionTask):
|
||||||
|
self.timeout_decision_task(first_timeout)
|
||||||
|
should_schedule_decision_next = True
|
||||||
|
elif isinstance(first_timeout.obj, ActivityTask):
|
||||||
|
self.timeout_activity_task(first_timeout)
|
||||||
|
should_schedule_decision_next = True
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Unhandled timeout object")
|
||||||
|
|
||||||
# schedule decision task if needed
|
# schedule decision task if needed
|
||||||
# TODO: make decision appear as if it has been scheduled immediately after the timeout
|
if should_schedule_decision_next:
|
||||||
if self.should_schedule_decision_next:
|
|
||||||
self.schedule_decision_task()
|
self.schedule_decision_task()
|
||||||
|
|
||||||
|
# the workflow execution progressed, let's see if another
|
||||||
|
# timeout should be processed
|
||||||
|
self._process_timeouts()
|
||||||
|
|
||||||
def events(self, reverse_order=False):
|
def events(self, reverse_order=False):
|
||||||
if reverse_order:
|
if reverse_order:
|
||||||
return reversed(self._events)
|
return reversed(self._events)
|
||||||
@ -196,7 +232,7 @@ class WorkflowExecution(object):
|
|||||||
)
|
)
|
||||||
self.schedule_decision_task()
|
self.schedule_decision_task()
|
||||||
|
|
||||||
def schedule_decision_task(self):
|
def _schedule_decision_task(self):
|
||||||
evt = self._add_event(
|
evt = self._add_event(
|
||||||
"DecisionTaskScheduled",
|
"DecisionTaskScheduled",
|
||||||
workflow_execution=self,
|
workflow_execution=self,
|
||||||
@ -207,6 +243,15 @@ class WorkflowExecution(object):
|
|||||||
)
|
)
|
||||||
self.open_counts["openDecisionTasks"] += 1
|
self.open_counts["openDecisionTasks"] += 1
|
||||||
|
|
||||||
|
def schedule_decision_task(self):
|
||||||
|
self._schedule_decision_task()
|
||||||
|
|
||||||
|
# Shortcut for tests: helps having auto-starting decision tasks when needed
|
||||||
|
def schedule_and_start_decision_task(self, identity=None):
|
||||||
|
self._schedule_decision_task()
|
||||||
|
decision_task = self.decision_tasks[-1]
|
||||||
|
self.start_decision_task(decision_task.task_token, identity=identity)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def decision_tasks(self):
|
def decision_tasks(self):
|
||||||
return [t for t in self.domain.decision_tasks
|
return [t for t in self.domain.decision_tasks
|
||||||
@ -528,7 +573,6 @@ class WorkflowExecution(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def timeout_decision_task(self, _timeout):
|
def timeout_decision_task(self, _timeout):
|
||||||
self.should_schedule_decision_next = True
|
|
||||||
task = _timeout.obj
|
task = _timeout.obj
|
||||||
task.timeout(_timeout)
|
task.timeout(_timeout)
|
||||||
self._add_event(
|
self._add_event(
|
||||||
@ -540,7 +584,6 @@ class WorkflowExecution(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def timeout_activity_task(self, _timeout):
|
def timeout_activity_task(self, _timeout):
|
||||||
self.should_schedule_decision_next = True
|
|
||||||
task = _timeout.obj
|
task = _timeout.obj
|
||||||
task.timeout(_timeout)
|
task.timeout(_timeout)
|
||||||
self._add_event(
|
self._add_event(
|
||||||
|
@ -12,6 +12,7 @@ from moto.swf.exceptions import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from ..utils import (
|
from ..utils import (
|
||||||
|
auto_start_decision_tasks,
|
||||||
get_basic_domain,
|
get_basic_domain,
|
||||||
get_basic_workflow_type,
|
get_basic_workflow_type,
|
||||||
make_workflow_execution,
|
make_workflow_execution,
|
||||||
@ -407,3 +408,36 @@ def test_first_timeout():
|
|||||||
with freeze_time("2015-01-01 14:01"):
|
with freeze_time("2015-01-01 14:01"):
|
||||||
# 2 hours timeout reached
|
# 2 hours timeout reached
|
||||||
wfe.first_timeout().should.be.a(Timeout)
|
wfe.first_timeout().should.be.a(Timeout)
|
||||||
|
|
||||||
|
# See moto/swf/models/workflow_execution.py "_process_timeouts()" for more details
|
||||||
|
def test_timeouts_are_processed_in_order_and_reevaluated():
|
||||||
|
# Let's make a Workflow Execution with the following properties:
|
||||||
|
# - execution start to close timeout of 8 mins
|
||||||
|
# - (decision) task start to close timeout of 5 mins
|
||||||
|
#
|
||||||
|
# Now start the workflow execution, and look at the history 15 mins later:
|
||||||
|
# - a first decision task is fired just after workflow execution start
|
||||||
|
# - the first decision task should have timed out after 5 mins
|
||||||
|
# - that fires a new decision task (which we hack to start automatically)
|
||||||
|
# - then the workflow timeouts after 8 mins (shows gradual reevaluation)
|
||||||
|
# - but the last scheduled decision task should *not* timeout (workflow closed)
|
||||||
|
with freeze_time("2015-01-01 12:00:00"):
|
||||||
|
wfe = make_workflow_execution(
|
||||||
|
execution_start_to_close_timeout=8*60,
|
||||||
|
task_start_to_close_timeout=5*60,
|
||||||
|
)
|
||||||
|
# decision will automatically start
|
||||||
|
wfe = auto_start_decision_tasks(wfe)
|
||||||
|
wfe.start()
|
||||||
|
event_idx = len(wfe.events())
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 12:08:00"):
|
||||||
|
wfe._process_timeouts()
|
||||||
|
|
||||||
|
event_types = [e.event_type for e in wfe.events()[event_idx:]]
|
||||||
|
event_types.should.equal([
|
||||||
|
"DecisionTaskTimedOut",
|
||||||
|
"DecisionTaskScheduled",
|
||||||
|
"DecisionTaskStarted",
|
||||||
|
"WorkflowExecutionTimedOut",
|
||||||
|
])
|
||||||
|
@ -66,6 +66,12 @@ def make_workflow_execution(**kwargs):
|
|||||||
return WorkflowExecution(domain, wft, "ab1234", **kwargs)
|
return WorkflowExecution(domain, wft, "ab1234", **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# Makes decision tasks start automatically on a given workflow
|
||||||
|
def auto_start_decision_tasks(wfe):
|
||||||
|
wfe.schedule_decision_task = wfe.schedule_and_start_decision_task
|
||||||
|
return wfe
|
||||||
|
|
||||||
|
|
||||||
# Setup a complete example workflow and return the connection object
|
# Setup a complete example workflow and return the connection object
|
||||||
@mock_swf
|
@mock_swf
|
||||||
def setup_workflow():
|
def setup_workflow():
|
||||||
|
Loading…
x
Reference in New Issue
Block a user