Add SWF endpoint RespondActivityTaskCompleted
This commit is contained in:
parent
08643945df
commit
c9e8ad03f8
@ -298,6 +298,49 @@ class SWFBackend(BaseBackend):
|
||||
count += len(pending)
|
||||
return count
|
||||
|
||||
def respond_activity_task_completed(self, task_token, result=None):
|
||||
self._check_string(task_token)
|
||||
self._check_none_or_string(result)
|
||||
# let's find the activity task
|
||||
activity_task = None
|
||||
for domain in self.domains:
|
||||
for _, wfe in domain.workflow_executions.iteritems():
|
||||
for task in wfe.activity_tasks:
|
||||
if task.task_token == task_token:
|
||||
activity_task = task
|
||||
# no task found
|
||||
if not activity_task:
|
||||
# Same as for decision tasks, we raise an invalid token BOTH for clearly
|
||||
# wrong SWF tokens and OK tokens but not used correctly. This should not
|
||||
# be a problem in moto.
|
||||
raise SWFValidationException("Invalid token")
|
||||
# activity task found, but WorflowExecution is CLOSED
|
||||
wfe = activity_task.workflow_execution
|
||||
if wfe.execution_status != "OPEN":
|
||||
raise SWFUnknownResourceFault(
|
||||
"execution",
|
||||
"WorkflowExecution=[workflowId={}, runId={}]".format(
|
||||
wfe.workflow_id, wfe.run_id
|
||||
)
|
||||
)
|
||||
# activity task found, but already completed
|
||||
if activity_task.state != "STARTED":
|
||||
if activity_task.state == "COMPLETED":
|
||||
raise SWFUnknownResourceFault(
|
||||
"activity, scheduledEventId = {}".format(activity_task.scheduled_event_id)
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
"This shouldn't happen: you have to PollForActivityTask to get a token, "
|
||||
"which changes ActivityTask status to 'STARTED' ; then it can only change "
|
||||
"to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably "
|
||||
"a bug in moto, please report it, thanks!"
|
||||
)
|
||||
# everything's good
|
||||
if activity_task:
|
||||
wfe = activity_task.workflow_execution
|
||||
wfe.complete_activity_task(activity_task.task_token, result=result)
|
||||
|
||||
|
||||
swf_backends = {}
|
||||
for region in boto.swf.regions():
|
||||
|
@ -113,6 +113,14 @@ class HistoryEvent(object):
|
||||
if hasattr(self, "identity") and self.identity:
|
||||
hsh["identity"] = self.identity
|
||||
return hsh
|
||||
elif self.event_type == "ActivityTaskCompleted":
|
||||
hsh = {
|
||||
"scheduledEventId": self.scheduled_event_id,
|
||||
"startedEventId": self.started_event_id,
|
||||
}
|
||||
if hasattr(self, "result") and self.result is not None:
|
||||
hsh["result"] = self.result
|
||||
return hsh
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)
|
||||
|
@ -430,3 +430,16 @@ class WorkflowExecution(object):
|
||||
identity=identity
|
||||
)
|
||||
task.start(evt.event_id)
|
||||
|
||||
def complete_activity_task(self, task_token, result=None):
|
||||
task = self._find_activity_task(task_token)
|
||||
evt = self._add_event(
|
||||
"ActivityTaskCompleted",
|
||||
scheduled_event_id=task.scheduled_event_id,
|
||||
started_event_id=task.started_event_id,
|
||||
result=result,
|
||||
)
|
||||
task.complete()
|
||||
self.open_counts["openActivityTasks"] -= 1
|
||||
# TODO: ensure we don't schedule multiple decisions at the same time!
|
||||
self.schedule_decision_task()
|
||||
|
@ -270,3 +270,11 @@ class SWFResponse(BaseResponse):
|
||||
task_list = self._params["taskList"]["name"]
|
||||
count = self.swf_backend.count_pending_activity_tasks(domain_name, task_list)
|
||||
return json.dumps({"count": count, "truncated": False})
|
||||
|
||||
def respond_activity_task_completed(self):
|
||||
task_token = self._params["taskToken"]
|
||||
result = self._params.get("result")
|
||||
self.swf_backend.respond_activity_task_completed(
|
||||
task_token, result=result
|
||||
)
|
||||
return ""
|
||||
|
@ -352,3 +352,22 @@ def test_workflow_execution_start_activity_task():
|
||||
task.state.should.equal("STARTED")
|
||||
wfe.events()[-1].event_type.should.equal("ActivityTaskStarted")
|
||||
wfe.events()[-1].identity.should.equal("worker01")
|
||||
|
||||
def test_complete_activity_task():
|
||||
wfe = make_workflow_execution()
|
||||
wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES)
|
||||
task_token = wfe.activity_tasks[-1].task_token
|
||||
|
||||
wfe.open_counts["openActivityTasks"].should.equal(1)
|
||||
wfe.open_counts["openDecisionTasks"].should.equal(0)
|
||||
|
||||
wfe.start_activity_task(task_token, identity="worker01")
|
||||
wfe.complete_activity_task(task_token, result="a superb result")
|
||||
|
||||
task = wfe.activity_tasks[-1]
|
||||
task.state.should.equal("COMPLETED")
|
||||
wfe.events()[-2].event_type.should.equal("ActivityTaskCompleted")
|
||||
wfe.events()[-1].event_type.should.equal("DecisionTaskScheduled")
|
||||
|
||||
wfe.open_counts["openActivityTasks"].should.equal(0)
|
||||
wfe.open_counts["openDecisionTasks"].should.equal(1)
|
||||
|
@ -2,25 +2,31 @@ import boto
|
||||
from sure import expect
|
||||
|
||||
from moto import mock_swf
|
||||
from moto.swf.exceptions import SWFUnknownResourceFault
|
||||
from moto.swf import swf_backend
|
||||
from moto.swf.exceptions import (
|
||||
SWFValidationException,
|
||||
SWFUnknownResourceFault,
|
||||
)
|
||||
|
||||
from ..utils import setup_workflow
|
||||
|
||||
|
||||
SCHEDULE_ACTIVITY_TASK_DECISION = {
|
||||
"decisionType": "ScheduleActivityTask",
|
||||
"scheduleActivityTaskDecisionAttributes": {
|
||||
"activityId": "my-activity-001",
|
||||
"activityType": { "name": "test-activity", "version": "v1.1" },
|
||||
"taskList": { "name": "activity-task-list" },
|
||||
}
|
||||
}
|
||||
|
||||
# PollForActivityTask endpoint
|
||||
@mock_swf
|
||||
def test_poll_for_activity_task_when_one():
|
||||
conn = setup_workflow()
|
||||
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||
{
|
||||
"decisionType": "ScheduleActivityTask",
|
||||
"scheduleActivityTaskDecisionAttributes": {
|
||||
"activityId": "my-activity-001",
|
||||
"activityType": { "name": "test-activity", "version": "v1.1" },
|
||||
"taskList": { "name": "activity-task-list" },
|
||||
}
|
||||
}
|
||||
SCHEDULE_ACTIVITY_TASK_DECISION
|
||||
])
|
||||
resp = conn.poll_for_activity_task("test-domain", "activity-task-list", identity="surprise")
|
||||
resp["activityId"].should.equal("my-activity-001")
|
||||
@ -51,14 +57,7 @@ def test_count_pending_activity_tasks():
|
||||
conn = setup_workflow()
|
||||
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||
{
|
||||
"decisionType": "ScheduleActivityTask",
|
||||
"scheduleActivityTaskDecisionAttributes": {
|
||||
"activityId": "my-activity-001",
|
||||
"activityType": { "name": "test-activity", "version": "v1.1" },
|
||||
"taskList": { "name": "activity-task-list" },
|
||||
}
|
||||
}
|
||||
SCHEDULE_ACTIVITY_TASK_DECISION
|
||||
])
|
||||
|
||||
resp = conn.count_pending_activity_tasks("test-domain", "activity-task-list")
|
||||
@ -69,3 +68,68 @@ def test_count_pending_decision_tasks_on_non_existent_task_list():
|
||||
conn = setup_workflow()
|
||||
resp = conn.count_pending_activity_tasks("test-domain", "non-existent")
|
||||
resp.should.equal({"count": 0, "truncated": False})
|
||||
|
||||
|
||||
# RespondActivityTaskCompleted endpoint
|
||||
@mock_swf
|
||||
def test_poll_for_activity_task_when_one():
|
||||
conn = setup_workflow()
|
||||
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||
SCHEDULE_ACTIVITY_TASK_DECISION
|
||||
])
|
||||
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
|
||||
|
||||
resp = conn.respond_activity_task_completed(activity_token, result="result of the task")
|
||||
resp.should.be.none
|
||||
|
||||
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
|
||||
resp["events"][-2]["eventType"].should.equal("ActivityTaskCompleted")
|
||||
resp["events"][-2]["activityTaskCompletedEventAttributes"].should.equal(
|
||||
{ "result": "result of the task", "scheduledEventId": 5, "startedEventId": 6 }
|
||||
)
|
||||
|
||||
@mock_swf
|
||||
def test_respond_activity_task_completed_with_wrong_token():
|
||||
conn = setup_workflow()
|
||||
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||
SCHEDULE_ACTIVITY_TASK_DECISION
|
||||
])
|
||||
conn.poll_for_activity_task("test-domain", "activity-task-list")
|
||||
conn.respond_activity_task_completed.when.called_with(
|
||||
"not-a-correct-token"
|
||||
).should.throw(SWFValidationException, "Invalid token")
|
||||
|
||||
@mock_swf
|
||||
def test_respond_activity_task_completed_on_closed_workflow_execution():
|
||||
conn = setup_workflow()
|
||||
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||
SCHEDULE_ACTIVITY_TASK_DECISION
|
||||
])
|
||||
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
|
||||
|
||||
# bad: we're closing workflow execution manually, but endpoints are not coded for now..
|
||||
wfe = swf_backend.domains[0].workflow_executions.values()[0]
|
||||
wfe.execution_status = "CLOSED"
|
||||
# /bad
|
||||
|
||||
conn.respond_activity_task_completed.when.called_with(
|
||||
activity_token
|
||||
).should.throw(SWFUnknownResourceFault, "WorkflowExecution=")
|
||||
|
||||
@mock_swf
|
||||
def test_respond_activity_task_completed_with_task_already_completed():
|
||||
conn = setup_workflow()
|
||||
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||
SCHEDULE_ACTIVITY_TASK_DECISION
|
||||
])
|
||||
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
|
||||
|
||||
conn.respond_activity_task_completed(activity_token)
|
||||
|
||||
conn.respond_activity_task_completed.when.called_with(
|
||||
activity_token
|
||||
).should.throw(SWFUnknownResourceFault, "Unknown activity, scheduledEventId = 5")
|
||||
|
Loading…
Reference in New Issue
Block a user