Implement FailWorkflowExecution decision

This commit is contained in:
Jean-Baptiste Barth 2015-10-24 04:35:21 +02:00
parent 6810973b76
commit 417f732b53
4 changed files with 101 additions and 5 deletions

View File

@ -72,6 +72,15 @@ class HistoryEvent(object):
if hasattr(self, "result") and self.result:
hsh["result"] = self.result
return hsh
elif self.event_type == "WorkflowExecutionFailed":
hsh = {
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
}
if hasattr(self, "details") and self.details:
hsh["details"] = self.details
if hasattr(self, "reason") and self.reason:
hsh["reason"] = self.reason
return hsh
else:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)

View File

@ -1,4 +1,6 @@
from __future__ import unicode_literals
from datetime import datetime
from time import mktime
import uuid
from moto.core.utils import camelcase_to_underscores
@ -38,11 +40,20 @@ class WorkflowExecution(object):
]
def __init__(self, workflow_type, workflow_id, **kwargs):
self.workflow_type = workflow_type
self.workflow_id = workflow_id
self.run_id = uuid.uuid4().hex
self.execution_status = "OPEN"
# WorkflowExecutionInfo
self.cancel_requested = False
# TODO: check valid values among:
# COMPLETED | FAILED | CANCELED | TERMINATED | CONTINUED_AS_NEW | TIMED_OUT
# TODO: implement them all
self.close_status = None
self.close_timestamp = None
self.execution_status = "OPEN"
self.parent = None
self.start_timestamp = None
self.tag_list = [] # TODO
self.workflow_type = workflow_type
# args processing
# NB: the order follows boto/SWF order of exceptions appearance (if no
# param is set, # SWF will raise DefaultUndefinedFault errors in the
@ -102,7 +113,7 @@ class WorkflowExecution(object):
"executionStatus": self.execution_status,
"cancelRequested": self.cancel_requested,
}
if hasattr(self, "tag_list"):
if hasattr(self, "tag_list") and self.tag_list:
hsh["tagList"] = self.tag_list
return hsh
@ -140,7 +151,12 @@ class WorkflowExecution(object):
self._events.append(evt)
return evt
# TODO: move it in utils
def _now_timestamp(self):
return float(mktime(datetime.now().timetuple()))
def start(self):
self.start_timestamp = self._now_timestamp()
self._add_event(
"WorkflowExecutionStarted",
workflow_execution=self,
@ -274,11 +290,12 @@ class WorkflowExecution(object):
attributes = decision.get(attributes_key, {})
if decision_type == "CompleteWorkflowExecution":
self.complete(event_id, attributes.get("result"))
elif decision_type == "FailWorkflowExecution":
self.fail(event_id, attributes.get("details"), attributes.get("reason"))
else:
# TODO: implement Decision type: CancelTimer
# TODO: implement Decision type: CancelWorkflowExecution
# TODO: implement Decision type: ContinueAsNewWorkflowExecution
# TODO: implement Decision type: FailWorkflowExecution
# TODO: implement Decision type: RecordMarker
# TODO: implement Decision type: RequestCancelActivityTask
# TODO: implement Decision type: RequestCancelExternalWorkflowExecution
@ -291,8 +308,22 @@ class WorkflowExecution(object):
def complete(self, event_id, result=None):
self.execution_status = "CLOSED"
self.close_status = "COMPLETED"
self.close_timestamp = self._now_timestamp()
evt = self._add_event(
"WorkflowExecutionCompleted",
decision_task_completed_event_id=event_id,
result=result,
)
def fail(self, event_id, details=None, reason=None):
# TODO: implement lenght constraints on details/reason
self.execution_status = "CLOSED"
self.close_status = "FAILED"
self.close_timestamp = self._now_timestamp()
evt = self._add_event(
"WorkflowExecutionFailed",
decision_task_completed_event_id=event_id,
details=details,
reason=reason,
)

View File

@ -144,7 +144,7 @@ def test_respond_decision_task_completed_with_complete_workflow_execution():
decisions = [{
"decisionType": "CompleteWorkflowExecution",
"completeWorkflowExecutionEventAttributes": {}
"completeWorkflowExecutionEventAttributes": {"result": "foo bar"}
}]
resp = conn.respond_decision_task_completed(task_token, decisions=decisions)
resp.should.be.none
@ -158,6 +158,7 @@ def test_respond_decision_task_completed_with_complete_workflow_execution():
"DecisionTaskCompleted",
"WorkflowExecutionCompleted",
])
resp["events"][-1]["workflowExecutionCompletedEventAttributes"]["result"].should.equal("foo bar")
@mock_swf
def test_respond_decision_task_completed_with_close_decision_not_last():
@ -230,3 +231,29 @@ def test_respond_decision_task_completed_with_missing_attributes_totally():
r"Value null at 'decisions.1.member.startTimerDecisionAttributes.timerId' " \
r"failed to satisfy constraint: Member must not be null"
)
@mock_swf
def test_respond_decision_task_completed_with_fail_workflow_execution():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [{
"decisionType": "FailWorkflowExecution",
"failWorkflowExecutionEventAttributes": {"reason": "my rules", "details": "foo"}
}]
resp = conn.respond_decision_task_completed(task_token, decisions=decisions)
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal([
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"WorkflowExecutionFailed",
])
attrs = resp["events"][-1]["workflowExecutionFailedEventAttributes"]
attrs["reason"].should.equal("my rules")
attrs["details"].should.equal("foo")

View File

@ -209,16 +209,45 @@ def test_workflow_execution_history_events_ids():
ids = [evt.event_id for evt in wfe.events()]
ids.should.equal([1, 2, 3])
@freeze_time("2015-01-01 12:00:00")
def test_workflow_execution_start():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe.events().should.equal([])
wfe.start()
wfe.start_timestamp.should.equal(1420110000.0)
wfe.events().should.have.length_of(2)
wfe.events()[0].event_type.should.equal("WorkflowExecutionStarted")
wfe.events()[1].event_type.should.equal("DecisionTaskScheduled")
@freeze_time("2015-01-02 12:00:00")
def test_workflow_execution_complete():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe.complete(123, result="foo")
wfe.execution_status.should.equal("CLOSED")
wfe.close_status.should.equal("COMPLETED")
wfe.close_timestamp.should.equal(1420196400.0)
wfe.events()[-1].event_type.should.equal("WorkflowExecutionCompleted")
wfe.events()[-1].decision_task_completed_event_id.should.equal(123)
wfe.events()[-1].result.should.equal("foo")
@freeze_time("2015-01-02 12:00:00")
def test_workflow_execution_fail():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe.fail(123, details="some details", reason="my rules")
wfe.execution_status.should.equal("CLOSED")
wfe.close_status.should.equal("FAILED")
wfe.close_timestamp.should.equal(1420196400.0)
wfe.events()[-1].event_type.should.equal("WorkflowExecutionFailed")
wfe.events()[-1].decision_task_completed_event_id.should.equal(123)
wfe.events()[-1].details.should.equal("some details")
wfe.events()[-1].reason.should.equal("my rules")
# HistoryEvent
@freeze_time("2015-01-01 12:00:00")