Ensure activity and decision tasks cannot timeout on a closed workflow

This commit is contained in:
Jean-Baptiste Barth 2015-11-04 22:35:45 +01:00
parent 9c3996ff58
commit 65c95ab5bc
5 changed files with 44 additions and 7 deletions

View File

@ -54,6 +54,8 @@ class ActivityTask(object):
self.last_heartbeat_timestamp = now_timestamp()
def has_timedout(self):
if not self.workflow_execution.open:
return False
# TODO: handle the "NONE" case
heartbeat_timeout_at = self.last_heartbeat_timestamp + \
int(self.timeouts["heartbeatTimeout"])

View File

@ -45,7 +45,7 @@ class DecisionTask(object):
self.state = "COMPLETED"
def has_timedout(self):
if self.state != "STARTED":
if self.state != "STARTED" or not self.workflow_execution.open:
return False
# TODO: handle the "NONE" case
start_to_close_timeout = self.started_timestamp + \

View File

@ -102,3 +102,24 @@ def test_activity_task_has_timedout():
task.process_timeouts()
task.state.should.equal("TIMED_OUT")
task.timeout_type.should.equal("HEARTBEAT")
def test_activity_task_cannot_timeout_on_closed_workflow_execution():
with freeze_time("2015-01-01 12:00:00"):
wfe = make_workflow_execution()
wfe.start()
with freeze_time("2015-01-01 13:58:00"):
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
with freeze_time("2015-01-01 14:10:00"):
task.has_timedout().should.equal(True)
wfe.has_timedout().should.equal(True)
wfe.process_timeouts()
task.has_timedout().should.equal(False)

View File

@ -33,7 +33,6 @@ def test_decision_task_full_dict_representation():
def test_decision_task_has_timedout():
wfe = make_workflow_execution()
wft = wfe.workflow_type
dt = DecisionTask(wfe, 123)
dt.has_timedout().should.equal(False)
@ -47,3 +46,18 @@ def test_decision_task_has_timedout():
dt.complete()
dt.has_timedout().should.equal(False)
def test_decision_task_cannot_timeout_on_closed_workflow_execution():
with freeze_time("2015-01-01 12:00:00"):
wfe = make_workflow_execution()
wfe.start()
with freeze_time("2015-01-01 13:55:00"):
dt = DecisionTask(wfe, 123)
dt.start(1234)
with freeze_time("2015-01-01 14:10:00"):
dt.has_timedout().should.equal(True)
wfe.has_timedout().should.equal(True)
wfe.process_timeouts()
dt.has_timedout().should.equal(False)

View File

@ -8,7 +8,7 @@ from moto.swf.models import Domain
# Fake WorkflowExecution for tests purposes
WorkflowExecution = namedtuple(
"WorkflowExecution",
["workflow_id", "run_id", "execution_status"]
["workflow_id", "run_id", "execution_status", "open"]
)
@ -59,10 +59,10 @@ def test_domain_decision_tasks():
def test_domain_get_workflow_execution():
domain = Domain("my-domain", "60")
wfe1 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-1", execution_status="OPEN")
wfe2 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-2", execution_status="CLOSED")
wfe3 = WorkflowExecution(workflow_id="wf-id-2", run_id="run-id-3", execution_status="OPEN")
wfe4 = WorkflowExecution(workflow_id="wf-id-3", run_id="run-id-4", execution_status="CLOSED")
wfe1 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-1", execution_status="OPEN", open=True)
wfe2 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-2", execution_status="CLOSED", open=False)
wfe3 = WorkflowExecution(workflow_id="wf-id-2", run_id="run-id-3", execution_status="OPEN", open=True)
wfe4 = WorkflowExecution(workflow_id="wf-id-3", run_id="run-id-4", execution_status="CLOSED", open=False)
domain.workflow_executions = [wfe1, wfe2, wfe3, wfe4]
# get workflow execution through workflow_id and run_id