diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index a206e3e78..27f56a711 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -160,7 +160,7 @@ class SWFBackend(BaseBackend): self._check_string(run_id) self._check_string(workflow_id) domain = self._get_domain(domain_name) - return domain.get_workflow_execution(run_id, workflow_id) + return domain.get_workflow_execution(workflow_id, run_id=run_id) def poll_for_decision_task(self, domain_name, task_list, identity=None): self._check_string(domain_name) @@ -198,7 +198,7 @@ class SWFBackend(BaseBackend): self._check_string(task_list) domain = self._get_domain(domain_name) count = 0 - for _, wfe in domain.workflow_executions.iteritems(): + for wfe in domain.workflow_executions: if wfe.task_list == task_list: count += wfe.open_counts["openDecisionTasks"] return count @@ -211,7 +211,7 @@ class SWFBackend(BaseBackend): # let's find decision task decision_task = None for domain in self.domains: - for _, wfe in domain.workflow_executions.iteritems(): + for wfe in domain.workflow_executions: for dt in wfe.decision_tasks: if dt.task_token == task_token: decision_task = dt @@ -301,7 +301,7 @@ class SWFBackend(BaseBackend): def _find_activity_task_from_token(self, task_token): activity_task = None for domain in self.domains: - for _, wfe in domain.workflow_executions.iteritems(): + for wfe in domain.workflow_executions: for task in wfe.activity_tasks: if task.task_token == task_token: activity_task = task @@ -352,6 +352,18 @@ class SWFBackend(BaseBackend): wfe = activity_task.workflow_execution wfe.fail_activity_task(activity_task.task_token, reason=reason, details=details) + def terminate_workflow_execution(self, domain_name, workflow_id, child_policy=None, + details=None, reason=None, run_id=None): + self._check_string(domain_name) + self._check_string(workflow_id) + self._check_none_or_string(child_policy) + self._check_none_or_string(details) + self._check_none_or_string(reason) + self._check_none_or_string(run_id) + domain = self._get_domain(domain_name) + wfe = domain.get_workflow_execution(workflow_id, run_id=run_id, raise_if_closed=True) + wfe.terminate(child_policy=child_policy, details=details, reason=reason) + swf_backends = {} for region in boto.swf.regions(): diff --git a/moto/swf/models/domain.py b/moto/swf/models/domain.py index d04a41841..9caf307f4 100644 --- a/moto/swf/models/domain.py +++ b/moto/swf/models/domain.py @@ -22,7 +22,7 @@ class Domain(object): # that against SWF API) ; hence the storage method as a dict # of "workflow_id (client determined)" => WorkflowExecution() # here. - self.workflow_executions = {} + self.workflow_executions = [] self.activity_task_lists = {} self.decision_task_lists = {} @@ -71,18 +71,32 @@ class Domain(object): def add_workflow_execution(self, workflow_execution): _id = workflow_execution.workflow_id - if self.workflow_executions.get(_id): + # TODO: handle this better: this should raise ONLY if there's an OPEN wfe with this ID + if any(wfe.workflow_id == _id for wfe in self.workflow_executions): raise SWFWorkflowExecutionAlreadyStartedFault() - self.workflow_executions[_id] = workflow_execution + self.workflow_executions.append(workflow_execution) - def get_workflow_execution(self, run_id, workflow_id): - wfe = self.workflow_executions.get(workflow_id) - if not wfe or wfe.run_id != run_id: - raise SWFUnknownResourceFault( - "execution", - "WorkflowExecution=[workflowId={}, runId={}]".format( - workflow_id, run_id + def get_workflow_execution(self, workflow_id, run_id=None, raise_if_closed=False): + if run_id: + _all = [w for w in self.workflow_executions + if w.workflow_id == workflow_id and w.run_id == run_id] + else: + _all = [w for w in self.workflow_executions + if w.workflow_id == workflow_id and w.execution_status == "OPEN"] + wfe = _all[0] if _all else None + if raise_if_closed and wfe and wfe.execution_status == "CLOSED": + wfe = None + if run_id: + if not wfe or wfe.run_id != run_id: + raise SWFUnknownResourceFault( + "execution", + "WorkflowExecution=[workflowId={}, runId={}]".format( + workflow_id, run_id + ) ) + elif not wfe: + raise SWFUnknownResourceFault( + "execution, workflowId = {}".format(workflow_id) ) return wfe diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index 798cc810c..eb3c1f795 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -132,6 +132,17 @@ class HistoryEvent(object): if hasattr(self, "details") and self.details is not None: hsh["details"] = self.details return hsh + elif self.event_type == "WorkflowExecutionTerminated": + hsh = { + "childPolicy": self.child_policy, + } + if self.cause: + hsh["cause"] = self.cause + if self.details: + hsh["details"] = self.details + if self.reason: + hsh["reason"] = self.reason + return hsh else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 40efab156..10d134dca 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -50,6 +50,7 @@ class WorkflowExecution(object): # TODO: check valid values among: # COMPLETED | FAILED | CANCELED | TERMINATED | CONTINUED_AS_NEW | TIMED_OUT # TODO: implement them all + self.close_cause = None self.close_status = None self.close_timestamp = None self.execution_status = "OPEN" @@ -467,3 +468,21 @@ class WorkflowExecution(object): self.open_counts["openActivityTasks"] -= 1 # TODO: ensure we don't schedule multiple decisions at the same time! self.schedule_decision_task() + + def terminate(self, child_policy=None, details=None, reason=None): + # TODO: handle child policy for child workflows here + # TODO: handle cause="CHILD_POLICY_APPLIED" + # Until this, we set cause manually to "OPERATOR_INITIATED" + cause = "OPERATOR_INITIATED" + if not child_policy: + child_policy = self.child_policy + self._add_event( + "WorkflowExecutionTerminated", + cause=cause, + child_policy=child_policy, + details=details, + reason=reason, + ) + self.execution_status = "CLOSED" + self.close_status = "TERMINATED" + self.close_cause = "OPERATOR_INITIATED" diff --git a/moto/swf/responses.py b/moto/swf/responses.py index c90fd5d8d..ffadc73f2 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -287,3 +287,16 @@ class SWFResponse(BaseResponse): task_token, reason=reason, details=details ) return "" + + def terminate_workflow_execution(self): + domain_name = self._params["domain"] + workflow_id = self._params["workflowId"] + child_policy = self._params.get("childPolicy") + details = self._params.get("details") + reason = self._params.get("reason") + run_id = self._params.get("runId") + self.swf_backend.terminate_workflow_execution( + domain_name, workflow_id, child_policy=child_policy, + details=details, reason=reason, run_id=run_id + ) + return "" diff --git a/tests/test_swf/models/test_domain.py b/tests/test_swf/models/test_domain.py index 5d2982e10..215cbace0 100644 --- a/tests/test_swf/models/test_domain.py +++ b/tests/test_swf/models/test_domain.py @@ -1,8 +1,17 @@ +from collections import namedtuple from sure import expect +from moto.swf.exceptions import SWFUnknownResourceFault from moto.swf.models import Domain +# Fake WorkflowExecution for tests purposes +WorkflowExecution = namedtuple( + "WorkflowExecution", + ["workflow_id", "run_id", "execution_status"] +) + + def test_domain_short_dict_representation(): domain = Domain("foo", "52") domain.to_short_dict().should.equal({"name":"foo", "status":"REGISTERED"}) @@ -46,3 +55,45 @@ def test_domain_decision_tasks(): domain.add_to_decision_task_list("foo", "bar") domain.add_to_decision_task_list("other", "baz") domain.decision_tasks.should.equal(["bar", "baz"]) + +def test_domain_get_workflow_execution(): + domain = Domain("my-domain", "60") + + wfe1 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-1", execution_status="OPEN") + wfe2 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-2", execution_status="CLOSED") + wfe3 = WorkflowExecution(workflow_id="wf-id-2", run_id="run-id-3", execution_status="OPEN") + wfe4 = WorkflowExecution(workflow_id="wf-id-3", run_id="run-id-4", execution_status="CLOSED") + domain.workflow_executions = [wfe1, wfe2, wfe3, wfe4] + + # get workflow execution through workflow_id and run_id + domain.get_workflow_execution("wf-id-1", run_id="run-id-1").should.equal(wfe1) + domain.get_workflow_execution("wf-id-1", run_id="run-id-2").should.equal(wfe2) + domain.get_workflow_execution("wf-id-3", run_id="run-id-4").should.equal(wfe4) + domain.get_workflow_execution.when.called_with( + "wf-id-1", run_id="non-existent" + ).should.throw( + SWFUnknownResourceFault, + "Unknown execution: WorkflowExecution=[workflowId=wf-id-1, runId=non-existent]" + ) + + # get OPEN workflow execution by default if no run_id + domain.get_workflow_execution("wf-id-1").should.equal(wfe1) + domain.get_workflow_execution.when.called_with( + "wf-id-3" + ).should.throw( + SWFUnknownResourceFault, "Unknown execution, workflowId = wf-id-3" + ) + domain.get_workflow_execution.when.called_with( + "wf-id-non-existent" + ).should.throw( + SWFUnknownResourceFault, "Unknown execution, workflowId = wf-id-non-existent" + ) + + # raise_if_closed attribute + domain.get_workflow_execution("wf-id-1", run_id="run-id-1", raise_if_closed=True).should.equal(wfe1) + domain.get_workflow_execution.when.called_with( + "wf-id-3", run_id="run-id-4", raise_if_closed=True + ).should.throw( + SWFUnknownResourceFault, + "Unknown execution: WorkflowExecution=[workflowId=wf-id-3, runId=run-id-4]" + ) diff --git a/tests/test_swf/models/test_workflow_execution.py b/tests/test_swf/models/test_workflow_execution.py index 21f7c5448..ced636969 100644 --- a/tests/test_swf/models/test_workflow_execution.py +++ b/tests/test_swf/models/test_workflow_execution.py @@ -379,3 +379,18 @@ def test_complete_activity_task(): wfe.open_counts["openActivityTasks"].should.equal(0) wfe.open_counts["openDecisionTasks"].should.equal(1) + +def test_terminate(): + wfe = make_workflow_execution() + wfe.schedule_decision_task() + wfe.terminate() + + wfe.execution_status.should.equal("CLOSED") + wfe.close_status.should.equal("TERMINATED") + wfe.close_cause.should.equal("OPERATOR_INITIATED") + wfe.open_counts["openDecisionTasks"].should.equal(1) + + last_event = wfe.events()[-1] + last_event.event_type.should.equal("WorkflowExecutionTerminated") + # take default child_policy if not provided (as here) + last_event.child_policy.should.equal("ABANDON") diff --git a/tests/test_swf/responses/test_activity_tasks.py b/tests/test_swf/responses/test_activity_tasks.py index 13825f856..3c7f82b5e 100644 --- a/tests/test_swf/responses/test_activity_tasks.py +++ b/tests/test_swf/responses/test_activity_tasks.py @@ -111,7 +111,7 @@ def test_respond_activity_task_completed_on_closed_workflow_execution(): activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] # bad: we're closing workflow execution manually, but endpoints are not coded for now.. - wfe = swf_backend.domains[0].workflow_executions.values()[0] + wfe = swf_backend.domains[0].workflow_executions[-1] wfe.execution_status = "CLOSED" # /bad diff --git a/tests/test_swf/responses/test_decision_tasks.py b/tests/test_swf/responses/test_decision_tasks.py index 64f51d30b..9510b31fd 100644 --- a/tests/test_swf/responses/test_decision_tasks.py +++ b/tests/test_swf/responses/test_decision_tasks.py @@ -123,7 +123,7 @@ def test_respond_decision_task_completed_on_close_workflow_execution(): task_token = resp["taskToken"] # bad: we're closing workflow execution manually, but endpoints are not coded for now.. - wfe = swf_backend.domains[0].workflow_executions.values()[0] + wfe = swf_backend.domains[0].workflow_executions[-1] wfe.execution_status = "CLOSED" # /bad diff --git a/tests/test_swf/responses/test_workflow_executions.py b/tests/test_swf/responses/test_workflow_executions.py index 1b8d599f9..f4125f77c 100644 --- a/tests/test_swf/responses/test_workflow_executions.py +++ b/tests/test_swf/responses/test_workflow_executions.py @@ -100,3 +100,64 @@ def test_get_workflow_execution_history_on_non_existent_workflow_execution(): conn.get_workflow_execution_history.when.called_with( "test-domain", "wrong-run-id", "wrong-workflow-id" ).should.throw(SWFUnknownResourceFault) + + +# TerminateWorkflowExecution endpoint +@mock_swf +def test_terminate_workflow_execution(): + conn = setup_swf_environment() + run_id = conn.start_workflow_execution( + "test-domain", "uid-abcd1234", "test-workflow", "v1.0" + )["runId"] + + resp = conn.terminate_workflow_execution("test-domain", "uid-abcd1234", + details="some details", + reason="a more complete reason", + run_id=run_id) + resp.should.be.none + + resp = conn.get_workflow_execution_history("test-domain", run_id, "uid-abcd1234") + evt = resp["events"][-1] + evt["eventType"].should.equal("WorkflowExecutionTerminated") + attrs = evt["workflowExecutionTerminatedEventAttributes"] + attrs["details"].should.equal("some details") + attrs["reason"].should.equal("a more complete reason") + attrs["cause"].should.equal("OPERATOR_INITIATED") + +@mock_swf +def test_terminate_workflow_execution_with_wrong_workflow_or_run_id(): + conn = setup_swf_environment() + run_id = conn.start_workflow_execution( + "test-domain", "uid-abcd1234", "test-workflow", "v1.0" + )["runId"] + + # terminate workflow execution + resp = conn.terminate_workflow_execution("test-domain", "uid-abcd1234") + + # already closed, with run_id + conn.terminate_workflow_execution.when.called_with( + "test-domain", "uid-abcd1234", run_id=run_id + ).should.throw( + SWFUnknownResourceFault, "WorkflowExecution=[workflowId=uid-abcd1234, runId=" + ) + + # already closed, without run_id + conn.terminate_workflow_execution.when.called_with( + "test-domain", "uid-abcd1234" + ).should.throw( + SWFUnknownResourceFault, "Unknown execution, workflowId = uid-abcd1234" + ) + + # wrong workflow id + conn.terminate_workflow_execution.when.called_with( + "test-domain", "uid-non-existent" + ).should.throw( + SWFUnknownResourceFault, "Unknown execution, workflowId = uid-non-existent" + ) + + # wrong run_id + conn.terminate_workflow_execution.when.called_with( + "test-domain", "uid-abcd1234", run_id="foo" + ).should.throw( + SWFUnknownResourceFault, "WorkflowExecution=[workflowId=uid-abcd1234, runId=" + )