diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index 6e4002345..c602abee8 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -2,168 +2,65 @@ from __future__ import unicode_literals from datetime import datetime from time import mktime +from moto.core.utils import underscores_to_camelcase + from ..utils import decapitalize, now_timestamp +# We keep track of which history event types we support +# so that we'll be able to catch specific formatting +# for new events if needed. +SUPPORTED_HISTORY_EVENT_TYPES = ( + "WorkflowExecutionStarted", + "DecisionTaskScheduled", + "DecisionTaskStarted", + "DecisionTaskCompleted", + "WorkflowExecutionCompleted", + "WorkflowExecutionFailed", + "ActivityTaskScheduled", + "ScheduleActivityTaskFailed", + "ActivityTaskStarted", + "ActivityTaskCompleted", + "ActivityTaskFailed", + "WorkflowExecutionTerminated", + "ActivityTaskTimedOut", + "DecisionTaskTimedOut", + "WorkflowExecutionTimedOut", +) + class HistoryEvent(object): - def __init__(self, event_id, event_type, **kwargs): + def __init__(self, event_id, event_type, event_timestamp=None, **kwargs): + if event_type not in SUPPORTED_HISTORY_EVENT_TYPES: + raise NotImplementedError( + "HistoryEvent does not implement attributes for type '{0}'".format(event_type) + ) self.event_id = event_id self.event_type = event_type - self.event_timestamp = now_timestamp() + if event_timestamp: + self.event_timestamp = event_timestamp + else: + self.event_timestamp = now_timestamp() + # pre-populate a dict: {"camelCaseKey": value} + self.event_attributes = {} for key, value in kwargs.items(): - self.__setattr__(key, value) - # break soon if attributes are not valid - self.event_attributes() + if value: + camel_key = underscores_to_camelcase(key) + if key == "task_list": + value = { "name": value } + elif key == "workflow_type": + value = { "name": value.name, "version": value.version } + elif key == "activity_type": + value = value.to_short_dict() + self.event_attributes[camel_key] = value def to_dict(self): return { "eventId": self.event_id, "eventType": self.event_type, "eventTimestamp": self.event_timestamp, - self._attributes_key(): self.event_attributes() + self._attributes_key(): self.event_attributes } def _attributes_key(self): key = "{0}EventAttributes".format(self.event_type) return decapitalize(key) - - def event_attributes(self): - if self.event_type == "WorkflowExecutionStarted": - wfe = self.workflow_execution - hsh = { - "childPolicy": wfe.child_policy, - "executionStartToCloseTimeout": wfe.execution_start_to_close_timeout, - "parentInitiatedEventId": 0, - "taskList": { - "name": wfe.task_list - }, - "taskStartToCloseTimeout": wfe.task_start_to_close_timeout, - "workflowType": { - "name": wfe.workflow_type.name, - "version": wfe.workflow_type.version - } - } - return hsh - elif self.event_type == "DecisionTaskScheduled": - wfe = self.workflow_execution - return { - "startToCloseTimeout": wfe.task_start_to_close_timeout, - "taskList": {"name": wfe.task_list} - } - elif self.event_type == "DecisionTaskStarted": - hsh = { - "scheduledEventId": self.scheduled_event_id - } - if hasattr(self, "identity") and self.identity: - hsh["identity"] = self.identity - return hsh - elif self.event_type == "DecisionTaskCompleted": - hsh = { - "scheduledEventId": self.scheduled_event_id, - "startedEventId": self.started_event_id, - } - if hasattr(self, "execution_context") and self.execution_context: - hsh["executionContext"] = self.execution_context - return hsh - elif self.event_type == "WorkflowExecutionCompleted": - hsh = { - "decisionTaskCompletedEventId": self.decision_task_completed_event_id, - } - if hasattr(self, "result") and self.result: - hsh["result"] = self.result - return hsh - elif self.event_type == "WorkflowExecutionFailed": - hsh = { - "decisionTaskCompletedEventId": self.decision_task_completed_event_id, - } - if hasattr(self, "details") and self.details: - hsh["details"] = self.details - if hasattr(self, "reason") and self.reason: - hsh["reason"] = self.reason - return hsh - elif self.event_type == "ActivityTaskScheduled": - hsh = { - "activityId": self.attributes["activityId"], - "activityType": self.activity_type.to_short_dict(), - "decisionTaskCompletedEventId": self.decision_task_completed_event_id, - "taskList": { - "name": self.task_list, - }, - } - for attr in ["control", "heartbeatTimeout", "input", "scheduleToCloseTimeout", - "scheduleToStartTimeout", "startToCloseTimeout", "taskPriority"]: - if self.attributes.get(attr): - hsh[attr] = self.attributes[attr] - return hsh - elif self.event_type == "ScheduleActivityTaskFailed": - # TODO: implement other possible failure mode: OPEN_ACTIVITIES_LIMIT_EXCEEDED - # NB: some failure modes are not implemented and probably won't be implemented in the - # future, such as ACTIVITY_CREATION_RATE_EXCEEDED or OPERATION_NOT_PERMITTED - return { - "activityId": self.activity_id, - "activityType": self.activity_type.to_short_dict(), - "cause": self.cause, - "decisionTaskCompletedEventId": self.decision_task_completed_event_id, - } - elif self.event_type == "ActivityTaskStarted": - # TODO: merge it with DecisionTaskStarted - hsh = { - "scheduledEventId": self.scheduled_event_id - } - if hasattr(self, "identity") and self.identity: - hsh["identity"] = self.identity - return hsh - elif self.event_type == "ActivityTaskCompleted": - hsh = { - "scheduledEventId": self.scheduled_event_id, - "startedEventId": self.started_event_id, - } - if hasattr(self, "result") and self.result is not None: - hsh["result"] = self.result - return hsh - elif self.event_type == "ActivityTaskFailed": - # TODO: maybe merge it with ActivityTaskCompleted (different optional params tho) - hsh = { - "scheduledEventId": self.scheduled_event_id, - "startedEventId": self.started_event_id, - } - if hasattr(self, "reason") and self.reason is not None: - hsh["reason"] = self.reason - if hasattr(self, "details") and self.details is not None: - hsh["details"] = self.details - return hsh - elif self.event_type == "WorkflowExecutionTerminated": - hsh = { - "childPolicy": self.child_policy, - } - if self.cause: - hsh["cause"] = self.cause - if self.details: - hsh["details"] = self.details - if self.reason: - hsh["reason"] = self.reason - return hsh - elif self.event_type == "ActivityTaskTimedOut": - hsh = { - "scheduledEventId": self.scheduled_event_id, - "startedEventId": self.started_event_id, - "timeoutType": self.timeout_type, - } - if self.details: - hsh["details"] = self.details - return hsh - elif self.event_type == "DecisionTaskTimedOut": - return { - "scheduledEventId": self.scheduled_event_id, - "startedEventId": self.started_event_id, - "timeoutType": self.timeout_type, - } - elif self.event_type == "WorkflowExecutionTimedOut": - return { - "childPolicy": self.child_policy, - "timeoutType": self.timeout_type, - } - else: - raise NotImplementedError( - "HistoryEvent does not implement attributes for type '{0}'".format(self.event_type) - ) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 67601438b..aa08d8e9f 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -228,14 +228,21 @@ class WorkflowExecution(object): self.start_timestamp = now_timestamp() self._add_event( "WorkflowExecutionStarted", - workflow_execution=self, + child_policy=self.child_policy, + execution_start_to_close_timeout=self.execution_start_to_close_timeout, + # TODO: fix this hardcoded value + parent_initiated_event_id=0, + task_list=self.task_list, + task_start_to_close_timeout=self.task_start_to_close_timeout, + workflow_type=self.workflow_type, ) self.schedule_decision_task() def _schedule_decision_task(self): evt = self._add_event( "DecisionTaskScheduled", - workflow_execution=self, + start_to_close_timeout=self.task_start_to_close_timeout, + task_list=self.task_list, ) self.domain.add_to_decision_task_list( self.task_list, @@ -274,7 +281,6 @@ class WorkflowExecution(object): dt = self._find_decision_task(task_token) evt = self._add_event( "DecisionTaskStarted", - workflow_execution=self, scheduled_event_id=dt.scheduled_event_id, identity=identity ) @@ -419,6 +425,9 @@ class WorkflowExecution(object): def schedule_activity_task(self, event_id, attributes): # Helper function to avoid repeating ourselves in the next sections def fail_schedule_activity_task(_type, _cause): + # TODO: implement other possible failure mode: OPEN_ACTIVITIES_LIMIT_EXCEEDED + # NB: some failure modes are not implemented and probably won't be implemented in + # the future, such as ACTIVITY_CREATION_RATE_EXCEEDED or OPERATION_NOT_PERMITTED self._add_event( "ScheduleActivityTaskFailed", activity_id=attributes["activityId"], @@ -473,10 +482,17 @@ class WorkflowExecution(object): # Only add event and increment counters now that nothing went wrong evt = self._add_event( "ActivityTaskScheduled", - decision_task_completed_event_id=event_id, + activity_id=attributes["activityId"], activity_type=activity_type, - attributes=attributes, + control=attributes.get("control"), + decision_task_completed_event_id=event_id, + heartbeat_timeout=attributes.get("heartbeatTimeout"), + input=attributes.get("input"), + schedule_to_close_timeout=attributes.get("scheduleToCloseTimeout"), + schedule_to_start_timeout=attributes.get("scheduleToStartTimeout"), + start_to_close_timeout=attributes.get("startToCloseTimeout"), task_list=task_list, + task_priority=attributes.get("taskPriority"), ) task = ActivityTask( activity_id=attributes["activityId"], diff --git a/tests/test_swf/models/test_workflow_execution.py b/tests/test_swf/models/test_workflow_execution.py index 8a010eb42..0546c2f93 100644 --- a/tests/test_swf/models/test_workflow_execution.py +++ b/tests/test_swf/models/test_workflow_execution.py @@ -151,13 +151,13 @@ def test_workflow_execution_start_decision_task(): dt = wfe.decision_tasks[0] dt.state.should.equal("STARTED") wfe.events()[-1].event_type.should.equal("DecisionTaskStarted") - wfe.events()[-1].identity.should.equal("srv01") + wfe.events()[-1].event_attributes["identity"].should.equal("srv01") def test_workflow_execution_history_events_ids(): wfe = make_workflow_execution() - wfe._add_event("WorkflowExecutionStarted", workflow_execution=wfe) - wfe._add_event("DecisionTaskScheduled", workflow_execution=wfe) - wfe._add_event("DecisionTaskStarted", workflow_execution=wfe, scheduled_event_id=2) + wfe._add_event("WorkflowExecutionStarted") + wfe._add_event("DecisionTaskScheduled") + wfe._add_event("DecisionTaskStarted") ids = [evt.event_id for evt in wfe.events()] ids.should.equal([1, 2, 3]) @@ -181,8 +181,8 @@ def test_workflow_execution_complete(): wfe.close_status.should.equal("COMPLETED") wfe.close_timestamp.should.equal(1420200000.0) wfe.events()[-1].event_type.should.equal("WorkflowExecutionCompleted") - wfe.events()[-1].decision_task_completed_event_id.should.equal(123) - wfe.events()[-1].result.should.equal("foo") + wfe.events()[-1].event_attributes["decisionTaskCompletedEventId"].should.equal(123) + wfe.events()[-1].event_attributes["result"].should.equal("foo") @freeze_time("2015-01-02 12:00:00") def test_workflow_execution_fail(): @@ -193,9 +193,9 @@ def test_workflow_execution_fail(): wfe.close_status.should.equal("FAILED") wfe.close_timestamp.should.equal(1420200000.0) wfe.events()[-1].event_type.should.equal("WorkflowExecutionFailed") - wfe.events()[-1].decision_task_completed_event_id.should.equal(123) - wfe.events()[-1].details.should.equal("some details") - wfe.events()[-1].reason.should.equal("my rules") + wfe.events()[-1].event_attributes["decisionTaskCompletedEventId"].should.equal(123) + wfe.events()[-1].event_attributes["details"].should.equal("some details") + wfe.events()[-1].event_attributes["reason"].should.equal("my rules") @freeze_time("2015-01-01 12:00:00") def test_workflow_execution_schedule_activity_task(): @@ -209,8 +209,8 @@ def test_workflow_execution_schedule_activity_task(): wfe.open_counts["openActivityTasks"].should.equal(1) last_event = wfe.events()[-1] last_event.event_type.should.equal("ActivityTaskScheduled") - last_event.decision_task_completed_event_id.should.equal(123) - last_event.task_list.should.equal("task-list-name") + last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123) + last_event.event_attributes["taskList"]["name"].should.equal("task-list-name") wfe.activity_tasks.should.have.length_of(1) task = wfe.activity_tasks[0] @@ -235,7 +235,7 @@ def test_workflow_execution_schedule_activity_task_without_task_list_should_take wfe.open_counts["openActivityTasks"].should.equal(1) last_event = wfe.events()[-1] last_event.event_type.should.equal("ActivityTaskScheduled") - last_event.task_list.should.equal("foobar") + last_event.event_attributes["taskList"]["name"].should.equal("foobar") task = wfe.activity_tasks[0] wfe.domain.activity_task_lists["foobar"].should.contain(task) @@ -255,43 +255,43 @@ def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attribut wfe.schedule_activity_task(123, hsh) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") - last_event.cause.should.equal("ACTIVITY_TYPE_DOES_NOT_EXIST") + last_event.event_attributes["cause"].should.equal("ACTIVITY_TYPE_DOES_NOT_EXIST") hsh["activityType"]["name"] = "test-activity" wfe.schedule_activity_task(123, hsh) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") - last_event.cause.should.equal("ACTIVITY_TYPE_DEPRECATED") + last_event.event_attributes["cause"].should.equal("ACTIVITY_TYPE_DEPRECATED") hsh["activityType"]["version"] = "v1.2" wfe.schedule_activity_task(123, hsh) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") - last_event.cause.should.equal("DEFAULT_TASK_LIST_UNDEFINED") + last_event.event_attributes["cause"].should.equal("DEFAULT_TASK_LIST_UNDEFINED") hsh["taskList"] = { "name": "foobar" } wfe.schedule_activity_task(123, hsh) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") - last_event.cause.should.equal("DEFAULT_SCHEDULE_TO_START_TIMEOUT_UNDEFINED") + last_event.event_attributes["cause"].should.equal("DEFAULT_SCHEDULE_TO_START_TIMEOUT_UNDEFINED") hsh["scheduleToStartTimeout"] = "600" wfe.schedule_activity_task(123, hsh) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") - last_event.cause.should.equal("DEFAULT_SCHEDULE_TO_CLOSE_TIMEOUT_UNDEFINED") + last_event.event_attributes["cause"].should.equal("DEFAULT_SCHEDULE_TO_CLOSE_TIMEOUT_UNDEFINED") hsh["scheduleToCloseTimeout"] = "600" wfe.schedule_activity_task(123, hsh) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") - last_event.cause.should.equal("DEFAULT_START_TO_CLOSE_TIMEOUT_UNDEFINED") + last_event.event_attributes["cause"].should.equal("DEFAULT_START_TO_CLOSE_TIMEOUT_UNDEFINED") hsh["startToCloseTimeout"] = "600" wfe.schedule_activity_task(123, hsh) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") - last_event.cause.should.equal("DEFAULT_HEARTBEAT_TIMEOUT_UNDEFINED") + last_event.event_attributes["cause"].should.equal("DEFAULT_HEARTBEAT_TIMEOUT_UNDEFINED") wfe.open_counts["openActivityTasks"].should.equal(0) wfe.activity_tasks.should.have.length_of(0) @@ -351,7 +351,7 @@ def test_workflow_execution_schedule_activity_task_with_same_activity_id(): wfe.open_counts["openActivityTasks"].should.equal(1) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") - last_event.cause.should.equal("ACTIVITY_ID_ALREADY_IN_USE") + last_event.event_attributes["cause"].should.equal("ACTIVITY_ID_ALREADY_IN_USE") def test_workflow_execution_start_activity_task(): wfe = make_workflow_execution() @@ -361,7 +361,7 @@ def test_workflow_execution_start_activity_task(): task = wfe.activity_tasks[-1] task.state.should.equal("STARTED") wfe.events()[-1].event_type.should.equal("ActivityTaskStarted") - wfe.events()[-1].identity.should.equal("worker01") + wfe.events()[-1].event_attributes["identity"].should.equal("worker01") def test_complete_activity_task(): wfe = make_workflow_execution() @@ -395,7 +395,7 @@ def test_terminate(): last_event = wfe.events()[-1] last_event.event_type.should.equal("WorkflowExecutionTerminated") # take default child_policy if not provided (as here) - last_event.child_policy.should.equal("ABANDON") + last_event.event_attributes["childPolicy"].should.equal("ABANDON") def test_first_timeout(): wfe = make_workflow_execution()