Make SWF events formatting more generic

(suggested in @spulec review)
This commit is contained in:
Jean-Baptiste Barth 2015-11-23 14:51:58 +01:00
parent a06f8b15f5
commit 566a90800e
3 changed files with 87 additions and 174 deletions

View File

@ -2,168 +2,65 @@ from __future__ import unicode_literals
from datetime import datetime
from time import mktime
from moto.core.utils import underscores_to_camelcase
from ..utils import decapitalize, now_timestamp
# We keep track of which history event types we support
# so that we'll be able to catch specific formatting
# for new events if needed.
SUPPORTED_HISTORY_EVENT_TYPES = (
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"WorkflowExecutionCompleted",
"WorkflowExecutionFailed",
"ActivityTaskScheduled",
"ScheduleActivityTaskFailed",
"ActivityTaskStarted",
"ActivityTaskCompleted",
"ActivityTaskFailed",
"WorkflowExecutionTerminated",
"ActivityTaskTimedOut",
"DecisionTaskTimedOut",
"WorkflowExecutionTimedOut",
)
class HistoryEvent(object):
def __init__(self, event_id, event_type, **kwargs):
def __init__(self, event_id, event_type, event_timestamp=None, **kwargs):
if event_type not in SUPPORTED_HISTORY_EVENT_TYPES:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{0}'".format(event_type)
)
self.event_id = event_id
self.event_type = event_type
self.event_timestamp = now_timestamp()
if event_timestamp:
self.event_timestamp = event_timestamp
else:
self.event_timestamp = now_timestamp()
# pre-populate a dict: {"camelCaseKey": value}
self.event_attributes = {}
for key, value in kwargs.items():
self.__setattr__(key, value)
# break soon if attributes are not valid
self.event_attributes()
if value:
camel_key = underscores_to_camelcase(key)
if key == "task_list":
value = { "name": value }
elif key == "workflow_type":
value = { "name": value.name, "version": value.version }
elif key == "activity_type":
value = value.to_short_dict()
self.event_attributes[camel_key] = value
def to_dict(self):
return {
"eventId": self.event_id,
"eventType": self.event_type,
"eventTimestamp": self.event_timestamp,
self._attributes_key(): self.event_attributes()
self._attributes_key(): self.event_attributes
}
def _attributes_key(self):
key = "{0}EventAttributes".format(self.event_type)
return decapitalize(key)
def event_attributes(self):
if self.event_type == "WorkflowExecutionStarted":
wfe = self.workflow_execution
hsh = {
"childPolicy": wfe.child_policy,
"executionStartToCloseTimeout": wfe.execution_start_to_close_timeout,
"parentInitiatedEventId": 0,
"taskList": {
"name": wfe.task_list
},
"taskStartToCloseTimeout": wfe.task_start_to_close_timeout,
"workflowType": {
"name": wfe.workflow_type.name,
"version": wfe.workflow_type.version
}
}
return hsh
elif self.event_type == "DecisionTaskScheduled":
wfe = self.workflow_execution
return {
"startToCloseTimeout": wfe.task_start_to_close_timeout,
"taskList": {"name": wfe.task_list}
}
elif self.event_type == "DecisionTaskStarted":
hsh = {
"scheduledEventId": self.scheduled_event_id
}
if hasattr(self, "identity") and self.identity:
hsh["identity"] = self.identity
return hsh
elif self.event_type == "DecisionTaskCompleted":
hsh = {
"scheduledEventId": self.scheduled_event_id,
"startedEventId": self.started_event_id,
}
if hasattr(self, "execution_context") and self.execution_context:
hsh["executionContext"] = self.execution_context
return hsh
elif self.event_type == "WorkflowExecutionCompleted":
hsh = {
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
}
if hasattr(self, "result") and self.result:
hsh["result"] = self.result
return hsh
elif self.event_type == "WorkflowExecutionFailed":
hsh = {
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
}
if hasattr(self, "details") and self.details:
hsh["details"] = self.details
if hasattr(self, "reason") and self.reason:
hsh["reason"] = self.reason
return hsh
elif self.event_type == "ActivityTaskScheduled":
hsh = {
"activityId": self.attributes["activityId"],
"activityType": self.activity_type.to_short_dict(),
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
"taskList": {
"name": self.task_list,
},
}
for attr in ["control", "heartbeatTimeout", "input", "scheduleToCloseTimeout",
"scheduleToStartTimeout", "startToCloseTimeout", "taskPriority"]:
if self.attributes.get(attr):
hsh[attr] = self.attributes[attr]
return hsh
elif self.event_type == "ScheduleActivityTaskFailed":
# TODO: implement other possible failure mode: OPEN_ACTIVITIES_LIMIT_EXCEEDED
# NB: some failure modes are not implemented and probably won't be implemented in the
# future, such as ACTIVITY_CREATION_RATE_EXCEEDED or OPERATION_NOT_PERMITTED
return {
"activityId": self.activity_id,
"activityType": self.activity_type.to_short_dict(),
"cause": self.cause,
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
}
elif self.event_type == "ActivityTaskStarted":
# TODO: merge it with DecisionTaskStarted
hsh = {
"scheduledEventId": self.scheduled_event_id
}
if hasattr(self, "identity") and self.identity:
hsh["identity"] = self.identity
return hsh
elif self.event_type == "ActivityTaskCompleted":
hsh = {
"scheduledEventId": self.scheduled_event_id,
"startedEventId": self.started_event_id,
}
if hasattr(self, "result") and self.result is not None:
hsh["result"] = self.result
return hsh
elif self.event_type == "ActivityTaskFailed":
# TODO: maybe merge it with ActivityTaskCompleted (different optional params tho)
hsh = {
"scheduledEventId": self.scheduled_event_id,
"startedEventId": self.started_event_id,
}
if hasattr(self, "reason") and self.reason is not None:
hsh["reason"] = self.reason
if hasattr(self, "details") and self.details is not None:
hsh["details"] = self.details
return hsh
elif self.event_type == "WorkflowExecutionTerminated":
hsh = {
"childPolicy": self.child_policy,
}
if self.cause:
hsh["cause"] = self.cause
if self.details:
hsh["details"] = self.details
if self.reason:
hsh["reason"] = self.reason
return hsh
elif self.event_type == "ActivityTaskTimedOut":
hsh = {
"scheduledEventId": self.scheduled_event_id,
"startedEventId": self.started_event_id,
"timeoutType": self.timeout_type,
}
if self.details:
hsh["details"] = self.details
return hsh
elif self.event_type == "DecisionTaskTimedOut":
return {
"scheduledEventId": self.scheduled_event_id,
"startedEventId": self.started_event_id,
"timeoutType": self.timeout_type,
}
elif self.event_type == "WorkflowExecutionTimedOut":
return {
"childPolicy": self.child_policy,
"timeoutType": self.timeout_type,
}
else:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{0}'".format(self.event_type)
)

