Simplify implementation of ScheduleActivityTask decision

This commit is contained in:
Jean-Baptiste Barth 2015-10-26 05:50:15 +01:00
parent 5e086223c2
commit a713005882

View File

@ -200,6 +200,9 @@ class WorkflowExecution(object):
dt.start(evt.event_id) dt.start(evt.event_id)
def complete_decision_task(self, task_token, decisions=None, execution_context=None): def complete_decision_task(self, task_token, decisions=None, execution_context=None):
# 'decisions' can be None per boto.swf defaults, so replace it with something iterable
if not decisions:
decisions = []
# In case of a malformed or invalid decision task, SWF will raise an error and # In case of a malformed or invalid decision task, SWF will raise an error and
# it won't perform any of the decisions in the decision set. # it won't perform any of the decisions in the decision set.
self.validate_decisions(decisions) self.validate_decisions(decisions)
@ -236,9 +239,6 @@ class WorkflowExecution(object):
validation problem, such as a malformed decision for instance. I didn't validation problem, such as a malformed decision for instance. I didn't
find an explicit documentation for that though, so criticisms welcome. find an explicit documentation for that though, so criticisms welcome.
""" """
if not decisions:
return
problems = [] problems = []
# check close decision is last # check close decision is last
@ -285,10 +285,6 @@ class WorkflowExecution(object):
Handles a Decision according to SWF docs. Handles a Decision according to SWF docs.
See: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_Decision.html See: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_Decision.html
""" """
# 'decisions' can be None per boto.swf defaults, so replace it with something iterable
if not decisions:
decisions = []
# handle each decision separately, in order # handle each decision separately, in order
for decision in decisions: for decision in decisions:
decision_type = decision["decisionType"] decision_type = decision["decisionType"]
@ -339,6 +335,17 @@ class WorkflowExecution(object):
) )
def schedule_activity_task(self, event_id, attributes): def schedule_activity_task(self, event_id, attributes):
# Helper function to avoid repeating ourselves in the next sections
def fail_schedule_activity_task(_type, _cause):
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=_type,
cause=_cause,
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
activity_type = self.domain.get_type( activity_type = self.domain.get_type(
"activity", "activity",
attributes["activityType"]["name"], attributes["activityType"]["name"],
@ -348,35 +355,16 @@ class WorkflowExecution(object):
if not activity_type: if not activity_type:
fake_type = ActivityType(attributes["activityType"]["name"], fake_type = ActivityType(attributes["activityType"]["name"],
attributes["activityType"]["version"]) attributes["activityType"]["version"])
self._add_event( fail_schedule_activity_task(fake_type,
"ScheduleActivityTaskFailed", "ACTIVITY_TYPE_DOES_NOT_EXIST")
activity_id=attributes["activityId"],
activity_type=fake_type,
cause="ACTIVITY_TYPE_DOES_NOT_EXIST",
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return return
if activity_type.status == "DEPRECATED": if activity_type.status == "DEPRECATED":
self._add_event( fail_schedule_activity_task(activity_type,
"ScheduleActivityTaskFailed", "ACTIVITY_TYPE_DEPRECATED")
activity_id=attributes["activityId"],
activity_type=activity_type,
cause="ACTIVITY_TYPE_DEPRECATED",
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return return
if any(at for at in self.activity_tasks if any(at for at in self.activity_tasks if at.activity_id == attributes["activityId"]):
if at.activity_id == attributes["activityId"]): fail_schedule_activity_task(activity_type,
self._add_event( "ACTIVITY_ID_ALREADY_IN_USE")
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=activity_type,
cause="ACTIVITY_ID_ALREADY_IN_USE",
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return return
# find task list or default task list, else fail # find task list or default task list, else fail
@ -384,14 +372,8 @@ class WorkflowExecution(object):
if not task_list and activity_type.task_list: if not task_list and activity_type.task_list:
task_list = activity_type.task_list task_list = activity_type.task_list
if not task_list: if not task_list:
self._add_event( fail_schedule_activity_task(activity_type,
"ScheduleActivityTaskFailed", "DEFAULT_TASK_LIST_UNDEFINED")
activity_id=attributes["activityId"],
activity_type=activity_type,
cause="DEFAULT_TASK_LIST_UNDEFINED",
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return return
# find timeouts or default timeout, else fail # find timeouts or default timeout, else fail
@ -402,14 +384,8 @@ class WorkflowExecution(object):
timeouts[_type] = attributes.get(_type, default_value) timeouts[_type] = attributes.get(_type, default_value)
if not timeouts[_type]: if not timeouts[_type]:
error_key = default_key.replace("default_task_", "default_") error_key = default_key.replace("default_task_", "default_")
self._add_event( fail_schedule_activity_task(activity_type,
"ScheduleActivityTaskFailed", "{}_UNDEFINED".format(error_key.upper()))
activity_id=attributes["activityId"],
activity_type=activity_type,
cause="{}_UNDEFINED".format(error_key.upper()),
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return return
task = ActivityTask( task = ActivityTask(