diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index c728666aa..509c32746 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -212,9 +212,27 @@ class SWFBackend(BaseBackend): # # TODO: handle long polling (case 2) for decision tasks candidates = [] - for _task_list, tasks in domain.decision_task_lists.items(): - if _task_list == task_list: - candidates += [t for t in tasks if t.state == "SCHEDULED"] + + # Collect candidate scheduled tasks from open workflow executions + # matching the selected task list. + # + # If another decision task is already started, then no candidates + # will be produced for that workflow execution. This is because only one + # decision task can be started at any given time. + # See https://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dev-tasks.html + for wfe in domain.workflow_executions: + if wfe.task_list == task_list and wfe.open: + wfe_candidates = [] + found_started = False + for task in wfe.decision_tasks: + if task.state == "STARTED": + found_started = True + break + elif task.state == "SCHEDULED": + wfe_candidates.append(task) + if not found_started: + candidates += wfe_candidates + if any(candidates): # TODO: handle task priorities (but not supported by boto for now) task = min(candidates, key=lambda d: d.scheduled_at) diff --git a/tests/test_swf/responses/test_decision_tasks.py b/tests/test_swf/responses/test_decision_tasks.py index 6ca249c9e..780c3803d 100644 --- a/tests/test_swf/responses/test_decision_tasks.py +++ b/tests/test_swf/responses/test_decision_tasks.py @@ -68,6 +68,95 @@ def test_poll_for_decision_task_previous_started_event_id_boto3(): assert resp["previousStartedEventId"] == 3 +@mock_swf +def test_poll_for_decision_task_ensure_single_started_task(): + client = setup_workflow_boto3() + + resp = client.poll_for_decision_task( + domain="test-domain", taskList={"name": "queue"} + ) + resp.should.have.key("taskToken") + first_decision_task = resp["taskToken"] + + # History should have just the decision task triggered on workflow start + types = [evt["eventType"] for evt in resp["events"]] + types.should.equal( + ["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted"] + ) + + # Schedule another decision task, before first one is completed + client.signal_workflow_execution( + domain="test-domain", + signalName="my_signal", + workflowId="uid-abcd1234", + input="my_input", + runId=client.run_id, + ) + + # Second poll should return no new tasks + resp = client.poll_for_decision_task( + domain="test-domain", taskList={"name": "queue"} + ) + assert resp["previousStartedEventId"] == 0 + assert resp["startedEventId"] == 0 + assert resp.should_not.have.key("taskToken") + + resp = client.get_workflow_execution_history( + domain="test-domain", + execution={"runId": client.run_id, "workflowId": "uid-abcd1234"}, + ) + types = [evt["eventType"] for evt in resp["events"]] + types.should.equal( + [ + "WorkflowExecutionStarted", + "DecisionTaskScheduled", + "DecisionTaskStarted", + "WorkflowExecutionSignaled", + "DecisionTaskScheduled", + ] + ) + + client.respond_decision_task_completed(taskToken=first_decision_task) + + resp = client.poll_for_decision_task( + domain="test-domain", taskList={"name": "queue"} + ) + resp.should.have.key("taskToken") + + types = [evt["eventType"] for evt in resp["events"]] + types.should.equal( + [ + "WorkflowExecutionStarted", + "DecisionTaskScheduled", + "DecisionTaskStarted", + "WorkflowExecutionSignaled", + "DecisionTaskScheduled", + "DecisionTaskCompleted", + "DecisionTaskStarted", + ] + ) + + +@mock_swf +def test_poll_for_decision_task_exclude_completed_executions(): + client = setup_workflow_boto3() + + resp = client.get_workflow_execution_history( + domain="test-domain", + execution={"runId": client.run_id, "workflowId": "uid-abcd1234"}, + ) + types = [evt["eventType"] for evt in resp["events"]] + types.should.equal(["WorkflowExecutionStarted", "DecisionTaskScheduled"]) + + client.terminate_workflow_execution( + domain="test-domain", runId=client.run_id, workflowId="uid-abcd1234" + ) + resp = client.poll_for_decision_task( + domain="test-domain", taskList={"name": "queue"} + ) + resp.should_not.have.key("taskToken") + + @mock_swf def test_poll_for_decision_task_when_none_boto3(): client = setup_workflow_boto3()