SWF: Start just one decision task at a time (#5669)

This commit is contained in:
Alexandru Ciucă 2022-11-16 12:36:40 +02:00 committed by GitHub
parent 6ab2497a12
commit d6b4b9718d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 110 additions and 3 deletions

View File

@ -212,9 +212,27 @@ class SWFBackend(BaseBackend):
#
# TODO: handle long polling (case 2) for decision tasks
candidates = []
for _task_list, tasks in domain.decision_task_lists.items():
if _task_list == task_list:
candidates += [t for t in tasks if t.state == "SCHEDULED"]
# Collect candidate scheduled tasks from open workflow executions
# matching the selected task list.
#
# If another decision task is already started, then no candidates
# will be produced for that workflow execution. This is because only one
# decision task can be started at any given time.
# See https://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dev-tasks.html
for wfe in domain.workflow_executions:
if wfe.task_list == task_list and wfe.open:
wfe_candidates = []
found_started = False
for task in wfe.decision_tasks:
if task.state == "STARTED":
found_started = True
break
elif task.state == "SCHEDULED":
wfe_candidates.append(task)
if not found_started:
candidates += wfe_candidates
if any(candidates):
# TODO: handle task priorities (but not supported by boto for now)
task = min(candidates, key=lambda d: d.scheduled_at)

View File

@ -68,6 +68,95 @@ def test_poll_for_decision_task_previous_started_event_id_boto3():
assert resp["previousStartedEventId"] == 3
@mock_swf
def test_poll_for_decision_task_ensure_single_started_task():
client = setup_workflow_boto3()
resp = client.poll_for_decision_task(
domain="test-domain", taskList={"name": "queue"}
)
resp.should.have.key("taskToken")
first_decision_task = resp["taskToken"]
# History should have just the decision task triggered on workflow start
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(
["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted"]
)
# Schedule another decision task, before first one is completed
client.signal_workflow_execution(
domain="test-domain",
signalName="my_signal",
workflowId="uid-abcd1234",
input="my_input",
runId=client.run_id,
)
# Second poll should return no new tasks
resp = client.poll_for_decision_task(
domain="test-domain", taskList={"name": "queue"}
)
assert resp["previousStartedEventId"] == 0
assert resp["startedEventId"] == 0
assert resp.should_not.have.key("taskToken")
resp = client.get_workflow_execution_history(
domain="test-domain",
execution={"runId": client.run_id, "workflowId": "uid-abcd1234"},
)
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(
[
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"WorkflowExecutionSignaled",
"DecisionTaskScheduled",
]
)
client.respond_decision_task_completed(taskToken=first_decision_task)
resp = client.poll_for_decision_task(
domain="test-domain", taskList={"name": "queue"}
)
resp.should.have.key("taskToken")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(
[
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"WorkflowExecutionSignaled",
"DecisionTaskScheduled",
"DecisionTaskCompleted",
"DecisionTaskStarted",
]
)
@mock_swf
def test_poll_for_decision_task_exclude_completed_executions():
client = setup_workflow_boto3()
resp = client.get_workflow_execution_history(
domain="test-domain",
execution={"runId": client.run_id, "workflowId": "uid-abcd1234"},
)
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(["WorkflowExecutionStarted", "DecisionTaskScheduled"])
client.terminate_workflow_execution(
domain="test-domain", runId=client.run_id, workflowId="uid-abcd1234"
)
resp = client.poll_for_decision_task(
domain="test-domain", taskList={"name": "queue"}
)
resp.should_not.have.key("taskToken")
@mock_swf
def test_poll_for_decision_task_when_none_boto3():
client = setup_workflow_boto3()