Implement CompleteWorkflowExecution decision

This commit is contained in:
Jean-Baptiste Barth 2015-10-12 23:32:11 +02:00
parent d97c770849
commit 381eb5eb0f
4 changed files with 70 additions and 16 deletions

View File

@ -64,6 +64,13 @@ class HistoryEvent(object):
if hasattr(self, "execution_context") and self.execution_context:
hsh["executionContext"] = self.execution_context
return hsh
elif self.event_type == "WorkflowExecutionCompleted":
hsh = {
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
}
if hasattr(self, "result") and self.result:
hsh["result"] = self.result
return hsh
else:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)

View File

@ -162,9 +162,9 @@ class WorkflowExecution(object):
execution_context=execution_context,
)
dt.complete()
self.handle_decisions(decisions)
self.handle_decisions(evt.event_id, decisions)
def handle_decisions(self, decisions):
def handle_decisions(self, event_id, decisions):
"""
Handles a Decision according to SWF docs.
See: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_Decision.html
@ -176,17 +176,31 @@ class WorkflowExecution(object):
# handle each decision separately, in order
for decision in decisions:
decision_type = decision["decisionType"]
# TODO: implement Decision type: CancelTimer
# TODO: implement Decision type: CancelWorkflowExecution
# TODO: implement Decision type: CompleteWorkflowExecution
# TODO: implement Decision type: ContinueAsNewWorkflowExecution
# TODO: implement Decision type: FailWorkflowExecution
# TODO: implement Decision type: RecordMarker
# TODO: implement Decision type: RequestCancelActivityTask
# TODO: implement Decision type: RequestCancelExternalWorkflowExecution
# TODO: implement Decision type: ScheduleActivityTask
# TODO: implement Decision type: ScheduleLambdaFunction
# TODO: implement Decision type: SignalExternalWorkflowExecution
# TODO: implement Decision type: StartChildWorkflowExecution
# TODO: implement Decision type: StartTimer
raise NotImplementedError("Cannot handle decision: {}".format(decision_type))
attributes_key = "{}{}EventAttributes".format(
decision_type[0].lower(), decision_type[1:]
)
attributes = decision.get(attributes_key, {})
if decision_type == "CompleteWorkflowExecution":
self.complete(event_id, attributes.get("result"))
else:
# TODO: implement Decision type: CancelTimer
# TODO: implement Decision type: CancelWorkflowExecution
# TODO: implement Decision type: ContinueAsNewWorkflowExecution
# TODO: implement Decision type: FailWorkflowExecution
# TODO: implement Decision type: RecordMarker
# TODO: implement Decision type: RequestCancelActivityTask
# TODO: implement Decision type: RequestCancelExternalWorkflowExecution
# TODO: implement Decision type: ScheduleActivityTask
# TODO: implement Decision type: ScheduleLambdaFunction
# TODO: implement Decision type: SignalExternalWorkflowExecution
# TODO: implement Decision type: StartChildWorkflowExecution
# TODO: implement Decision type: StartTimer
raise NotImplementedError("Cannot handle decision: {}".format(decision_type))
def complete(self, event_id, result=None):
self.execution_status = "CLOSED"
evt = self._add_event(
"WorkflowExecutionCompleted",
decision_task_completed_event_id=event_id,
result=result,
)

View File

@ -134,3 +134,26 @@ def test_respond_decision_task_completed_with_task_already_completed():
conn.respond_decision_task_completed.when.called_with(
task_token
).should.throw(SWFUnknownResourceFault)
@mock_swf
def test_respond_decision_task_completed_with_complete_workflow_execution():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [{
"decisionType": "CompleteWorkflowExecution",
"completeWorkflowExecutionEventAttributes": {}
}]
resp = conn.respond_decision_task_completed(task_token, decisions=decisions)
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal([
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"WorkflowExecutionCompleted",
])

View File

@ -209,6 +209,16 @@ def test_workflow_execution_history_events_ids():
ids = [evt.event_id for evt in wfe.events()]
ids.should.equal([1, 2, 3])
def test_workflow_execution_complete():
wft = get_basic_workflow_type()
wfe = WorkflowExecution(wft, "ab1234")
wfe.complete(123, result="foo")
wfe.execution_status.should.equal("CLOSED")
wfe.events()[-1].event_type.should.equal("WorkflowExecutionCompleted")
wfe.events()[-1].decision_task_completed_event_id.should.equal(123)
wfe.events()[-1].result.should.equal("foo")
# HistoryEvent
@freeze_time("2015-01-01 12:00:00")