Implement ScheduleActivityTask decision

This commit is contained in:
Jean-Baptiste Barth 2015-10-26 00:43:35 +01:00
parent 53630dc061
commit 5e086223c2
6 changed files with 371 additions and 5 deletions

View File

@ -23,6 +23,7 @@ class Domain(object):
# of "workflow_id (client determined)" => WorkflowExecution()
# here.
self.workflow_executions = {}
self.task_lists = defaultdict(list)
def __repr__(self):
return "Domain(name: %(name)s, status: %(status)s)" % self.__dict__
@ -83,3 +84,6 @@ class Domain(object):
)
)
return wfe
def add_to_task_list(self, task_list, obj):
self.task_lists[task_list].append(obj)

View File

@ -12,6 +12,13 @@ class GenericType(object):
self.description = kwargs.pop("description")
for key, value in kwargs.iteritems():
self.__setattr__(key, value)
# default values set to none
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not hasattr(self, attr):
self.__setattr__(attr, None)
if not hasattr(self, "task_list"):
self.task_list = None
def __repr__(self):
cls = self.__class__.__name__
@ -49,12 +56,10 @@ class GenericType(object):
"typeInfo": self.to_medium_dict(),
"configuration": {}
}
if hasattr(self, "task_list"):
if self.task_list:
hsh["configuration"]["defaultTaskList"] = {"name": self.task_list}
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not hasattr(self, attr):
continue
if not getattr(self, attr):
continue
hsh["configuration"][key] = getattr(self, attr)

View File

@ -81,6 +81,30 @@ class HistoryEvent(object):
if hasattr(self, "reason") and self.reason:
hsh["reason"] = self.reason
return hsh
elif self.event_type == "ActivityTaskScheduled":
hsh = {
"activityId": self.attributes["activityId"],
"activityType": self.activity_type.to_short_dict(),
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
"taskList": {
"name": self.task_list,
},
}
for attr in ["control", "heartbeatTimeout", "input", "scheduleToCloseTimeout",
"scheduleToStartTimeout", "startToCloseTimeout", "taskPriority"]:
if self.attributes.get(attr):
hsh[attr] = self.attributes[attr]
return hsh
elif self.event_type == "ScheduleActivityTaskFailed":
# TODO: implement other possible failure mode: OPEN_ACTIVITIES_LIMIT_EXCEEDED
# NB: some failure modes are not implemented and probably won't be implemented in the
# future, such as ACTIVITY_CREATION_RATE_EXCEEDED or OPERATION_NOT_PERMITTED
return {
"activityId": self.activity_id,
"activityType": self.activity_type.to_short_dict(),
"cause": self.cause,
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
}
else:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)

View File

@ -14,6 +14,8 @@ from ..exceptions import (
SWFDecisionValidationException,
)
from ..utils import decapitalize
from .activity_task import ActivityTask
from .activity_type import ActivityType
from .decision_task import DecisionTask
from .history_event import HistoryEvent
@ -209,7 +211,10 @@ class WorkflowExecution(object):
execution_context=execution_context,
)
dt.complete()
self.should_schedule_decision_next = False
self.handle_decisions(evt.event_id, decisions)
if self.should_schedule_decision_next:
self.schedule_decision_task()
def _check_decision_attributes(self, kind, value, decision_id):
problems = []
@ -293,6 +298,8 @@ class WorkflowExecution(object):
self.complete(event_id, attributes.get("result"))
elif decision_type == "FailWorkflowExecution":
self.fail(event_id, attributes.get("details"), attributes.get("reason"))
elif decision_type == "ScheduleActivityTask":
self.schedule_activity_task(event_id, attributes)
else:
# TODO: implement Decision type: CancelTimer
# TODO: implement Decision type: CancelWorkflowExecution
@ -300,7 +307,6 @@ class WorkflowExecution(object):
# TODO: implement Decision type: RecordMarker
# TODO: implement Decision type: RequestCancelActivityTask
# TODO: implement Decision type: RequestCancelExternalWorkflowExecution
# TODO: implement Decision type: ScheduleActivityTask
# TODO: implement Decision type: ScheduleLambdaFunction
# TODO: implement Decision type: SignalExternalWorkflowExecution
# TODO: implement Decision type: StartChildWorkflowExecution
@ -331,3 +337,96 @@ class WorkflowExecution(object):
details=details,
reason=reason,
)
def schedule_activity_task(self, event_id, attributes):
activity_type = self.domain.get_type(
"activity",
attributes["activityType"]["name"],
attributes["activityType"]["version"],
ignore_empty=True,
)
if not activity_type:
fake_type = ActivityType(attributes["activityType"]["name"],
attributes["activityType"]["version"])
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=fake_type,
cause="ACTIVITY_TYPE_DOES_NOT_EXIST",
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return
if activity_type.status == "DEPRECATED":
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=activity_type,
cause="ACTIVITY_TYPE_DEPRECATED",
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return
if any(at for at in self.activity_tasks
if at.activity_id == attributes["activityId"]):
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=activity_type,
cause="ACTIVITY_ID_ALREADY_IN_USE",
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return
# find task list or default task list, else fail
task_list = attributes.get("taskList", {}).get("name")
if not task_list and activity_type.task_list:
task_list = activity_type.task_list
if not task_list:
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=activity_type,
cause="DEFAULT_TASK_LIST_UNDEFINED",
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return
# find timeouts or default timeout, else fail
timeouts = {}
for _type in ["scheduleToStartTimeout", "scheduleToCloseTimeout", "startToCloseTimeout", "heartbeatTimeout"]:
default_key = "default_task_"+camelcase_to_underscores(_type)
default_value = getattr(activity_type, default_key)
timeouts[_type] = attributes.get(_type, default_value)
if not timeouts[_type]:
error_key = default_key.replace("default_task_", "default_")
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=activity_type,
cause="{}_UNDEFINED".format(error_key.upper()),
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
return
task = ActivityTask(
activity_id=attributes["activityId"],
activity_type=activity_type,
input=attributes.get("input"),
workflow_execution=self,
)
# Only add event and increment counters if nothing went wrong
# TODO: don't store activity tasks in 2 places...
self.activity_tasks.append(task)
self.domain.add_to_task_list(task_list, task)
self._add_event(
"ActivityTaskScheduled",
decision_task_completed_event_id=event_id,
activity_type=activity_type,
attributes=attributes,
task_list=task_list,
)
self.open_counts["openActivityTasks"] += 1

