From fd12e317f86d9615769e1eb0686b3bafa0be8dff Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Barth Date: Wed, 28 Oct 2015 12:29:57 +0100 Subject: [PATCH] Add SWF endpoint RespondActivityTaskFailed --- moto/swf/models/__init__.py | 25 ++++++++---- moto/swf/models/activity_task.py | 3 ++ moto/swf/models/history_event.py | 11 +++++ moto/swf/models/workflow_execution.py | 14 +++++++ moto/swf/responses.py | 9 +++++ tests/test_swf/models/test_activity_task.py | 6 +++ .../test_swf/responses/test_activity_tasks.py | 40 ++++++++++++++++++- 7 files changed, 100 insertions(+), 8 deletions(-) diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index f488a24a7..a206e3e78 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -298,10 +298,7 @@ class SWFBackend(BaseBackend): count += len(pending) return count - def respond_activity_task_completed(self, task_token, result=None): - self._check_string(task_token) - self._check_none_or_string(result) - # let's find the activity task + def _find_activity_task_from_token(self, task_token): activity_task = None for domain in self.domains: for _, wfe in domain.workflow_executions.iteritems(): @@ -337,9 +334,23 @@ class SWFBackend(BaseBackend): "a bug in moto, please report it, thanks!" ) # everything's good - if activity_task: - wfe = activity_task.workflow_execution - wfe.complete_activity_task(activity_task.task_token, result=result) + return activity_task + + def respond_activity_task_completed(self, task_token, result=None): + self._check_string(task_token) + self._check_none_or_string(result) + activity_task = self._find_activity_task_from_token(task_token) + wfe = activity_task.workflow_execution + wfe.complete_activity_task(activity_task.task_token, result=result) + + def respond_activity_task_failed(self, task_token, reason=None, details=None): + self._check_string(task_token) + # TODO: implement length limits on reason and details (common pb with client libs) + self._check_none_or_string(reason) + self._check_none_or_string(details) + activity_task = self._find_activity_task_from_token(task_token) + wfe = activity_task.workflow_execution + wfe.fail_activity_task(activity_task.task_token, reason=reason, details=details) swf_backends = {} diff --git a/moto/swf/models/activity_task.py b/moto/swf/models/activity_task.py index c7b68d9cc..6baa01b5d 100644 --- a/moto/swf/models/activity_task.py +++ b/moto/swf/models/activity_task.py @@ -36,3 +36,6 @@ class ActivityTask(object): def complete(self): self.state = "COMPLETED" + + def fail(self): + self.state = "FAILED" diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index a6507c9f9..798cc810c 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -121,6 +121,17 @@ class HistoryEvent(object): if hasattr(self, "result") and self.result is not None: hsh["result"] = self.result return hsh + elif self.event_type == "ActivityTaskFailed": + # TODO: maybe merge it with ActivityTaskCompleted (different optional params tho) + hsh = { + "scheduledEventId": self.scheduled_event_id, + "startedEventId": self.started_event_id, + } + if hasattr(self, "reason") and self.reason is not None: + hsh["reason"] = self.reason + if hasattr(self, "details") and self.details is not None: + hsh["details"] = self.details + return hsh else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index e111a8f4b..900025dad 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -443,3 +443,17 @@ class WorkflowExecution(object): self.open_counts["openActivityTasks"] -= 1 # TODO: ensure we don't schedule multiple decisions at the same time! self.schedule_decision_task() + + def fail_activity_task(self, task_token, reason=None, details=None): + task = self._find_activity_task(task_token) + evt = self._add_event( + "ActivityTaskFailed", + scheduled_event_id=task.scheduled_event_id, + started_event_id=task.started_event_id, + reason=reason, + details=details, + ) + task.fail() + self.open_counts["openActivityTasks"] -= 1 + # TODO: ensure we don't schedule multiple decisions at the same time! + self.schedule_decision_task() diff --git a/moto/swf/responses.py b/moto/swf/responses.py index 3d180afcd..c90fd5d8d 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -278,3 +278,12 @@ class SWFResponse(BaseResponse): task_token, result=result ) return "" + + def respond_activity_task_failed(self): + task_token = self._params["taskToken"] + reason = self._params.get("reason") + details = self._params.get("details") + self.swf_backend.respond_activity_task_failed( + task_token, reason=reason, details=details + ) + return "" diff --git a/tests/test_swf/models/test_activity_task.py b/tests/test_swf/models/test_activity_task.py index d691cc054..93c842c8e 100644 --- a/tests/test_swf/models/test_activity_task.py +++ b/tests/test_swf/models/test_activity_task.py @@ -29,6 +29,12 @@ def test_activity_task_creation(): task.complete() task.state.should.equal("COMPLETED") + # NB: this doesn't make any sense for SWF, a task shouldn't go from a + # "COMPLETED" state to a "FAILED" one, but this is an internal state on our + # side and we don't care about invalid state transitions for now. + task.fail() + task.state.should.equal("FAILED") + def test_activity_task_full_dict_representation(): wfe = make_workflow_execution() wft = wfe.workflow_type diff --git a/tests/test_swf/responses/test_activity_tasks.py b/tests/test_swf/responses/test_activity_tasks.py index f5b053f6d..13825f856 100644 --- a/tests/test_swf/responses/test_activity_tasks.py +++ b/tests/test_swf/responses/test_activity_tasks.py @@ -72,7 +72,7 @@ def test_count_pending_decision_tasks_on_non_existent_task_list(): # RespondActivityTaskCompleted endpoint @mock_swf -def test_poll_for_activity_task_when_one(): +def test_respond_activity_task_completed(): conn = setup_workflow() decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] conn.respond_decision_task_completed(decision_token, decisions=[ @@ -133,3 +133,41 @@ def test_respond_activity_task_completed_with_task_already_completed(): conn.respond_activity_task_completed.when.called_with( activity_token ).should.throw(SWFUnknownResourceFault, "Unknown activity, scheduledEventId = 5") + + +# RespondActivityTaskFailed endpoint +@mock_swf +def test_respond_activity_task_failed(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] + + resp = conn.respond_activity_task_failed(activity_token, + reason="short reason", + details="long details") + resp.should.be.none + + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + resp["events"][-2]["eventType"].should.equal("ActivityTaskFailed") + resp["events"][-2]["activityTaskFailedEventAttributes"].should.equal( + { "reason": "short reason", "details": "long details", + "scheduledEventId": 5, "startedEventId": 6 } + ) + +@mock_swf +def test_respond_activity_task_completed_with_wrong_token(): + # NB: we just test ONE failure case for RespondActivityTaskFailed + # because the safeguards are shared with RespondActivityTaskCompleted, so + # no need to retest everything end-to-end. + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + conn.poll_for_activity_task("test-domain", "activity-task-list") + conn.respond_activity_task_failed.when.called_with( + "not-a-correct-token" + ).should.throw(SWFValidationException, "Invalid token")