Add first version of SWF endpoint RespondDecisionTaskCompleted
There's just the structure for now, for now the workflow execution doesn't know how to handle any decision type.
This commit is contained in:
parent
c72c198208
commit
d97c770849
@ -12,9 +12,13 @@ class SWFClientError(JSONResponseError):
|
||||
|
||||
|
||||
class SWFUnknownResourceFault(SWFClientError):
|
||||
def __init__(self, resource_type, resource_name):
|
||||
def __init__(self, resource_type, resource_name=None):
|
||||
if resource_name:
|
||||
message = "Unknown {}: {}".format(resource_type, resource_name)
|
||||
else:
|
||||
message = "Unknown {}".format(resource_type)
|
||||
super(SWFUnknownResourceFault, self).__init__(
|
||||
"Unknown {}: {}".format(resource_type, resource_name),
|
||||
message,
|
||||
"com.amazonaws.swf.base.model#UnknownResourceFault")
|
||||
|
||||
|
||||
@ -75,3 +79,11 @@ class SWFDefaultUndefinedFault(SWFClientError):
|
||||
super(SWFDefaultUndefinedFault, self).__init__(
|
||||
key_camel_case, "com.amazonaws.swf.base.model#DefaultUndefinedFault"
|
||||
)
|
||||
|
||||
|
||||
class SWFValidationException(SWFClientError):
|
||||
def __init__(self, message):
|
||||
super(SWFValidationException, self).__init__(
|
||||
message,
|
||||
"com.amazon.coral.validate#ValidationException"
|
||||
)
|
||||
|
@ -11,6 +11,7 @@ from ..exceptions import (
|
||||
SWFSerializationException,
|
||||
SWFTypeAlreadyExistsFault,
|
||||
SWFTypeDeprecatedFault,
|
||||
SWFValidationException,
|
||||
)
|
||||
from .activity_type import ActivityType
|
||||
from .decision_task import DecisionTask
|
||||
@ -201,6 +202,59 @@ class SWFBackend(BaseBackend):
|
||||
count += wfe.open_counts["openDecisionTasks"]
|
||||
return count
|
||||
|
||||
def respond_decision_task_completed(self, task_token,
|
||||
decisions=None,
|
||||
execution_context=None):
|
||||
self._check_string(task_token)
|
||||
self._check_none_or_string(execution_context)
|
||||
# let's find decision task
|
||||
decision_task = None
|
||||
for domain in self.domains:
|
||||
for _, wfe in domain.workflow_executions.iteritems():
|
||||
for dt in wfe.decision_tasks:
|
||||
if dt.task_token == task_token:
|
||||
decision_task = dt
|
||||
# no decision task found
|
||||
if not decision_task:
|
||||
# In the real world, SWF distinguishes an obviously invalid token and a
|
||||
# token that has no corresponding decision task. For the latter it seems
|
||||
# to wait until a task with that token comes up (which looks like a smart
|
||||
# choice in an eventually-consistent system). The call doesn't seem to
|
||||
# timeout shortly, it takes 3 or 4 minutes to result in:
|
||||
# BotoServerError: 500 Internal Server Error
|
||||
# {"__type":"com.amazon.coral.service#InternalFailure"}
|
||||
# This behavior is not documented clearly in SWF docs and we'll ignore it
|
||||
# in moto, as there is no obvious reason to rely on it in tests.
|
||||
raise SWFValidationException("Invalid token")
|
||||
# decision task found, but WorflowExecution is CLOSED
|
||||
wfe = decision_task.workflow_execution
|
||||
if wfe.execution_status != "OPEN":
|
||||
raise SWFUnknownResourceFault(
|
||||
"execution",
|
||||
"WorkflowExecution=[workflowId={}, runId={}]".format(
|
||||
wfe.workflow_id, wfe.run_id
|
||||
)
|
||||
)
|
||||
# decision task found, but already completed
|
||||
if decision_task.state != "STARTED":
|
||||
if decision_task.state == "COMPLETED":
|
||||
raise SWFUnknownResourceFault(
|
||||
"decision task, scheduledEventId = {}".format(decision_task.scheduled_event_id)
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
"This shouldn't happen: you have to PollForDecisionTask to get a token, "
|
||||
"which changes DecisionTask status to 'STARTED' ; then it can only change "
|
||||
"to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably "
|
||||
"a bug in moto, please report it, thanks!"
|
||||
)
|
||||
# everything's good
|
||||
if decision_task:
|
||||
wfe = decision_task.workflow_execution
|
||||
wfe.complete_decision_task(decision_task.task_token,
|
||||
decisions=decisions,
|
||||
execution_context=execution_context)
|
||||
|
||||
|
||||
swf_backends = {}
|
||||
for region in boto.swf.regions():
|
||||
|
@ -34,3 +34,6 @@ class DecisionTask(object):
|
||||
def start(self, started_event_id):
|
||||
self.state = "STARTED"
|
||||
self.started_event_id = started_event_id
|
||||
|
||||
def complete(self):
|
||||
self.state = "COMPLETED"
|
||||
|
@ -56,6 +56,14 @@ class HistoryEvent(object):
|
||||
if hasattr(self, "identity") and self.identity:
|
||||
hsh["identity"] = self.identity
|
||||
return hsh
|
||||
elif self.event_type == "DecisionTaskCompleted":
|
||||
hsh = {
|
||||
"scheduledEventId": self.scheduled_event_id,
|
||||
"startedEventId": self.started_event_id,
|
||||
}
|
||||
if hasattr(self, "execution_context") and self.execution_context:
|
||||
hsh["executionContext"] = self.execution_context
|
||||
return hsh
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)
|
||||
|
@ -151,3 +151,42 @@ class WorkflowExecution(object):
|
||||
identity=identity
|
||||
)
|
||||
dt.start(evt.event_id)
|
||||
|
||||
def complete_decision_task(self, task_token, decisions=None, execution_context=None):
|
||||
# TODO: check if decision can really complete in case of malformed "decisions"
|
||||
dt = self._find_decision_task(task_token)
|
||||
evt = self._add_event(
|
||||
"DecisionTaskCompleted",
|
||||
scheduled_event_id=dt.scheduled_event_id,
|
||||
started_event_id=dt.started_event_id,
|
||||
execution_context=execution_context,
|
||||
)
|
||||
dt.complete()
|
||||
self.handle_decisions(decisions)
|
||||
|
||||
def handle_decisions(self, decisions):
|
||||
"""
|
||||
Handles a Decision according to SWF docs.
|
||||
See: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_Decision.html
|
||||
"""
|
||||
# 'decisions' can be None per boto.swf defaults, so better exiting
|
||||
# directly for falsy values
|
||||
if not decisions:
|
||||
return
|
||||
# handle each decision separately, in order
|
||||
for decision in decisions:
|
||||
decision_type = decision["decisionType"]
|
||||
# TODO: implement Decision type: CancelTimer
|
||||
# TODO: implement Decision type: CancelWorkflowExecution
|
||||
# TODO: implement Decision type: CompleteWorkflowExecution
|
||||
# TODO: implement Decision type: ContinueAsNewWorkflowExecution
|
||||
# TODO: implement Decision type: FailWorkflowExecution
|
||||
# TODO: implement Decision type: RecordMarker
|
||||
# TODO: implement Decision type: RequestCancelActivityTask
|
||||
# TODO: implement Decision type: RequestCancelExternalWorkflowExecution
|
||||
# TODO: implement Decision type: ScheduleActivityTask
|
||||
# TODO: implement Decision type: ScheduleLambdaFunction
|
||||
# TODO: implement Decision type: SignalExternalWorkflowExecution
|
||||
# TODO: implement Decision type: StartChildWorkflowExecution
|
||||
# TODO: implement Decision type: StartTimer
|
||||
raise NotImplementedError("Cannot handle decision: {}".format(decision_type))
|
||||
|
@ -242,3 +242,13 @@ class SWFResponse(BaseResponse):
|
||||
task_list = self._params["taskList"]["name"]
|
||||
count = self.swf_backend.count_pending_decision_tasks(domain_name, task_list)
|
||||
return json.dumps({"count": count, "truncated": False})
|
||||
|
||||
|
||||
def respond_decision_task_completed(self):
|
||||
task_token = self._params["taskToken"]
|
||||
execution_context = self._params.get("executionContext")
|
||||
decisions = self._params.get("decisions")
|
||||
self.swf_backend.respond_decision_task_completed(
|
||||
task_token, decisions=decisions, execution_context=execution_context
|
||||
)
|
||||
return ""
|
||||
|
@ -2,8 +2,10 @@ import boto
|
||||
from sure import expect
|
||||
|
||||
from moto import mock_swf
|
||||
from moto.swf import swf_backend
|
||||
from moto.swf.exceptions import (
|
||||
SWFUnknownResourceFault,
|
||||
SWFValidationException,
|
||||
)
|
||||
|
||||
from .utils import mock_basic_workflow_type
|
||||
@ -72,3 +74,63 @@ def test_count_pending_decision_tasks_on_non_existent_task_list():
|
||||
conn = setup_workflow()
|
||||
resp = conn.count_pending_decision_tasks("test-domain", "non-existent")
|
||||
resp.should.equal({"count": 0, "truncated": False})
|
||||
|
||||
|
||||
# RespondDecisionTaskCompleted endpoint
|
||||
@mock_swf
|
||||
def test_respond_decision_task_completed_with_no_decision():
|
||||
conn = setup_workflow()
|
||||
|
||||
resp = conn.poll_for_decision_task("test-domain", "queue")
|
||||
task_token = resp["taskToken"]
|
||||
|
||||
resp = conn.respond_decision_task_completed(task_token)
|
||||
resp.should.be.none
|
||||
|
||||
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
|
||||
types = [evt["eventType"] for evt in resp["events"]]
|
||||
types.should.equal([
|
||||
"WorkflowExecutionStarted",
|
||||
"DecisionTaskScheduled",
|
||||
"DecisionTaskStarted",
|
||||
"DecisionTaskCompleted",
|
||||
])
|
||||
evt = resp["events"][-1]
|
||||
evt["decisionTaskCompletedEventAttributes"].should.equal({
|
||||
"scheduledEventId": 2,
|
||||
"startedEventId": 3,
|
||||
})
|
||||
|
||||
@mock_swf
|
||||
def test_respond_decision_task_completed_with_wrong_token():
|
||||
conn = setup_workflow()
|
||||
resp = conn.poll_for_decision_task("test-domain", "queue")
|
||||
conn.respond_decision_task_completed.when.called_with(
|
||||
"not-a-correct-token"
|
||||
).should.throw(SWFValidationException)
|
||||
|
||||
@mock_swf
|
||||
def test_respond_decision_task_completed_on_close_workflow_execution():
|
||||
conn = setup_workflow()
|
||||
resp = conn.poll_for_decision_task("test-domain", "queue")
|
||||
task_token = resp["taskToken"]
|
||||
|
||||
# bad: we're closing workflow execution manually, but endpoints are not coded for now..
|
||||
wfe = swf_backend.domains[0].workflow_executions.values()[0]
|
||||
wfe.execution_status = "CLOSED"
|
||||
# /bad
|
||||
|
||||
conn.respond_decision_task_completed.when.called_with(
|
||||
task_token
|
||||
).should.throw(SWFUnknownResourceFault)
|
||||
|
||||
@mock_swf
|
||||
def test_respond_decision_task_completed_with_task_already_completed():
|
||||
conn = setup_workflow()
|
||||
resp = conn.poll_for_decision_task("test-domain", "queue")
|
||||
task_token = resp["taskToken"]
|
||||
conn.respond_decision_task_completed(task_token)
|
||||
|
||||
conn.respond_decision_task_completed.when.called_with(
|
||||
task_token
|
||||
).should.throw(SWFUnknownResourceFault)
|
||||
|
@ -10,6 +10,7 @@ from moto.swf.exceptions import (
|
||||
SWFTypeDeprecatedFault,
|
||||
SWFWorkflowExecutionAlreadyStartedFault,
|
||||
SWFDefaultUndefinedFault,
|
||||
SWFValidationException,
|
||||
)
|
||||
from moto.swf.models import (
|
||||
WorkflowType,
|
||||
@ -35,6 +36,16 @@ def test_swf_unknown_resource_fault():
|
||||
"message": "Unknown type: detail"
|
||||
})
|
||||
|
||||
def test_swf_unknown_resource_fault_with_only_one_parameter():
|
||||
ex = SWFUnknownResourceFault("foo bar baz")
|
||||
|
||||
ex.status.should.equal(400)
|
||||
ex.error_code.should.equal("UnknownResourceFault")
|
||||
ex.body.should.equal({
|
||||
"__type": "com.amazonaws.swf.base.model#UnknownResourceFault",
|
||||
"message": "Unknown foo bar baz"
|
||||
})
|
||||
|
||||
def test_swf_domain_already_exists_fault():
|
||||
ex = SWFDomainAlreadyExistsFault("domain-name")
|
||||
|
||||
@ -103,3 +114,13 @@ def test_swf_default_undefined_fault():
|
||||
"__type": "com.amazonaws.swf.base.model#DefaultUndefinedFault",
|
||||
"message": "executionStartToCloseTimeout",
|
||||
})
|
||||
|
||||
def test_swf_validation_exception():
|
||||
ex = SWFValidationException("Invalid token")
|
||||
|
||||
ex.status.should.equal(400)
|
||||
ex.error_code.should.equal("ValidationException")
|
||||
ex.body.should.equal({
|
||||
"__type": "com.amazon.coral.validate#ValidationException",
|
||||
"message": "Invalid token",
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user