diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 0c8202bda..4b55b8cbd 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -149,29 +149,65 @@ class WorkflowExecution(object): return hsh def _process_timeouts(self): - self.should_schedule_decision_next = False + """ + SWF timeouts can happen on different objects (workflow executions, + activity tasks, decision tasks) and should be processed in order. + + A specific timeout can change the workflow execution state and have an + impact on other timeouts: for instance, if the workflow execution + timeouts, subsequent timeouts on activity or decision tasks are + irrelevant ; if an activity task timeouts, other timeouts on this task + are irrelevant, and a new decision is fired, which could well timeout + before the end of the workflow. + + So the idea here is to find the earliest timeout that would have been + triggered, process it, then make the workflow state progress and repeat + the whole process. + """ + timeout_candidates = [] # workflow execution timeout - _timeout = self.first_timeout() - if _timeout: - self.timeout(_timeout) + timeout_candidates.append(self.first_timeout()) # decision tasks timeouts for task in self.decision_tasks: - _timeout = task.first_timeout() - if _timeout: - self.timeout_decision_task(_timeout) + timeout_candidates.append(task.first_timeout()) # activity tasks timeouts for task in self.activity_tasks: - _timeout = task.first_timeout() - if _timeout: - self.timeout_activity_task(_timeout) + timeout_candidates.append(task.first_timeout()) - # schedule decision task if needed - # TODO: make decision appear as if it has been scheduled immediately after the timeout - if self.should_schedule_decision_next: - self.schedule_decision_task() + # remove blank values (foo.first_timeout() is a Timeout or None) + timeout_candidates = filter(None, timeout_candidates) + + # now find the first timeout to process + first_timeout = None + if timeout_candidates: + first_timeout = min( + timeout_candidates, + key=lambda t: t.timestamp + ) + + if first_timeout: + should_schedule_decision_next = False + if isinstance(first_timeout.obj, WorkflowExecution): + self.timeout(first_timeout) + elif isinstance(first_timeout.obj, DecisionTask): + self.timeout_decision_task(first_timeout) + should_schedule_decision_next = True + elif isinstance(first_timeout.obj, ActivityTask): + self.timeout_activity_task(first_timeout) + should_schedule_decision_next = True + else: + raise NotImplementedError("Unhandled timeout object") + + # schedule decision task if needed + if should_schedule_decision_next: + self.schedule_decision_task() + + # the workflow execution progressed, let's see if another + # timeout should be processed + self._process_timeouts() def events(self, reverse_order=False): if reverse_order: @@ -196,7 +232,7 @@ class WorkflowExecution(object): ) self.schedule_decision_task() - def schedule_decision_task(self): + def _schedule_decision_task(self): evt = self._add_event( "DecisionTaskScheduled", workflow_execution=self, @@ -207,6 +243,15 @@ class WorkflowExecution(object): ) self.open_counts["openDecisionTasks"] += 1 + def schedule_decision_task(self): + self._schedule_decision_task() + + # Shortcut for tests: helps having auto-starting decision tasks when needed + def schedule_and_start_decision_task(self, identity=None): + self._schedule_decision_task() + decision_task = self.decision_tasks[-1] + self.start_decision_task(decision_task.task_token, identity=identity) + @property def decision_tasks(self): return [t for t in self.domain.decision_tasks @@ -528,7 +573,6 @@ class WorkflowExecution(object): ) def timeout_decision_task(self, _timeout): - self.should_schedule_decision_next = True task = _timeout.obj task.timeout(_timeout) self._add_event( @@ -540,7 +584,6 @@ class WorkflowExecution(object): ) def timeout_activity_task(self, _timeout): - self.should_schedule_decision_next = True task = _timeout.obj task.timeout(_timeout) self._add_event( diff --git a/tests/test_swf/models/test_workflow_execution.py b/tests/test_swf/models/test_workflow_execution.py index e533f925b..8a010eb42 100644 --- a/tests/test_swf/models/test_workflow_execution.py +++ b/tests/test_swf/models/test_workflow_execution.py @@ -12,6 +12,7 @@ from moto.swf.exceptions import ( ) from ..utils import ( + auto_start_decision_tasks, get_basic_domain, get_basic_workflow_type, make_workflow_execution, @@ -407,3 +408,36 @@ def test_first_timeout(): with freeze_time("2015-01-01 14:01"): # 2 hours timeout reached wfe.first_timeout().should.be.a(Timeout) + +# See moto/swf/models/workflow_execution.py "_process_timeouts()" for more details +def test_timeouts_are_processed_in_order_and_reevaluated(): + # Let's make a Workflow Execution with the following properties: + # - execution start to close timeout of 8 mins + # - (decision) task start to close timeout of 5 mins + # + # Now start the workflow execution, and look at the history 15 mins later: + # - a first decision task is fired just after workflow execution start + # - the first decision task should have timed out after 5 mins + # - that fires a new decision task (which we hack to start automatically) + # - then the workflow timeouts after 8 mins (shows gradual reevaluation) + # - but the last scheduled decision task should *not* timeout (workflow closed) + with freeze_time("2015-01-01 12:00:00"): + wfe = make_workflow_execution( + execution_start_to_close_timeout=8*60, + task_start_to_close_timeout=5*60, + ) + # decision will automatically start + wfe = auto_start_decision_tasks(wfe) + wfe.start() + event_idx = len(wfe.events()) + + with freeze_time("2015-01-01 12:08:00"): + wfe._process_timeouts() + + event_types = [e.event_type for e in wfe.events()[event_idx:]] + event_types.should.equal([ + "DecisionTaskTimedOut", + "DecisionTaskScheduled", + "DecisionTaskStarted", + "WorkflowExecutionTimedOut", + ]) diff --git a/tests/test_swf/utils.py b/tests/test_swf/utils.py index d98294ea2..352118340 100644 --- a/tests/test_swf/utils.py +++ b/tests/test_swf/utils.py @@ -66,6 +66,12 @@ def make_workflow_execution(**kwargs): return WorkflowExecution(domain, wft, "ab1234", **kwargs) +# Makes decision tasks start automatically on a given workflow +def auto_start_decision_tasks(wfe): + wfe.schedule_decision_task = wfe.schedule_and_start_decision_task + return wfe + + # Setup a complete example workflow and return the connection object @mock_swf def setup_workflow():