Rework task lists for activity/decision tasks

This commit is contained in:
Jean-Baptiste Barth 2015-10-26 10:55:55 +01:00
parent 83c08b7655
commit be71909a8c
4 changed files with 72 additions and 18 deletions

View File

@ -23,7 +23,8 @@ class Domain(object):
# of "workflow_id (client determined)" => WorkflowExecution() # of "workflow_id (client determined)" => WorkflowExecution()
# here. # here.
self.workflow_executions = {} self.workflow_executions = {}
self.task_lists = defaultdict(list) self.activity_task_lists = {}
self.decision_task_lists = {}
def __repr__(self): def __repr__(self):
return "Domain(name: %(name)s, status: %(status)s)" % self.__dict__ return "Domain(name: %(name)s, status: %(status)s)" % self.__dict__
@ -85,5 +86,26 @@ class Domain(object):
) )
return wfe return wfe
def add_to_task_list(self, task_list, obj): def add_to_activity_task_list(self, task_list, obj):
self.task_lists[task_list].append(obj) if not task_list in self.activity_task_lists:
self.activity_task_lists[task_list] = []
self.activity_task_lists[task_list].append(obj)
@property
def activity_tasks(self):
_all = []
for _, tasks in self.activity_task_lists.iteritems():
_all += tasks
return _all
def add_to_decision_task_list(self, task_list, obj):
if not task_list in self.decision_task_lists:
self.decision_task_lists[task_list] = []
self.decision_task_lists[task_list].append(obj)
@property
def decision_tasks(self):
_all = []
for _, tasks in self.decision_task_lists.iteritems():
_all += tasks
return _all

View File

