From 7f2cbb79b03987e0c35ff67431b6b2bb2c9d0ce8 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Barth Date: Mon, 9 Nov 2015 22:06:03 +0100 Subject: [PATCH] Refactor SWF workflow execution to ease next timeout change --- moto/swf/models/workflow_execution.py | 69 +++++++++++++++------------ 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 9a54f1729..0c8202bda 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -155,42 +155,19 @@ class WorkflowExecution(object): _timeout = self.first_timeout() if _timeout: self.timeout(_timeout) - # TODO: process child policy on child workflows here or in timeout() - self._add_event( - "WorkflowExecutionTimedOut", - child_policy=self.child_policy, - event_timestamp=_timeout.timestamp, - timeout_type=self.timeout_type, - ) # decision tasks timeouts for task in self.decision_tasks: _timeout = task.first_timeout() if _timeout: - self.should_schedule_decision_next = True - task.timeout(_timeout) - self._add_event( - "DecisionTaskTimedOut", - event_timestamp=_timeout.timestamp, - scheduled_event_id=task.scheduled_event_id, - started_event_id=task.started_event_id, - timeout_type=task.timeout_type, - ) + self.timeout_decision_task(_timeout) # activity tasks timeouts for task in self.activity_tasks: _timeout = task.first_timeout() if _timeout: - self.should_schedule_decision_next = True - task.timeout(_timeout) - self._add_event( - "ActivityTaskTimedOut", - details=task.details, - event_timestamp=_timeout.timestamp, - scheduled_event_id=task.scheduled_event_id, - started_event_id=task.started_event_id, - timeout_type=task.timeout_type, - ) + self.timeout_activity_task(_timeout) + # schedule decision task if needed # TODO: make decision appear as if it has been scheduled immediately after the timeout if self.should_schedule_decision_next: @@ -376,7 +353,7 @@ class WorkflowExecution(object): self.execution_status = "CLOSED" self.close_status = "COMPLETED" self.close_timestamp = now_timestamp() - evt = self._add_event( + self._add_event( "WorkflowExecutionCompleted", decision_task_completed_event_id=event_id, result=result, @@ -387,7 +364,7 @@ class WorkflowExecution(object): self.execution_status = "CLOSED" self.close_status = "FAILED" self.close_timestamp = now_timestamp() - evt = self._add_event( + self._add_event( "WorkflowExecutionFailed", decision_task_completed_event_id=event_id, details=details, @@ -487,7 +464,7 @@ class WorkflowExecution(object): def complete_activity_task(self, task_token, result=None): task = self._find_activity_task(task_token) - evt = self._add_event( + self._add_event( "ActivityTaskCompleted", scheduled_event_id=task.scheduled_event_id, started_event_id=task.started_event_id, @@ -500,7 +477,7 @@ class WorkflowExecution(object): def fail_activity_task(self, task_token, reason=None, details=None): task = self._find_activity_task(task_token) - evt = self._add_event( + self._add_event( "ActivityTaskFailed", scheduled_event_id=task.scheduled_event_id, started_event_id=task.started_event_id, @@ -539,9 +516,41 @@ class WorkflowExecution(object): return _timeout def timeout(self, timeout): + # TODO: process child policy on child workflows here or in the triggering function self.execution_status = "CLOSED" self.close_status = "TIMED_OUT" self.timeout_type = timeout.kind + self._add_event( + "WorkflowExecutionTimedOut", + child_policy=self.child_policy, + event_timestamp=timeout.timestamp, + timeout_type=self.timeout_type, + ) + + def timeout_decision_task(self, _timeout): + self.should_schedule_decision_next = True + task = _timeout.obj + task.timeout(_timeout) + self._add_event( + "DecisionTaskTimedOut", + event_timestamp=_timeout.timestamp, + scheduled_event_id=task.scheduled_event_id, + started_event_id=task.started_event_id, + timeout_type=task.timeout_type, + ) + + def timeout_activity_task(self, _timeout): + self.should_schedule_decision_next = True + task = _timeout.obj + task.timeout(_timeout) + self._add_event( + "ActivityTaskTimedOut", + details=task.details, + event_timestamp=_timeout.timestamp, + scheduled_event_id=task.scheduled_event_id, + started_event_id=task.started_event_id, + timeout_type=task.timeout_type, + ) @property def open(self):