diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index 88e362f4e..f4fe246f8 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -20,6 +20,7 @@ from .decision_task import DecisionTask from .domain import Domain from .generic_type import GenericType from .history_event import HistoryEvent +from .timeout import Timeout from .workflow_type import WorkflowType from .workflow_execution import WorkflowExecution diff --git a/moto/swf/models/activity_task.py b/moto/swf/models/activity_task.py index 1f011cb8d..76d0eac70 100644 --- a/moto/swf/models/activity_task.py +++ b/moto/swf/models/activity_task.py @@ -5,6 +5,8 @@ import uuid from ..exceptions import SWFWorkflowExecutionClosedError from ..utils import now_timestamp +from .timeout import Timeout + class ActivityTask(object): def __init__(self, activity_id, activity_type, scheduled_event_id, @@ -60,19 +62,23 @@ class ActivityTask(object): def reset_heartbeat_clock(self): self.last_heartbeat_timestamp = now_timestamp() - def has_timedout(self): + def first_timeout(self): if not self.workflow_execution.open: - return False + return None # TODO: handle the "NONE" case heartbeat_timeout_at = self.last_heartbeat_timestamp + \ int(self.timeouts["heartbeatTimeout"]) - return heartbeat_timeout_at < now_timestamp() + _timeout = Timeout(self, heartbeat_timeout_at, "HEARTBEAT") + if _timeout.reached: + return _timeout + def process_timeouts(self): - if self.has_timedout(): - self.timeout() + _timeout = self.first_timeout() + if _timeout: + self.timeout(_timeout) - def timeout(self): + def timeout(self, _timeout): self._check_workflow_execution_open() self.state = "TIMED_OUT" - self.timeout_type = "HEARTBEAT" + self.timeout_type = _timeout.kind diff --git a/moto/swf/models/decision_task.py b/moto/swf/models/decision_task.py index 23822976c..fb7b9d080 100644 --- a/moto/swf/models/decision_task.py +++ b/moto/swf/models/decision_task.py @@ -5,6 +5,8 @@ import uuid from ..exceptions import SWFWorkflowExecutionClosedError from ..utils import now_timestamp +from .timeout import Timeout + class DecisionTask(object): def __init__(self, workflow_execution, scheduled_event_id): @@ -50,19 +52,21 @@ class DecisionTask(object): self._check_workflow_execution_open() self.state = "COMPLETED" - def has_timedout(self): + def first_timeout(self): if self.state != "STARTED" or not self.workflow_execution.open: - return False + return None # TODO: handle the "NONE" case - start_to_close_timeout = self.started_timestamp + \ - int(self.start_to_close_timeout) - return start_to_close_timeout < now_timestamp() + start_to_close_at = self.started_timestamp + int(self.start_to_close_timeout) + _timeout = Timeout(self, start_to_close_at, "START_TO_CLOSE") + if _timeout.reached: + return _timeout def process_timeouts(self): - if self.has_timedout(): - self.timeout() + _timeout = self.first_timeout() + if _timeout: + self.timeout(_timeout) - def timeout(self): + def timeout(self, _timeout): self._check_workflow_execution_open() self.state = "TIMED_OUT" - self.timeout_type = "START_TO_CLOSE" + self.timeout_type = _timeout.kind diff --git a/moto/swf/models/timeout.py b/moto/swf/models/timeout.py new file mode 100644 index 000000000..66cb3b84c --- /dev/null +++ b/moto/swf/models/timeout.py @@ -0,0 +1,12 @@ +from ..utils import now_timestamp + + +class Timeout(object): + def __init__(self, obj, timestamp, kind): + self.obj = obj + self.timestamp = timestamp + self.kind = kind + + @property + def reached(self): + return now_timestamp() >= self.timestamp diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index aee678a1d..2c1e74a02 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -18,6 +18,7 @@ from .activity_task import ActivityTask from .activity_type import ActivityType from .decision_task import DecisionTask from .history_event import HistoryEvent +from .timeout import Timeout # TODO: extract decision related logic into a Decision class @@ -151,8 +152,9 @@ class WorkflowExecution(object): self.should_schedule_decision_next = False # workflow execution timeout - if self.has_timedout(): - self.process_timeouts() + _timeout = self.first_timeout() + if _timeout: + self.execute_timeout(_timeout) # TODO: process child policy on child workflows here or in process_timeouts() self._add_event( "WorkflowExecutionTimedOut", @@ -162,7 +164,7 @@ class WorkflowExecution(object): # decision tasks timeouts for task in self.decision_tasks: - if task.state == "STARTED" and task.has_timedout(): + if task.state == "STARTED" and task.first_timeout(): self.should_schedule_decision_next = True task.process_timeouts() self._add_event( @@ -174,7 +176,7 @@ class WorkflowExecution(object): # activity tasks timeouts for task in self.activity_tasks: - if task.open and task.has_timedout(): + if task.open and task.first_timeout(): self.should_schedule_decision_next = True task.process_timeouts() self._add_event( @@ -522,19 +524,23 @@ class WorkflowExecution(object): self.close_status = "TERMINATED" self.close_cause = "OPERATOR_INITIATED" - def has_timedout(self): + def first_timeout(self): if not self.open or not self.start_timestamp: - return False - # TODO: handle the "NONE" case - start_to_close_timeout = self.start_timestamp + \ - int(self.execution_start_to_close_timeout) - return start_to_close_timeout < now_timestamp() + return None + start_to_close_at = self.start_timestamp + int(self.execution_start_to_close_timeout) + _timeout = Timeout(self, start_to_close_at, "START_TO_CLOSE") + if _timeout.reached: + return _timeout + + def execute_timeout(self, timeout): + self.execution_status = "CLOSED" + self.close_status = "TIMED_OUT" + self.timeout_type = timeout.kind def process_timeouts(self): - if self.has_timedout(): - self.execution_status = "CLOSED" - self.close_status = "TIMED_OUT" - self.timeout_type = "START_TO_CLOSE" + _timeout = self.first_timeout() + if _timeout: + self.execute_timeout(_timeout) @property def open(self): diff --git a/tests/test_swf/models/test_activity_task.py b/tests/test_swf/models/test_activity_task.py index 8b81cbdd5..ef4823cc7 100644 --- a/tests/test_swf/models/test_activity_task.py +++ b/tests/test_swf/models/test_activity_task.py @@ -5,6 +5,7 @@ from moto.swf.exceptions import SWFWorkflowExecutionClosedError from moto.swf.models import ( ActivityTask, ActivityType, + Timeout, ) from ..utils import make_workflow_execution, ACTIVITY_TASK_TIMEOUTS @@ -83,7 +84,7 @@ def test_activity_task_reset_heartbeat_clock(): task.last_heartbeat_timestamp.should.equal(1420117200.0) -def test_activity_task_has_timedout(): +def test_activity_task_first_timeout(): wfe = make_workflow_execution() with freeze_time("2015-01-01 12:00:00"): @@ -95,11 +96,11 @@ def test_activity_task_has_timedout(): timeouts=ACTIVITY_TASK_TIMEOUTS, workflow_execution=wfe, ) - task.has_timedout().should.equal(False) + task.first_timeout().should.be.none # activity task timeout is 300s == 5mins with freeze_time("2015-01-01 12:06:00"): - task.has_timedout().should.equal(True) + task.first_timeout().should.be.a(Timeout) task.process_timeouts() task.state.should.equal("TIMED_OUT") task.timeout_type.should.equal("HEARTBEAT") @@ -120,10 +121,10 @@ def test_activity_task_cannot_timeout_on_closed_workflow_execution(): ) with freeze_time("2015-01-01 14:10:00"): - task.has_timedout().should.equal(True) - wfe.has_timedout().should.equal(True) + task.first_timeout().should.be.a(Timeout) + wfe.first_timeout().should.be.a(Timeout) wfe.process_timeouts() - task.has_timedout().should.equal(False) + task.first_timeout().should.be.none def test_activity_task_cannot_change_state_on_closed_workflow_execution(): wfe = make_workflow_execution() @@ -139,6 +140,6 @@ def test_activity_task_cannot_change_state_on_closed_workflow_execution(): ) wfe.complete(123) - task.timeout.when.called_with().should.throw(SWFWorkflowExecutionClosedError) + task.timeout.when.called_with(Timeout(task, 0, "foo")).should.throw(SWFWorkflowExecutionClosedError) task.complete.when.called_with().should.throw(SWFWorkflowExecutionClosedError) task.fail.when.called_with().should.throw(SWFWorkflowExecutionClosedError) diff --git a/tests/test_swf/models/test_decision_task.py b/tests/test_swf/models/test_decision_task.py index ae2b59fdc..f85b83ebd 100644 --- a/tests/test_swf/models/test_decision_task.py +++ b/tests/test_swf/models/test_decision_task.py @@ -2,7 +2,7 @@ from freezegun import freeze_time from sure import expect from moto.swf.exceptions import SWFWorkflowExecutionClosedError -from moto.swf.models import DecisionTask +from moto.swf.models import DecisionTask, Timeout from ..utils import make_workflow_execution @@ -32,21 +32,21 @@ def test_decision_task_full_dict_representation(): fd = dt.to_full_dict() fd["startedEventId"].should.equal(1234) -def test_decision_task_has_timedout(): +def test_decision_task_first_timeout(): wfe = make_workflow_execution() dt = DecisionTask(wfe, 123) - dt.has_timedout().should.equal(False) + dt.first_timeout().should.be.none with freeze_time("2015-01-01 12:00:00"): dt.start(1234) - dt.has_timedout().should.equal(False) + dt.first_timeout().should.be.none # activity task timeout is 300s == 5mins with freeze_time("2015-01-01 12:06:00"): - dt.has_timedout().should.equal(True) + dt.first_timeout().should.be.a(Timeout) dt.complete() - dt.has_timedout().should.equal(False) + dt.first_timeout().should.be.none def test_decision_task_cannot_timeout_on_closed_workflow_execution(): with freeze_time("2015-01-01 12:00:00"): @@ -58,10 +58,10 @@ def test_decision_task_cannot_timeout_on_closed_workflow_execution(): dt.start(1234) with freeze_time("2015-01-01 14:10:00"): - dt.has_timedout().should.equal(True) - wfe.has_timedout().should.equal(True) + dt.first_timeout().should.be.a(Timeout) + wfe.first_timeout().should.be.a(Timeout) wfe.process_timeouts() - dt.has_timedout().should.equal(False) + dt.first_timeout().should.be.none def test_decision_task_cannot_change_state_on_closed_workflow_execution(): wfe = make_workflow_execution() @@ -70,5 +70,5 @@ def test_decision_task_cannot_change_state_on_closed_workflow_execution(): wfe.complete(123) - task.timeout.when.called_with().should.throw(SWFWorkflowExecutionClosedError) + task.timeout.when.called_with(Timeout(task, 0, "foo")).should.throw(SWFWorkflowExecutionClosedError) task.complete.when.called_with().should.throw(SWFWorkflowExecutionClosedError) diff --git a/tests/test_swf/models/test_timeout.py b/tests/test_swf/models/test_timeout.py new file mode 100644 index 000000000..6ba26b1d2 --- /dev/null +++ b/tests/test_swf/models/test_timeout.py @@ -0,0 +1,18 @@ +from freezegun import freeze_time +from sure import expect + +from moto.swf.models import Timeout, WorkflowExecution + +from ..utils import make_workflow_execution + +def test_timeout_creation(): + wfe = make_workflow_execution() + + # epoch 1420113600 == "2015-01-01 13:00:00" + timeout = Timeout(wfe, 1420117200, "START_TO_CLOSE") + + with freeze_time("2015-01-01 12:00:00"): + timeout.reached.should.be.falsy + + with freeze_time("2015-01-01 13:00:00"): + timeout.reached.should.be.truthy diff --git a/tests/test_swf/models/test_workflow_execution.py b/tests/test_swf/models/test_workflow_execution.py index bc3585f78..e533f925b 100644 --- a/tests/test_swf/models/test_workflow_execution.py +++ b/tests/test_swf/models/test_workflow_execution.py @@ -3,6 +3,7 @@ from freezegun import freeze_time from moto.swf.models import ( ActivityType, + Timeout, WorkflowType, WorkflowExecution, ) @@ -106,7 +107,7 @@ def test_workflow_execution_medium_dict_representation(): md["workflowType"].should.equal(wf_type.to_short_dict()) md["startTimestamp"].should.be.a('float') md["executionStatus"].should.equal("OPEN") - md["cancelRequested"].should.equal(False) + md["cancelRequested"].should.be.falsy md.should_not.contain("tagList") wfe.tag_list = ["foo", "bar", "baz"] @@ -395,14 +396,14 @@ def test_terminate(): # take default child_policy if not provided (as here) last_event.child_policy.should.equal("ABANDON") -def test_has_timedout(): +def test_first_timeout(): wfe = make_workflow_execution() - wfe.has_timedout().should.equal(False) + wfe.first_timeout().should.be.none with freeze_time("2015-01-01 12:00:00"): wfe.start() - wfe.has_timedout().should.equal(False) + wfe.first_timeout().should.be.none with freeze_time("2015-01-01 14:01"): # 2 hours timeout reached - wfe.has_timedout().should.equal(True) + wfe.first_timeout().should.be.a(Timeout)