View File

@ -17,7 +17,13 @@ def setup_workflow():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60", description="A test domain")
conn = mock_basic_workflow_type("test-domain", conn)
conn.register_activity_type("test-domain", "test-activity", "v1.1")
conn.register_activity_type(
"test-domain", "test-activity", "v1.1",
default_task_heartbeat_timeout="600",
default_task_schedule_to_close_timeout="600",
default_task_schedule_to_start_timeout="600",
default_task_start_to_close_timeout="600",
)
wfe = conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0")
conn.run_id = wfe["runId"]
return conn
@ -266,3 +272,50 @@ def test_respond_decision_task_completed_with_fail_workflow_execution():
attrs = resp["events"][-1]["workflowExecutionFailedEventAttributes"]
attrs["reason"].should.equal("my rules")
attrs["details"].should.equal("foo")
@mock_swf
def test_respond_decision_task_completed_with_schedule_activity_task():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [{
"decisionType": "ScheduleActivityTask",
"scheduleActivityTaskDecisionAttributes": {
"activityId": "my-activity-001",
"activityType": {
"name": "test-activity",
"version": "v1.1"
},
"heartbeatTimeout": "60",
"input": "123",
"taskList": {
"name": "my-task-list"
},
}
}]
resp = conn.respond_decision_task_completed(task_token, decisions=decisions)
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal([
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"ActivityTaskScheduled",
])
resp["events"][-1]["activityTaskScheduledEventAttributes"].should.equal({
"decisionTaskCompletedEventId": 4,
"activityId": "my-activity-001",
"activityType": {
"name": "test-activity",
"version": "v1.1",
},
"heartbeatTimeout": "60",
"input": "123",
"taskList": {
"name": "my-task-list"
},
})

View File

