Implement start to close timeout on SWF decision tasks
This commit is contained in:
parent
e32fef50b6
commit
86973f2b87
@ -54,6 +54,7 @@ class ActivityTask(object):
|
||||
self.last_heartbeat_timestamp = now_timestamp()
|
||||
|
||||
def has_timedout(self):
|
||||
# TODO: handle the "NONE" case
|
||||
heartbeat_timeout_at = self.last_heartbeat_timestamp + \
|
||||
int(self.timeouts["heartbeatTimeout"])
|
||||
return heartbeat_timeout_at < now_timestamp()
|
||||
|
@ -2,6 +2,8 @@ from __future__ import unicode_literals
|
||||
from datetime import datetime
|
||||
import uuid
|
||||
|
||||
from ..utils import now_timestamp
|
||||
|
||||
|
||||
class DecisionTask(object):
|
||||
def __init__(self, workflow_execution, scheduled_event_id):
|
||||
@ -11,10 +13,13 @@ class DecisionTask(object):
|
||||
self.scheduled_event_id = scheduled_event_id
|
||||
self.previous_started_event_id = 0
|
||||
self.started_event_id = None
|
||||
self.started_timestamp = None
|
||||
self.start_to_close_timeout = self.workflow_execution.task_start_to_close_timeout
|
||||
self.state = "SCHEDULED"
|
||||
# this is *not* necessarily coherent with workflow execution history,
|
||||
# but that shouldn't be a problem for tests
|
||||
self.scheduled_at = datetime.now()
|
||||
self.timeout_type = None
|
||||
|
||||
def to_full_dict(self, reverse_order=False):
|
||||
events = self.workflow_execution.events(reverse_order=reverse_order)
|
||||
@ -33,7 +38,21 @@ class DecisionTask(object):
|
||||
|
||||
def start(self, started_event_id):
|
||||
self.state = "STARTED"
|
||||
self.started_timestamp = now_timestamp()
|
||||
self.started_event_id = started_event_id
|
||||
|
||||
def complete(self):
|
||||
self.state = "COMPLETED"
|
||||
|
||||
def has_timedout(self):
|
||||
if self.state != "STARTED":
|
||||
return False
|
||||
# TODO: handle the "NONE" case
|
||||
start_to_close_timeout = self.started_timestamp + \
|
||||
int(self.start_to_close_timeout)
|
||||
return start_to_close_timeout < now_timestamp()
|
||||
|
||||
def process_timeouts(self):
|
||||
if self.has_timedout():
|
||||
self.state = "TIMED_OUT"
|
||||
self.timeout_type = "START_TO_CLOSE"
|
||||
|
@ -152,6 +152,12 @@ class HistoryEvent(object):
|
||||
if self.details:
|
||||
hsh["details"] = self.details
|
||||
return hsh
|
||||
elif self.event_type == "DecisionTaskTimedOut":
|
||||
return {
|
||||
"scheduledEventId": self.scheduled_event_id,
|
||||
"startedEventId": self.started_event_id,
|
||||
"timeoutType": self.timeout_type,
|
||||
}
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"HistoryEvent does not implement attributes for type '{0}'".format(self.event_type)
|
||||
|
@ -148,8 +148,21 @@ class WorkflowExecution(object):
|
||||
|
||||
def _process_timeouts(self):
|
||||
self.should_schedule_decision_next = False
|
||||
|
||||
# TODO: process timeouts on workflow itself
|
||||
# TODO: process timeouts on decision tasks
|
||||
|
||||
# decision tasks timeouts
|
||||
for task in self.decision_tasks:
|
||||
if task.state == "STARTED" and task.has_timedout():
|
||||
self.should_schedule_decision_next = True
|
||||
task.process_timeouts()
|
||||
self._add_event(
|
||||
"DecisionTaskTimedOut",
|
||||
scheduled_event_id=task.scheduled_event_id,
|
||||
started_event_id=task.started_event_id,
|
||||
timeout_type=task.timeout_type,
|
||||
)
|
||||
|
||||
# activity tasks timeouts
|
||||
for task in self.activity_tasks:
|
||||
if task.open and task.has_timedout():
|
||||
|
@ -1,3 +1,4 @@
|
||||
from freezegun import freeze_time
|
||||
from sure import expect
|
||||
|
||||
from moto.swf.models import DecisionTask
|
||||
@ -29,3 +30,20 @@ def test_decision_task_full_dict_representation():
|
||||
dt.start(1234)
|
||||
fd = dt.to_full_dict()
|
||||
fd["startedEventId"].should.equal(1234)
|
||||
|
||||
def test_decision_task_has_timedout():
|
||||
wfe = make_workflow_execution()
|
||||
wft = wfe.workflow_type
|
||||
dt = DecisionTask(wfe, 123)
|
||||
dt.has_timedout().should.equal(False)
|
||||
|
||||
with freeze_time("2015-01-01 12:00:00"):
|
||||
dt.start(1234)
|
||||
dt.has_timedout().should.equal(False)
|
||||
|
||||
# activity task timeout is 300s == 5mins
|
||||
with freeze_time("2015-01-01 12:06:00"):
|
||||
dt.has_timedout().should.equal(True)
|
||||
|
||||
dt.complete()
|
||||
dt.has_timedout().should.equal(False)
|
||||
|
@ -32,3 +32,34 @@ def test_activity_task_heartbeat_timeout():
|
||||
attrs["timeoutType"].should.equal("HEARTBEAT")
|
||||
|
||||
resp["events"][-1]["eventType"].should.equal("DecisionTaskScheduled")
|
||||
|
||||
# Decision Task Start to Close timeout
|
||||
# Default value in workflow helpers: 5 mins
|
||||
@mock_swf
|
||||
def test_decision_task_start_to_close_timeout():
|
||||
pass
|
||||
with freeze_time("2015-01-01 12:00:00"):
|
||||
conn = setup_workflow()
|
||||
conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||
|
||||
with freeze_time("2015-01-01 12:04:30"):
|
||||
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
|
||||
|
||||
event_types = [evt["eventType"] for evt in resp["events"]]
|
||||
event_types.should.equal(
|
||||
["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted"]
|
||||
)
|
||||
|
||||
with freeze_time("2015-01-01 12:05:30"):
|
||||
# => Decision Task Start to Close timeout reached!!
|
||||
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
|
||||
|
||||
event_types = [evt["eventType"] for evt in resp["events"]]
|
||||
event_types.should.equal(
|
||||
["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted",
|
||||
"DecisionTaskTimedOut", "DecisionTaskScheduled"]
|
||||
)
|
||||
attrs = resp["events"][-2]["decisionTaskTimedOutEventAttributes"]
|
||||
attrs.should.equal({
|
||||
"scheduledEventId": 2, "startedEventId": 3, "timeoutType": "START_TO_CLOSE"
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user