diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index 19ec6eb43..f488a24a7 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -298,6 +298,49 @@ class SWFBackend(BaseBackend): count += len(pending) return count + def respond_activity_task_completed(self, task_token, result=None): + self._check_string(task_token) + self._check_none_or_string(result) + # let's find the activity task + activity_task = None + for domain in self.domains: + for _, wfe in domain.workflow_executions.iteritems(): + for task in wfe.activity_tasks: + if task.task_token == task_token: + activity_task = task + # no task found + if not activity_task: + # Same as for decision tasks, we raise an invalid token BOTH for clearly + # wrong SWF tokens and OK tokens but not used correctly. This should not + # be a problem in moto. + raise SWFValidationException("Invalid token") + # activity task found, but WorflowExecution is CLOSED + wfe = activity_task.workflow_execution + if wfe.execution_status != "OPEN": + raise SWFUnknownResourceFault( + "execution", + "WorkflowExecution=[workflowId={}, runId={}]".format( + wfe.workflow_id, wfe.run_id + ) + ) + # activity task found, but already completed + if activity_task.state != "STARTED": + if activity_task.state == "COMPLETED": + raise SWFUnknownResourceFault( + "activity, scheduledEventId = {}".format(activity_task.scheduled_event_id) + ) + else: + raise ValueError( + "This shouldn't happen: you have to PollForActivityTask to get a token, " + "which changes ActivityTask status to 'STARTED' ; then it can only change " + "to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably " + "a bug in moto, please report it, thanks!" + ) + # everything's good + if activity_task: + wfe = activity_task.workflow_execution + wfe.complete_activity_task(activity_task.task_token, result=result) + swf_backends = {} for region in boto.swf.regions(): diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index 45a839038..a6507c9f9 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -113,6 +113,14 @@ class HistoryEvent(object): if hasattr(self, "identity") and self.identity: hsh["identity"] = self.identity return hsh + elif self.event_type == "ActivityTaskCompleted": + hsh = { + "scheduledEventId": self.scheduled_event_id, + "startedEventId": self.started_event_id, + } + if hasattr(self, "result") and self.result is not None: + hsh["result"] = self.result + return hsh else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index dfa7380f9..e111a8f4b 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -430,3 +430,16 @@ class WorkflowExecution(object): identity=identity ) task.start(evt.event_id) + + def complete_activity_task(self, task_token, result=None): + task = self._find_activity_task(task_token) + evt = self._add_event( + "ActivityTaskCompleted", + scheduled_event_id=task.scheduled_event_id, + started_event_id=task.started_event_id, + result=result, + ) + task.complete() + self.open_counts["openActivityTasks"] -= 1 + # TODO: ensure we don't schedule multiple decisions at the same time! + self.schedule_decision_task() diff --git a/moto/swf/responses.py b/moto/swf/responses.py index 890b23576..3d180afcd 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -270,3 +270,11 @@ class SWFResponse(BaseResponse): task_list = self._params["taskList"]["name"] count = self.swf_backend.count_pending_activity_tasks(domain_name, task_list) return json.dumps({"count": count, "truncated": False}) + + def respond_activity_task_completed(self): + task_token = self._params["taskToken"] + result = self._params.get("result") + self.swf_backend.respond_activity_task_completed( + task_token, result=result + ) + return "" diff --git a/tests/test_swf/models/test_workflow_execution.py b/tests/test_swf/models/test_workflow_execution.py index edacade87..30aafceb2 100644 --- a/tests/test_swf/models/test_workflow_execution.py +++ b/tests/test_swf/models/test_workflow_execution.py @@ -352,3 +352,22 @@ def test_workflow_execution_start_activity_task(): task.state.should.equal("STARTED") wfe.events()[-1].event_type.should.equal("ActivityTaskStarted") wfe.events()[-1].identity.should.equal("worker01") + +def test_complete_activity_task(): + wfe = make_workflow_execution() + wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES) + task_token = wfe.activity_tasks[-1].task_token + + wfe.open_counts["openActivityTasks"].should.equal(1) + wfe.open_counts["openDecisionTasks"].should.equal(0) + + wfe.start_activity_task(task_token, identity="worker01") + wfe.complete_activity_task(task_token, result="a superb result") + + task = wfe.activity_tasks[-1] + task.state.should.equal("COMPLETED") + wfe.events()[-2].event_type.should.equal("ActivityTaskCompleted") + wfe.events()[-1].event_type.should.equal("DecisionTaskScheduled") + + wfe.open_counts["openActivityTasks"].should.equal(0) + wfe.open_counts["openDecisionTasks"].should.equal(1) diff --git a/tests/test_swf/responses/test_activity_tasks.py b/tests/test_swf/responses/test_activity_tasks.py index a4fb86841..f5b053f6d 100644 --- a/tests/test_swf/responses/test_activity_tasks.py +++ b/tests/test_swf/responses/test_activity_tasks.py @@ -2,25 +2,31 @@ import boto from sure import expect from moto import mock_swf -from moto.swf.exceptions import SWFUnknownResourceFault +from moto.swf import swf_backend +from moto.swf.exceptions import ( + SWFValidationException, + SWFUnknownResourceFault, +) from ..utils import setup_workflow +SCHEDULE_ACTIVITY_TASK_DECISION = { + "decisionType": "ScheduleActivityTask", + "scheduleActivityTaskDecisionAttributes": { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity", "version": "v1.1" }, + "taskList": { "name": "activity-task-list" }, + } +} + # PollForActivityTask endpoint @mock_swf def test_poll_for_activity_task_when_one(): conn = setup_workflow() decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] conn.respond_decision_task_completed(decision_token, decisions=[ - { - "decisionType": "ScheduleActivityTask", - "scheduleActivityTaskDecisionAttributes": { - "activityId": "my-activity-001", - "activityType": { "name": "test-activity", "version": "v1.1" }, - "taskList": { "name": "activity-task-list" }, - } - } + SCHEDULE_ACTIVITY_TASK_DECISION ]) resp = conn.poll_for_activity_task("test-domain", "activity-task-list", identity="surprise") resp["activityId"].should.equal("my-activity-001") @@ -51,14 +57,7 @@ def test_count_pending_activity_tasks(): conn = setup_workflow() decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] conn.respond_decision_task_completed(decision_token, decisions=[ - { - "decisionType": "ScheduleActivityTask", - "scheduleActivityTaskDecisionAttributes": { - "activityId": "my-activity-001", - "activityType": { "name": "test-activity", "version": "v1.1" }, - "taskList": { "name": "activity-task-list" }, - } - } + SCHEDULE_ACTIVITY_TASK_DECISION ]) resp = conn.count_pending_activity_tasks("test-domain", "activity-task-list") @@ -69,3 +68,68 @@ def test_count_pending_decision_tasks_on_non_existent_task_list(): conn = setup_workflow() resp = conn.count_pending_activity_tasks("test-domain", "non-existent") resp.should.equal({"count": 0, "truncated": False}) + + +# RespondActivityTaskCompleted endpoint +@mock_swf +def test_poll_for_activity_task_when_one(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] + + resp = conn.respond_activity_task_completed(activity_token, result="result of the task") + resp.should.be.none + + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + resp["events"][-2]["eventType"].should.equal("ActivityTaskCompleted") + resp["events"][-2]["activityTaskCompletedEventAttributes"].should.equal( + { "result": "result of the task", "scheduledEventId": 5, "startedEventId": 6 } + ) + +@mock_swf +def test_respond_activity_task_completed_with_wrong_token(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + conn.poll_for_activity_task("test-domain", "activity-task-list") + conn.respond_activity_task_completed.when.called_with( + "not-a-correct-token" + ).should.throw(SWFValidationException, "Invalid token") + +@mock_swf +def test_respond_activity_task_completed_on_closed_workflow_execution(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] + + # bad: we're closing workflow execution manually, but endpoints are not coded for now.. + wfe = swf_backend.domains[0].workflow_executions.values()[0] + wfe.execution_status = "CLOSED" + # /bad + + conn.respond_activity_task_completed.when.called_with( + activity_token + ).should.throw(SWFUnknownResourceFault, "WorkflowExecution=") + +@mock_swf +def test_respond_activity_task_completed_with_task_already_completed(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] + + conn.respond_activity_task_completed(activity_token) + + conn.respond_activity_task_completed.when.called_with( + activity_token + ).should.throw(SWFUnknownResourceFault, "Unknown activity, scheduledEventId = 5")