@ -3,6 +3,7 @@ from freezegun import freeze_time
from moto.swf.models import (
ActivityTask,
ActivityType,
DecisionTask,
Domain,
GenericType,
@ -24,6 +25,7 @@ from .utils import (
# TODO: move them in utils
def make_workflow_execution(**kwargs):
domain = get_basic_domain()
domain.add_type(ActivityType("test-activity", "v1.1"))
wft = get_basic_workflow_type()
return WorkflowExecution(domain, wft, "ab1234", **kwargs)
@ -47,6 +49,13 @@ def test_domain_string_representation():
domain = Domain("my-domain", "60")
str(domain).should.equal("Domain(name: my-domain, status: REGISTERED)")
def test_domain_add_to_task_list():
domain = Domain("my-domain", "60")
domain.add_to_task_list("foo", "bar")
dict(domain.task_lists).should.equal({
"foo": ["bar"]
})
# GenericType (ActivityType, WorkflowType)
class FooType(GenericType):
@ -265,6 +274,178 @@ def test_workflow_execution_fail():
wfe.events()[-1].details.should.equal("some details")
wfe.events()[-1].reason.should.equal("my rules")
def test_workflow_execution_schedule_activity_task():
wfe = make_workflow_execution()
wfe.schedule_activity_task(123, {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity", "version": "v1.1" },
"taskList": { "name": "task-list-name" },
"scheduleToStartTimeout": "600",
"scheduleToCloseTimeout": "600",
"startToCloseTimeout": "600",
"heartbeatTimeout": "300",
})
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
last_event.decision_task_completed_event_id.should.equal(123)
last_event.task_list.should.equal("task-list-name")
wfe.activity_tasks.should.have.length_of(1)
task = wfe.activity_tasks[0]
task.activity_id.should.equal("my-activity-001")
task.activity_type.name.should.equal("test-activity")
wfe.domain.task_lists["task-list-name"].should.contain(task)
def test_workflow_execution_schedule_activity_task_without_task_list_should_take_default():
wfe = make_workflow_execution()
wfe.domain.add_type(
ActivityType("test-activity", "v1.2", task_list="foobar")
)
wfe.schedule_activity_task(123, {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity", "version": "v1.2" },
"scheduleToStartTimeout": "600",
"scheduleToCloseTimeout": "600",
"startToCloseTimeout": "600",
"heartbeatTimeout": "300",
})
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
last_event.task_list.should.equal("foobar")
task = wfe.activity_tasks[0]
wfe.domain.task_lists["foobar"].should.contain(task)
def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attributes():
wfe = make_workflow_execution()
at = ActivityType("test-activity", "v1.1")
at.status = "DEPRECATED"
wfe.domain.add_type(at)
wfe.domain.add_type(ActivityType("test-activity", "v1.2"))
hsh = {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity-does-not-exists", "version": "v1.1" },
}
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("ACTIVITY_TYPE_DOES_NOT_EXIST")
hsh["activityType"]["name"] = "test-activity"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("ACTIVITY_TYPE_DEPRECATED")
hsh["activityType"]["version"] = "v1.2"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_TASK_LIST_UNDEFINED")
hsh["taskList"] = { "name": "foobar" }
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_SCHEDULE_TO_START_TIMEOUT_UNDEFINED")
hsh["scheduleToStartTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_SCHEDULE_TO_CLOSE_TIMEOUT_UNDEFINED")
hsh["scheduleToCloseTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_START_TO_CLOSE_TIMEOUT_UNDEFINED")
hsh["startToCloseTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_HEARTBEAT_TIMEOUT_UNDEFINED")
wfe.open_counts["openActivityTasks"].should.equal(0)
wfe.activity_tasks.should.have.length_of(0)
wfe.domain.task_lists.should.have.length_of(0)
hsh["heartbeatTimeout"] = "300"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
task = wfe.activity_tasks[0]
wfe.domain.task_lists["foobar"].should.contain(task)
wfe.open_counts["openDecisionTasks"].should.equal(0)
wfe.open_counts["openActivityTasks"].should.equal(1)
def test_workflow_execution_schedule_activity_task_failure_triggers_new_decision():
wfe = make_workflow_execution()
wfe.start()
task_token = wfe.decision_tasks[-1].task_token
wfe.start_decision_task(task_token)
wfe.complete_decision_task(task_token, decisions=[
{
"decisionType": "ScheduleActivityTask",
"scheduleActivityTaskDecisionAttributes": {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity-does-not-exist", "version": "v1.2" },
}
},
{
"decisionType": "ScheduleActivityTask",
"scheduleActivityTaskDecisionAttributes": {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity-does-not-exist", "version": "v1.2" },
}
},
])
wfe.open_counts["openActivityTasks"].should.equal(0)
wfe.open_counts["openDecisionTasks"].should.equal(1)
last_events = wfe.events()[-3:]
last_events[0].event_type.should.equal("ScheduleActivityTaskFailed")
last_events[1].event_type.should.equal("ScheduleActivityTaskFailed")
last_events[2].event_type.should.equal("DecisionTaskScheduled")
def test_workflow_execution_schedule_activity_task_with_same_activity_id():
wfe = make_workflow_execution()
wfe.schedule_activity_task(123, {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity", "version": "v1.1" },
"taskList": { "name": "task-list-name" },
"scheduleToStartTimeout": "600",
"scheduleToCloseTimeout": "600",
"startToCloseTimeout": "600",
"heartbeatTimeout": "300",
})
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
wfe.schedule_activity_task(123, {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity", "version": "v1.1" },
"taskList": { "name": "task-list-name" },
"scheduleToStartTimeout": "600",
"scheduleToCloseTimeout": "600",
"startToCloseTimeout": "600",
"heartbeatTimeout": "300",
})
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("ACTIVITY_ID_ALREADY_IN_USE")
# HistoryEvent
@freeze_time("2015-01-01 12:00:00")