@ -75,9 +75,7 @@ class WorkflowExecution(object):
} }
# events # events
self._events = [] self._events = []
# tasks # child workflows
self.decision_tasks = []
self.activity_tasks = []
self.child_workflow_executions = [] self.child_workflow_executions = []
def __repr__(self): def __repr__(self):
@ -167,12 +165,22 @@ class WorkflowExecution(object):
self.schedule_decision_task() self.schedule_decision_task()
def schedule_decision_task(self): def schedule_decision_task(self):
self.open_counts["openDecisionTasks"] += 1
evt = self._add_event( evt = self._add_event(
"DecisionTaskScheduled", "DecisionTaskScheduled",
workflow_execution=self, workflow_execution=self,
) )
self.decision_tasks.append(DecisionTask(self, evt.event_id)) self.domain.add_to_decision_task_list(
self.task_list,
DecisionTask(self, evt.event_id),
)
self.open_counts["openDecisionTasks"] += 1
@property
def decision_tasks(self):
return filter(
lambda t: t.workflow_execution == self,
self.domain.decision_tasks
)
@property @property
def scheduled_decision_tasks(self): def scheduled_decision_tasks(self):
@ -181,6 +189,13 @@ class WorkflowExecution(object):
self.decision_tasks self.decision_tasks
) )
@property
def activity_tasks(self):
return filter(
lambda t: t.workflow_execution == self,
self.domain.activity_tasks
)
def _find_decision_task(self, task_token): def _find_decision_task(self, task_token):
for dt in self.decision_tasks: for dt in self.decision_tasks:
if dt.task_token == task_token: if dt.task_token == task_token:
@ -395,9 +410,7 @@ class WorkflowExecution(object):
workflow_execution=self, workflow_execution=self,
) )
# Only add event and increment counters if nothing went wrong # Only add event and increment counters if nothing went wrong
# TODO: don't store activity tasks in 2 places... self.domain.add_to_activity_task_list(task_list, task)
self.activity_tasks.append(task)
self.domain.add_to_task_list(task_list, task)
self._add_event( self._add_event(
"ActivityTaskScheduled", "ActivityTaskScheduled",
decision_task_completed_event_id=event_id, decision_task_completed_event_id=event_id,

View File

@ -21,9 +21,28 @@ def test_domain_string_representation():
domain = Domain("my-domain", "60") domain = Domain("my-domain", "60")
str(domain).should.equal("Domain(name: my-domain, status: REGISTERED)") str(domain).should.equal("Domain(name: my-domain, status: REGISTERED)")
def test_domain_add_to_task_list(): def test_domain_add_to_activity_task_list():
domain = Domain("my-domain", "60") domain = Domain("my-domain", "60")
domain.add_to_task_list("foo", "bar") domain.add_to_activity_task_list("foo", "bar")
dict(domain.task_lists).should.equal({ domain.activity_task_lists.should.equal({
"foo": ["bar"] "foo": ["bar"]
}) })
def test_domain_activity_tasks():
domain = Domain("my-domain", "60")
domain.add_to_activity_task_list("foo", "bar")
domain.add_to_activity_task_list("other", "baz")
domain.activity_tasks.should.equal(["bar", "baz"])
def test_domain_add_to_decision_task_list():
domain = Domain("my-domain", "60")
domain.add_to_decision_task_list("foo", "bar")
domain.decision_task_lists.should.equal({
"foo": ["bar"]
})
def test_domain_decision_tasks():
domain = Domain("my-domain", "60")
domain.add_to_decision_task_list("foo", "bar")
domain.add_to_decision_task_list("other", "baz")
domain.decision_tasks.should.equal(["bar", "baz"])

View File

@ -207,7 +207,7 @@ def test_workflow_execution_schedule_activity_task():
task = wfe.activity_tasks[0] task = wfe.activity_tasks[0]
task.activity_id.should.equal("my-activity-001") task.activity_id.should.equal("my-activity-001")
task.activity_type.name.should.equal("test-activity") task.activity_type.name.should.equal("test-activity")
wfe.domain.task_lists["task-list-name"].should.contain(task) wfe.domain.activity_task_lists["task-list-name"].should.contain(task)
def test_workflow_execution_schedule_activity_task_without_task_list_should_take_default(): def test_workflow_execution_schedule_activity_task_without_task_list_should_take_default():
wfe = make_workflow_execution() wfe = make_workflow_execution()
@ -229,7 +229,7 @@ def test_workflow_execution_schedule_activity_task_without_task_list_should_take
last_event.task_list.should.equal("foobar") last_event.task_list.should.equal("foobar")
task = wfe.activity_tasks[0] task = wfe.activity_tasks[0]
wfe.domain.task_lists["foobar"].should.contain(task) wfe.domain.activity_task_lists["foobar"].should.contain(task)
def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attributes(): def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attributes():
wfe = make_workflow_execution() wfe = make_workflow_execution()
@ -286,7 +286,7 @@ def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attribut
wfe.open_counts["openActivityTasks"].should.equal(0) wfe.open_counts["openActivityTasks"].should.equal(0)
wfe.activity_tasks.should.have.length_of(0) wfe.activity_tasks.should.have.length_of(0)
wfe.domain.task_lists.should.have.length_of(0) wfe.domain.activity_task_lists.should.have.length_of(0)
hsh["heartbeatTimeout"] = "300" hsh["heartbeatTimeout"] = "300"
wfe.schedule_activity_task(123, hsh) wfe.schedule_activity_task(123, hsh)
@ -294,7 +294,7 @@ def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attribut
last_event.event_type.should.equal("ActivityTaskScheduled") last_event.event_type.should.equal("ActivityTaskScheduled")
task = wfe.activity_tasks[0] task = wfe.activity_tasks[0]
wfe.domain.task_lists["foobar"].should.contain(task) wfe.domain.activity_task_lists["foobar"].should.contain(task)
wfe.open_counts["openDecisionTasks"].should.equal(0) wfe.open_counts["openDecisionTasks"].should.equal(0)
wfe.open_counts["openActivityTasks"].should.equal(1) wfe.open_counts["openActivityTasks"].should.equal(1)