Add SWF endpoint RespondActivityTaskFailed
This commit is contained in:
parent
c9e8ad03f8
commit
fd12e317f8
@ -298,10 +298,7 @@ class SWFBackend(BaseBackend):
|
|||||||
count += len(pending)
|
count += len(pending)
|
||||||
return count
|
return count
|
||||||
|
|
||||||
def respond_activity_task_completed(self, task_token, result=None):
|
def _find_activity_task_from_token(self, task_token):
|
||||||
self._check_string(task_token)
|
|
||||||
self._check_none_or_string(result)
|
|
||||||
# let's find the activity task
|
|
||||||
activity_task = None
|
activity_task = None
|
||||||
for domain in self.domains:
|
for domain in self.domains:
|
||||||
for _, wfe in domain.workflow_executions.iteritems():
|
for _, wfe in domain.workflow_executions.iteritems():
|
||||||
@ -337,9 +334,23 @@ class SWFBackend(BaseBackend):
|
|||||||
"a bug in moto, please report it, thanks!"
|
"a bug in moto, please report it, thanks!"
|
||||||
)
|
)
|
||||||
# everything's good
|
# everything's good
|
||||||
if activity_task:
|
return activity_task
|
||||||
wfe = activity_task.workflow_execution
|
|
||||||
wfe.complete_activity_task(activity_task.task_token, result=result)
|
def respond_activity_task_completed(self, task_token, result=None):
|
||||||
|
self._check_string(task_token)
|
||||||
|
self._check_none_or_string(result)
|
||||||
|
activity_task = self._find_activity_task_from_token(task_token)
|
||||||
|
wfe = activity_task.workflow_execution
|
||||||
|
wfe.complete_activity_task(activity_task.task_token, result=result)
|
||||||
|
|
||||||
|
def respond_activity_task_failed(self, task_token, reason=None, details=None):
|
||||||
|
self._check_string(task_token)
|
||||||
|
# TODO: implement length limits on reason and details (common pb with client libs)
|
||||||
|
self._check_none_or_string(reason)
|
||||||
|
self._check_none_or_string(details)
|
||||||
|
activity_task = self._find_activity_task_from_token(task_token)
|
||||||
|
wfe = activity_task.workflow_execution
|
||||||
|
wfe.fail_activity_task(activity_task.task_token, reason=reason, details=details)
|
||||||
|
|
||||||
|
|
||||||
swf_backends = {}
|
swf_backends = {}
|
||||||
|
@ -36,3 +36,6 @@ class ActivityTask(object):
|
|||||||
|
|
||||||
def complete(self):
|
def complete(self):
|
||||||
self.state = "COMPLETED"
|
self.state = "COMPLETED"
|
||||||
|
|
||||||
|
def fail(self):
|
||||||
|
self.state = "FAILED"
|
||||||
|
@ -121,6 +121,17 @@ class HistoryEvent(object):
|
|||||||
if hasattr(self, "result") and self.result is not None:
|
if hasattr(self, "result") and self.result is not None:
|
||||||
hsh["result"] = self.result
|
hsh["result"] = self.result
|
||||||
return hsh
|
return hsh
|
||||||
|
elif self.event_type == "ActivityTaskFailed":
|
||||||
|
# TODO: maybe merge it with ActivityTaskCompleted (different optional params tho)
|
||||||
|
hsh = {
|
||||||
|
"scheduledEventId": self.scheduled_event_id,
|
||||||
|
"startedEventId": self.started_event_id,
|
||||||
|
}
|
||||||
|
if hasattr(self, "reason") and self.reason is not None:
|
||||||
|
hsh["reason"] = self.reason
|
||||||
|
if hasattr(self, "details") and self.details is not None:
|
||||||
|
hsh["details"] = self.details
|
||||||
|
return hsh
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)
|
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)
|
||||||
|
@ -443,3 +443,17 @@ class WorkflowExecution(object):
|
|||||||
self.open_counts["openActivityTasks"] -= 1
|
self.open_counts["openActivityTasks"] -= 1
|
||||||
# TODO: ensure we don't schedule multiple decisions at the same time!
|
# TODO: ensure we don't schedule multiple decisions at the same time!
|
||||||
self.schedule_decision_task()
|
self.schedule_decision_task()
|
||||||
|
|
||||||
|
def fail_activity_task(self, task_token, reason=None, details=None):
|
||||||
|
task = self._find_activity_task(task_token)
|
||||||
|
evt = self._add_event(
|
||||||
|
"ActivityTaskFailed",
|
||||||
|
scheduled_event_id=task.scheduled_event_id,
|
||||||
|
started_event_id=task.started_event_id,
|
||||||
|
reason=reason,
|
||||||
|
details=details,
|
||||||
|
)
|
||||||
|
task.fail()
|
||||||
|
self.open_counts["openActivityTasks"] -= 1
|
||||||
|
# TODO: ensure we don't schedule multiple decisions at the same time!
|
||||||
|
self.schedule_decision_task()
|
||||||
|
@ -278,3 +278,12 @@ class SWFResponse(BaseResponse):
|
|||||||
task_token, result=result
|
task_token, result=result
|
||||||
)
|
)
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
def respond_activity_task_failed(self):
|
||||||
|
task_token = self._params["taskToken"]
|
||||||
|
reason = self._params.get("reason")
|
||||||
|
details = self._params.get("details")
|
||||||
|
self.swf_backend.respond_activity_task_failed(
|
||||||
|
task_token, reason=reason, details=details
|
||||||
|
)
|
||||||
|
return ""
|
||||||
|
@ -29,6 +29,12 @@ def test_activity_task_creation():
|
|||||||
task.complete()
|
task.complete()
|
||||||
task.state.should.equal("COMPLETED")
|
task.state.should.equal("COMPLETED")
|
||||||
|
|
||||||
|
# NB: this doesn't make any sense for SWF, a task shouldn't go from a
|
||||||
|
# "COMPLETED" state to a "FAILED" one, but this is an internal state on our
|
||||||
|
# side and we don't care about invalid state transitions for now.
|
||||||
|
task.fail()
|
||||||
|
task.state.should.equal("FAILED")
|
||||||
|
|
||||||
def test_activity_task_full_dict_representation():
|
def test_activity_task_full_dict_representation():
|
||||||
wfe = make_workflow_execution()
|
wfe = make_workflow_execution()
|
||||||
wft = wfe.workflow_type
|
wft = wfe.workflow_type
|
||||||
|
@ -72,7 +72,7 @@ def test_count_pending_decision_tasks_on_non_existent_task_list():
|
|||||||
|
|
||||||
# RespondActivityTaskCompleted endpoint
|
# RespondActivityTaskCompleted endpoint
|
||||||
@mock_swf
|
@mock_swf
|
||||||
def test_poll_for_activity_task_when_one():
|
def test_respond_activity_task_completed():
|
||||||
conn = setup_workflow()
|
conn = setup_workflow()
|
||||||
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||||
conn.respond_decision_task_completed(decision_token, decisions=[
|
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||||
@ -133,3 +133,41 @@ def test_respond_activity_task_completed_with_task_already_completed():
|
|||||||
conn.respond_activity_task_completed.when.called_with(
|
conn.respond_activity_task_completed.when.called_with(
|
||||||
activity_token
|
activity_token
|
||||||
).should.throw(SWFUnknownResourceFault, "Unknown activity, scheduledEventId = 5")
|
).should.throw(SWFUnknownResourceFault, "Unknown activity, scheduledEventId = 5")
|
||||||
|
|
||||||
|
|
||||||
|
# RespondActivityTaskFailed endpoint
|
||||||
|
@mock_swf
|
||||||
|
def test_respond_activity_task_failed():
|
||||||
|
conn = setup_workflow()
|
||||||
|
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||||
|
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||||
|
SCHEDULE_ACTIVITY_TASK_DECISION
|
||||||
|
])
|
||||||
|
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
|
||||||
|
|
||||||
|
resp = conn.respond_activity_task_failed(activity_token,
|
||||||
|
reason="short reason",
|
||||||
|
details="long details")
|
||||||
|
resp.should.be.none
|
||||||
|
|
||||||
|
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
|
||||||
|
resp["events"][-2]["eventType"].should.equal("ActivityTaskFailed")
|
||||||
|
resp["events"][-2]["activityTaskFailedEventAttributes"].should.equal(
|
||||||
|
{ "reason": "short reason", "details": "long details",
|
||||||
|
"scheduledEventId": 5, "startedEventId": 6 }
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock_swf
|
||||||
|
def test_respond_activity_task_completed_with_wrong_token():
|
||||||
|
# NB: we just test ONE failure case for RespondActivityTaskFailed
|
||||||
|
# because the safeguards are shared with RespondActivityTaskCompleted, so
|
||||||
|
# no need to retest everything end-to-end.
|
||||||
|
conn = setup_workflow()
|
||||||
|
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
|
||||||
|
conn.respond_decision_task_completed(decision_token, decisions=[
|
||||||
|
SCHEDULE_ACTIVITY_TASK_DECISION
|
||||||
|
])
|
||||||
|
conn.poll_for_activity_task("test-domain", "activity-task-list")
|
||||||
|
conn.respond_activity_task_failed.when.called_with(
|
||||||
|
"not-a-correct-token"
|
||||||
|
).should.throw(SWFValidationException, "Invalid token")
|
||||||
|
Loading…
Reference in New Issue
Block a user