From 65c95ab5bcf339af24f52ae0d03a7bcfa65553fd Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Barth Date: Wed, 4 Nov 2015 22:35:45 +0100 Subject: [PATCH] Ensure activity and decision tasks cannot timeout on a closed workflow --- moto/swf/models/activity_task.py | 2 ++ moto/swf/models/decision_task.py | 2 +- tests/test_swf/models/test_activity_task.py | 21 +++++++++++++++++++++ tests/test_swf/models/test_decision_task.py | 16 +++++++++++++++- tests/test_swf/models/test_domain.py | 10 +++++----- 5 files changed, 44 insertions(+), 7 deletions(-) diff --git a/moto/swf/models/activity_task.py b/moto/swf/models/activity_task.py index ccddb0ba6..91f6f0b21 100644 --- a/moto/swf/models/activity_task.py +++ b/moto/swf/models/activity_task.py @@ -54,6 +54,8 @@ class ActivityTask(object): self.last_heartbeat_timestamp = now_timestamp() def has_timedout(self): + if not self.workflow_execution.open: + return False # TODO: handle the "NONE" case heartbeat_timeout_at = self.last_heartbeat_timestamp + \ int(self.timeouts["heartbeatTimeout"]) diff --git a/moto/swf/models/decision_task.py b/moto/swf/models/decision_task.py index dfc8f5687..d024ae118 100644 --- a/moto/swf/models/decision_task.py +++ b/moto/swf/models/decision_task.py @@ -45,7 +45,7 @@ class DecisionTask(object): self.state = "COMPLETED" def has_timedout(self): - if self.state != "STARTED": + if self.state != "STARTED" or not self.workflow_execution.open: return False # TODO: handle the "NONE" case start_to_close_timeout = self.started_timestamp + \ diff --git a/tests/test_swf/models/test_activity_task.py b/tests/test_swf/models/test_activity_task.py index 4057c947c..f9f0e2ef7 100644 --- a/tests/test_swf/models/test_activity_task.py +++ b/tests/test_swf/models/test_activity_task.py @@ -102,3 +102,24 @@ def test_activity_task_has_timedout(): task.process_timeouts() task.state.should.equal("TIMED_OUT") task.timeout_type.should.equal("HEARTBEAT") + +def test_activity_task_cannot_timeout_on_closed_workflow_execution(): + with freeze_time("2015-01-01 12:00:00"): + wfe = make_workflow_execution() + wfe.start() + + with freeze_time("2015-01-01 13:58:00"): + task = ActivityTask( + activity_id="my-activity-123", + activity_type="foo", + input="optional", + scheduled_event_id=117, + timeouts=ACTIVITY_TASK_TIMEOUTS, + workflow_execution=wfe, + ) + + with freeze_time("2015-01-01 14:10:00"): + task.has_timedout().should.equal(True) + wfe.has_timedout().should.equal(True) + wfe.process_timeouts() + task.has_timedout().should.equal(False) diff --git a/tests/test_swf/models/test_decision_task.py b/tests/test_swf/models/test_decision_task.py index 64268b380..f0efb94c0 100644 --- a/tests/test_swf/models/test_decision_task.py +++ b/tests/test_swf/models/test_decision_task.py @@ -33,7 +33,6 @@ def test_decision_task_full_dict_representation(): def test_decision_task_has_timedout(): wfe = make_workflow_execution() - wft = wfe.workflow_type dt = DecisionTask(wfe, 123) dt.has_timedout().should.equal(False) @@ -47,3 +46,18 @@ def test_decision_task_has_timedout(): dt.complete() dt.has_timedout().should.equal(False) + +def test_decision_task_cannot_timeout_on_closed_workflow_execution(): + with freeze_time("2015-01-01 12:00:00"): + wfe = make_workflow_execution() + wfe.start() + + with freeze_time("2015-01-01 13:55:00"): + dt = DecisionTask(wfe, 123) + dt.start(1234) + + with freeze_time("2015-01-01 14:10:00"): + dt.has_timedout().should.equal(True) + wfe.has_timedout().should.equal(True) + wfe.process_timeouts() + dt.has_timedout().should.equal(False) diff --git a/tests/test_swf/models/test_domain.py b/tests/test_swf/models/test_domain.py index 6430bc1ae..515e633f9 100644 --- a/tests/test_swf/models/test_domain.py +++ b/tests/test_swf/models/test_domain.py @@ -8,7 +8,7 @@ from moto.swf.models import Domain # Fake WorkflowExecution for tests purposes WorkflowExecution = namedtuple( "WorkflowExecution", - ["workflow_id", "run_id", "execution_status"] + ["workflow_id", "run_id", "execution_status", "open"] ) @@ -59,10 +59,10 @@ def test_domain_decision_tasks(): def test_domain_get_workflow_execution(): domain = Domain("my-domain", "60") - wfe1 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-1", execution_status="OPEN") - wfe2 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-2", execution_status="CLOSED") - wfe3 = WorkflowExecution(workflow_id="wf-id-2", run_id="run-id-3", execution_status="OPEN") - wfe4 = WorkflowExecution(workflow_id="wf-id-3", run_id="run-id-4", execution_status="CLOSED") + wfe1 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-1", execution_status="OPEN", open=True) + wfe2 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-2", execution_status="CLOSED", open=False) + wfe3 = WorkflowExecution(workflow_id="wf-id-2", run_id="run-id-3", execution_status="OPEN", open=True) + wfe4 = WorkflowExecution(workflow_id="wf-id-3", run_id="run-id-4", execution_status="CLOSED", open=False) domain.workflow_executions = [wfe1, wfe2, wfe3, wfe4] # get workflow execution through workflow_id and run_id