View File

@ -228,14 +228,21 @@ class WorkflowExecution(object):
self.start_timestamp = now_timestamp()
self._add_event(
"WorkflowExecutionStarted",
workflow_execution=self,
child_policy=self.child_policy,
execution_start_to_close_timeout=self.execution_start_to_close_timeout,
# TODO: fix this hardcoded value
parent_initiated_event_id=0,
task_list=self.task_list,
task_start_to_close_timeout=self.task_start_to_close_timeout,
workflow_type=self.workflow_type,
)
self.schedule_decision_task()
def _schedule_decision_task(self):
evt = self._add_event(
"DecisionTaskScheduled",
workflow_execution=self,
start_to_close_timeout=self.task_start_to_close_timeout,
task_list=self.task_list,
)
self.domain.add_to_decision_task_list(
self.task_list,
@ -274,7 +281,6 @@ class WorkflowExecution(object):
dt = self._find_decision_task(task_token)
evt = self._add_event(
"DecisionTaskStarted",
workflow_execution=self,
scheduled_event_id=dt.scheduled_event_id,
identity=identity
)
@ -419,6 +425,9 @@ class WorkflowExecution(object):
def schedule_activity_task(self, event_id, attributes):
# Helper function to avoid repeating ourselves in the next sections
def fail_schedule_activity_task(_type, _cause):
# TODO: implement other possible failure mode: OPEN_ACTIVITIES_LIMIT_EXCEEDED
# NB: some failure modes are not implemented and probably won't be implemented in
# the future, such as ACTIVITY_CREATION_RATE_EXCEEDED or OPERATION_NOT_PERMITTED
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
@ -473,10 +482,17 @@ class WorkflowExecution(object):
# Only add event and increment counters now that nothing went wrong
evt = self._add_event(
"ActivityTaskScheduled",
decision_task_completed_event_id=event_id,
activity_id=attributes["activityId"],
activity_type=activity_type,
attributes=attributes,
control=attributes.get("control"),
decision_task_completed_event_id=event_id,
heartbeat_timeout=attributes.get("heartbeatTimeout"),
input=attributes.get("input"),
schedule_to_close_timeout=attributes.get("scheduleToCloseTimeout"),
schedule_to_start_timeout=attributes.get("scheduleToStartTimeout"),
start_to_close_timeout=attributes.get("startToCloseTimeout"),
task_list=task_list,
task_priority=attributes.get("taskPriority"),
)
task = ActivityTask(
activity_id=attributes["activityId"],

View File

@ -151,13 +151,13 @@ def test_workflow_execution_start_decision_task():
dt = wfe.decision_tasks[0]
dt.state.should.equal("STARTED")
wfe.events()[-1].event_type.should.equal("DecisionTaskStarted")
wfe.events()[-1].identity.should.equal("srv01")
wfe.events()[-1].event_attributes["identity"].should.equal("srv01")
def test_workflow_execution_history_events_ids():
wfe = make_workflow_execution()
wfe._add_event("WorkflowExecutionStarted", workflow_execution=wfe)
wfe._add_event("DecisionTaskScheduled", workflow_execution=wfe)
wfe._add_event("DecisionTaskStarted", workflow_execution=wfe, scheduled_event_id=2)
wfe._add_event("WorkflowExecutionStarted")
wfe._add_event("DecisionTaskScheduled")
wfe._add_event("DecisionTaskStarted")
ids = [evt.event_id for evt in wfe.events()]
ids.should.equal([1, 2, 3])
@ -181,8 +181,8 @@ def test_workflow_execution_complete():
wfe.close_status.should.equal("COMPLETED")
wfe.close_timestamp.should.equal(1420200000.0)
wfe.events()[-1].event_type.should.equal("WorkflowExecutionCompleted")
wfe.events()[-1].decision_task_completed_event_id.should.equal(123)
wfe.events()[-1].result.should.equal("foo")
wfe.events()[-1].event_attributes["decisionTaskCompletedEventId"].should.equal(123)
wfe.events()[-1].event_attributes["result"].should.equal("foo")
@freeze_time("2015-01-02 12:00:00")
def test_workflow_execution_fail():
@ -193,9 +193,9 @@ def test_workflow_execution_fail():
wfe.close_status.should.equal("FAILED")
wfe.close_timestamp.should.equal(1420200000.0)
wfe.events()[-1].event_type.should.equal("WorkflowExecutionFailed")
wfe.events()[-1].decision_task_completed_event_id.should.equal(123)
wfe.events()[-1].details.should.equal("some details")
wfe.events()[-1].reason.should.equal("my rules")
wfe.events()[-1].event_attributes["decisionTaskCompletedEventId"].should.equal(123)
wfe.events()[-1].event_attributes["details"].should.equal("some details")
wfe.events()[-1].event_attributes["reason"].should.equal("my rules")
@freeze_time("2015-01-01 12:00:00")
def test_workflow_execution_schedule_activity_task():
@ -209,8 +209,8 @@ def test_workflow_execution_schedule_activity_task():
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
last_event.decision_task_completed_event_id.should.equal(123)
last_event.task_list.should.equal("task-list-name")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)
last_event.event_attributes["taskList"]["name"].should.equal("task-list-name")
wfe.activity_tasks.should.have.length_of(1)
task = wfe.activity_tasks[0]
@ -235,7 +235,7 @@ def test_workflow_execution_schedule_activity_task_without_task_list_should_take
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
last_event.task_list.should.equal("foobar")
last_event.event_attributes["taskList"]["name"].should.equal("foobar")
task = wfe.activity_tasks[0]
wfe.domain.activity_task_lists["foobar"].should.contain(task)
@ -255,43 +255,43 @@ def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attribut
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("ACTIVITY_TYPE_DOES_NOT_EXIST")
last_event.event_attributes["cause"].should.equal("ACTIVITY_TYPE_DOES_NOT_EXIST")
hsh["activityType"]["name"] = "test-activity"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("ACTIVITY_TYPE_DEPRECATED")
last_event.event_attributes["cause"].should.equal("ACTIVITY_TYPE_DEPRECATED")
hsh["activityType"]["version"] = "v1.2"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_TASK_LIST_UNDEFINED")
last_event.event_attributes["cause"].should.equal("DEFAULT_TASK_LIST_UNDEFINED")
hsh["taskList"] = { "name": "foobar" }
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_SCHEDULE_TO_START_TIMEOUT_UNDEFINED")
last_event.event_attributes["cause"].should.equal("DEFAULT_SCHEDULE_TO_START_TIMEOUT_UNDEFINED")
hsh["scheduleToStartTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_SCHEDULE_TO_CLOSE_TIMEOUT_UNDEFINED")
last_event.event_attributes["cause"].should.equal("DEFAULT_SCHEDULE_TO_CLOSE_TIMEOUT_UNDEFINED")
hsh["scheduleToCloseTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_START_TO_CLOSE_TIMEOUT_UNDEFINED")
last_event.event_attributes["cause"].should.equal("DEFAULT_START_TO_CLOSE_TIMEOUT_UNDEFINED")
hsh["startToCloseTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("DEFAULT_HEARTBEAT_TIMEOUT_UNDEFINED")
last_event.event_attributes["cause"].should.equal("DEFAULT_HEARTBEAT_TIMEOUT_UNDEFINED")
wfe.open_counts["openActivityTasks"].should.equal(0)
wfe.activity_tasks.should.have.length_of(0)
@ -351,7 +351,7 @@ def test_workflow_execution_schedule_activity_task_with_same_activity_id():
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.cause.should.equal("ACTIVITY_ID_ALREADY_IN_USE")
last_event.event_attributes["cause"].should.equal("ACTIVITY_ID_ALREADY_IN_USE")
def test_workflow_execution_start_activity_task():
wfe = make_workflow_execution()
@ -361,7 +361,7 @@ def test_workflow_execution_start_activity_task():
task = wfe.activity_tasks[-1]
task.state.should.equal("STARTED")
wfe.events()[-1].event_type.should.equal("ActivityTaskStarted")
wfe.events()[-1].identity.should.equal("worker01")
wfe.events()[-1].event_attributes["identity"].should.equal("worker01")
def test_complete_activity_task():
wfe = make_workflow_execution()
@ -395,7 +395,7 @@ def test_terminate():
last_event = wfe.events()[-1]
last_event.event_type.should.equal("WorkflowExecutionTerminated")
# take default child_policy if not provided (as here)
last_event.child_policy.should.equal("ABANDON")
last_event.event_attributes["childPolicy"].should.equal("ABANDON")
def test_first_timeout():
wfe = make_workflow_execution()