Implement start to close timeout on SWF workflow executions
This commit is contained in:
parent
86973f2b87
commit
f38d23e483
@ -158,6 +158,11 @@ class HistoryEvent(object):
|
|||||||
"startedEventId": self.started_event_id,
|
"startedEventId": self.started_event_id,
|
||||||
"timeoutType": self.timeout_type,
|
"timeoutType": self.timeout_type,
|
||||||
}
|
}
|
||||||
|
elif self.event_type == "WorkflowExecutionTimedOut":
|
||||||
|
return {
|
||||||
|
"childPolicy": self.child_policy,
|
||||||
|
"timeoutType": self.timeout_type,
|
||||||
|
}
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
"HistoryEvent does not implement attributes for type '{0}'".format(self.event_type)
|
"HistoryEvent does not implement attributes for type '{0}'".format(self.event_type)
|
||||||
|
@ -59,6 +59,7 @@ class WorkflowExecution(object):
|
|||||||
self.parent = None
|
self.parent = None
|
||||||
self.start_timestamp = None
|
self.start_timestamp = None
|
||||||
self.tag_list = [] # TODO
|
self.tag_list = [] # TODO
|
||||||
|
self.timeout_type = None
|
||||||
self.workflow_type = workflow_type
|
self.workflow_type = workflow_type
|
||||||
# args processing
|
# args processing
|
||||||
# NB: the order follows boto/SWF order of exceptions appearance (if no
|
# NB: the order follows boto/SWF order of exceptions appearance (if no
|
||||||
@ -149,7 +150,15 @@ class WorkflowExecution(object):
|
|||||||
def _process_timeouts(self):
|
def _process_timeouts(self):
|
||||||
self.should_schedule_decision_next = False
|
self.should_schedule_decision_next = False
|
||||||
|
|
||||||
# TODO: process timeouts on workflow itself
|
# workflow execution timeout
|
||||||
|
if self.has_timedout():
|
||||||
|
self.process_timeouts()
|
||||||
|
# TODO: process child policy on child workflows here or in process_timeouts()
|
||||||
|
self._add_event(
|
||||||
|
"WorkflowExecutionTimedOut",
|
||||||
|
child_policy=self.child_policy,
|
||||||
|
timeout_type=self.timeout_type,
|
||||||
|
)
|
||||||
|
|
||||||
# decision tasks timeouts
|
# decision tasks timeouts
|
||||||
for task in self.decision_tasks:
|
for task in self.decision_tasks:
|
||||||
@ -512,3 +521,17 @@ class WorkflowExecution(object):
|
|||||||
self.execution_status = "CLOSED"
|
self.execution_status = "CLOSED"
|
||||||
self.close_status = "TERMINATED"
|
self.close_status = "TERMINATED"
|
||||||
self.close_cause = "OPERATOR_INITIATED"
|
self.close_cause = "OPERATOR_INITIATED"
|
||||||
|
|
||||||
|
def has_timedout(self):
|
||||||
|
if self.execution_status != "OPEN" or not self.start_timestamp:
|
||||||
|
return False
|
||||||
|
# TODO: handle the "NONE" case
|
||||||
|
start_to_close_timeout = self.start_timestamp + \
|
||||||
|
int(self.execution_start_to_close_timeout)
|
||||||
|
return start_to_close_timeout < now_timestamp()
|
||||||
|
|
||||||
|
def process_timeouts(self):
|
||||||
|
if self.has_timedout():
|
||||||
|
self.execution_status = "CLOSED"
|
||||||
|
self.close_status = "TIMED_OUT"
|
||||||
|
self.timeout_type = "START_TO_CLOSE"
|
||||||
|
@ -394,3 +394,15 @@ def test_terminate():
|
|||||||
last_event.event_type.should.equal("WorkflowExecutionTerminated")
|
last_event.event_type.should.equal("WorkflowExecutionTerminated")
|
||||||
# take default child_policy if not provided (as here)
|
# take default child_policy if not provided (as here)
|
||||||
last_event.child_policy.should.equal("ABANDON")
|
last_event.child_policy.should.equal("ABANDON")
|
||||||
|
|
||||||
|
def test_has_timedout():
|
||||||
|
wfe = make_workflow_execution()
|
||||||
|
wfe.has_timedout().should.equal(False)
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 12:00:00"):
|
||||||
|
wfe.start()
|
||||||
|
wfe.has_timedout().should.equal(False)
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 14:01"):
|
||||||
|
# 2 hours timeout reached
|
||||||
|
wfe.has_timedout().should.equal(True)
|
||||||
|
@ -63,3 +63,32 @@ def test_decision_task_start_to_close_timeout():
|
|||||||
attrs.should.equal({
|
attrs.should.equal({
|
||||||
"scheduledEventId": 2, "startedEventId": 3, "timeoutType": "START_TO_CLOSE"
|
"scheduledEventId": 2, "startedEventId": 3, "timeoutType": "START_TO_CLOSE"
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# Workflow Execution Start to Close timeout
|
||||||
|
# Default value in workflow helpers: 2 hours
|
||||||
|
@mock_swf
|
||||||
|
def test_workflow_execution_start_to_close_timeout():
|
||||||
|
pass
|
||||||
|
with freeze_time("2015-01-01 12:00:00"):
|
||||||
|
conn = setup_workflow()
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 13:59:30"):
|
||||||
|
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
|
||||||
|
|
||||||
|
event_types = [evt["eventType"] for evt in resp["events"]]
|
||||||
|
event_types.should.equal(
|
||||||
|
["WorkflowExecutionStarted", "DecisionTaskScheduled"]
|
||||||
|
)
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 14:00:30"):
|
||||||
|
# => Workflow Execution Start to Close timeout reached!!
|
||||||
|
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
|
||||||
|
|
||||||
|
event_types = [evt["eventType"] for evt in resp["events"]]
|
||||||
|
event_types.should.equal(
|
||||||
|
["WorkflowExecutionStarted", "DecisionTaskScheduled", "WorkflowExecutionTimedOut"]
|
||||||
|
)
|
||||||
|
attrs = resp["events"][-1]["workflowExecutionTimedOutEventAttributes"]
|
||||||
|
attrs.should.equal({
|
||||||
|
"childPolicy": "ABANDON", "timeoutType": "START_TO_CLOSE"
|
||||||
|
})
|
||||||
|
@ -44,7 +44,7 @@ def _generic_workflow_type_attributes():
|
|||||||
], {
|
], {
|
||||||
"task_list": "queue",
|
"task_list": "queue",
|
||||||
"default_child_policy": "ABANDON",
|
"default_child_policy": "ABANDON",
|
||||||
"default_execution_start_to_close_timeout": "300",
|
"default_execution_start_to_close_timeout": "7200",
|
||||||
"default_task_start_to_close_timeout": "300",
|
"default_task_start_to_close_timeout": "300",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user