From d97c770849a21615551c4a9d8e5f104db540c339 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Barth Date: Mon, 12 Oct 2015 11:08:52 +0200 Subject: [PATCH] Add first version of SWF endpoint RespondDecisionTaskCompleted There's just the structure for now, for now the workflow execution doesn't know how to handle any decision type. --- moto/swf/exceptions.py | 16 ++++++- moto/swf/models/__init__.py | 54 +++++++++++++++++++++++ moto/swf/models/decision_task.py | 3 ++ moto/swf/models/history_event.py | 8 ++++ moto/swf/models/workflow_execution.py | 39 +++++++++++++++++ moto/swf/responses.py | 10 +++++ tests/test_swf/test_decision_tasks.py | 62 +++++++++++++++++++++++++++ tests/test_swf/test_exceptions.py | 21 +++++++++ 8 files changed, 211 insertions(+), 2 deletions(-) diff --git a/moto/swf/exceptions.py b/moto/swf/exceptions.py index cbab4e200..5f3daf108 100644 --- a/moto/swf/exceptions.py +++ b/moto/swf/exceptions.py @@ -12,9 +12,13 @@ class SWFClientError(JSONResponseError): class SWFUnknownResourceFault(SWFClientError): - def __init__(self, resource_type, resource_name): + def __init__(self, resource_type, resource_name=None): + if resource_name: + message = "Unknown {}: {}".format(resource_type, resource_name) + else: + message = "Unknown {}".format(resource_type) super(SWFUnknownResourceFault, self).__init__( - "Unknown {}: {}".format(resource_type, resource_name), + message, "com.amazonaws.swf.base.model#UnknownResourceFault") @@ -75,3 +79,11 @@ class SWFDefaultUndefinedFault(SWFClientError): super(SWFDefaultUndefinedFault, self).__init__( key_camel_case, "com.amazonaws.swf.base.model#DefaultUndefinedFault" ) + + +class SWFValidationException(SWFClientError): + def __init__(self, message): + super(SWFValidationException, self).__init__( + message, + "com.amazon.coral.validate#ValidationException" + ) diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index 5b4b96c87..c91da492e 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -11,6 +11,7 @@ from ..exceptions import ( SWFSerializationException, SWFTypeAlreadyExistsFault, SWFTypeDeprecatedFault, + SWFValidationException, ) from .activity_type import ActivityType from .decision_task import DecisionTask @@ -201,6 +202,59 @@ class SWFBackend(BaseBackend): count += wfe.open_counts["openDecisionTasks"] return count + def respond_decision_task_completed(self, task_token, + decisions=None, + execution_context=None): + self._check_string(task_token) + self._check_none_or_string(execution_context) + # let's find decision task + decision_task = None + for domain in self.domains: + for _, wfe in domain.workflow_executions.iteritems(): + for dt in wfe.decision_tasks: + if dt.task_token == task_token: + decision_task = dt + # no decision task found + if not decision_task: + # In the real world, SWF distinguishes an obviously invalid token and a + # token that has no corresponding decision task. For the latter it seems + # to wait until a task with that token comes up (which looks like a smart + # choice in an eventually-consistent system). The call doesn't seem to + # timeout shortly, it takes 3 or 4 minutes to result in: + # BotoServerError: 500 Internal Server Error + # {"__type":"com.amazon.coral.service#InternalFailure"} + # This behavior is not documented clearly in SWF docs and we'll ignore it + # in moto, as there is no obvious reason to rely on it in tests. + raise SWFValidationException("Invalid token") + # decision task found, but WorflowExecution is CLOSED + wfe = decision_task.workflow_execution + if wfe.execution_status != "OPEN": + raise SWFUnknownResourceFault( + "execution", + "WorkflowExecution=[workflowId={}, runId={}]".format( + wfe.workflow_id, wfe.run_id + ) + ) + # decision task found, but already completed + if decision_task.state != "STARTED": + if decision_task.state == "COMPLETED": + raise SWFUnknownResourceFault( + "decision task, scheduledEventId = {}".format(decision_task.scheduled_event_id) + ) + else: + raise ValueError( + "This shouldn't happen: you have to PollForDecisionTask to get a token, " + "which changes DecisionTask status to 'STARTED' ; then it can only change " + "to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably " + "a bug in moto, please report it, thanks!" + ) + # everything's good + if decision_task: + wfe = decision_task.workflow_execution + wfe.complete_decision_task(decision_task.task_token, + decisions=decisions, + execution_context=execution_context) + swf_backends = {} for region in boto.swf.regions(): diff --git a/moto/swf/models/decision_task.py b/moto/swf/models/decision_task.py index e9c3c00f6..967c94fa5 100644 --- a/moto/swf/models/decision_task.py +++ b/moto/swf/models/decision_task.py @@ -34,3 +34,6 @@ class DecisionTask(object): def start(self, started_event_id): self.state = "STARTED" self.started_event_id = started_event_id + + def complete(self): + self.state = "COMPLETED" diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index 93d0cfe41..10e97ecb3 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -56,6 +56,14 @@ class HistoryEvent(object): if hasattr(self, "identity") and self.identity: hsh["identity"] = self.identity return hsh + elif self.event_type == "DecisionTaskCompleted": + hsh = { + "scheduledEventId": self.scheduled_event_id, + "startedEventId": self.started_event_id, + } + if hasattr(self, "execution_context") and self.execution_context: + hsh["executionContext"] = self.execution_context + return hsh else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 0bd632787..bee4305c0 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -151,3 +151,42 @@ class WorkflowExecution(object): identity=identity ) dt.start(evt.event_id) + + def complete_decision_task(self, task_token, decisions=None, execution_context=None): + # TODO: check if decision can really complete in case of malformed "decisions" + dt = self._find_decision_task(task_token) + evt = self._add_event( + "DecisionTaskCompleted", + scheduled_event_id=dt.scheduled_event_id, + started_event_id=dt.started_event_id, + execution_context=execution_context, + ) + dt.complete() + self.handle_decisions(decisions) + + def handle_decisions(self, decisions): + """ + Handles a Decision according to SWF docs. + See: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_Decision.html + """ + # 'decisions' can be None per boto.swf defaults, so better exiting + # directly for falsy values + if not decisions: + return + # handle each decision separately, in order + for decision in decisions: + decision_type = decision["decisionType"] + # TODO: implement Decision type: CancelTimer + # TODO: implement Decision type: CancelWorkflowExecution + # TODO: implement Decision type: CompleteWorkflowExecution + # TODO: implement Decision type: ContinueAsNewWorkflowExecution + # TODO: implement Decision type: FailWorkflowExecution + # TODO: implement Decision type: RecordMarker + # TODO: implement Decision type: RequestCancelActivityTask + # TODO: implement Decision type: RequestCancelExternalWorkflowExecution + # TODO: implement Decision type: ScheduleActivityTask + # TODO: implement Decision type: ScheduleLambdaFunction + # TODO: implement Decision type: SignalExternalWorkflowExecution + # TODO: implement Decision type: StartChildWorkflowExecution + # TODO: implement Decision type: StartTimer + raise NotImplementedError("Cannot handle decision: {}".format(decision_type)) diff --git a/moto/swf/responses.py b/moto/swf/responses.py index 7832e3ebf..9000d03f0 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -242,3 +242,13 @@ class SWFResponse(BaseResponse): task_list = self._params["taskList"]["name"] count = self.swf_backend.count_pending_decision_tasks(domain_name, task_list) return json.dumps({"count": count, "truncated": False}) + + + def respond_decision_task_completed(self): + task_token = self._params["taskToken"] + execution_context = self._params.get("executionContext") + decisions = self._params.get("decisions") + self.swf_backend.respond_decision_task_completed( + task_token, decisions=decisions, execution_context=execution_context + ) + return "" diff --git a/tests/test_swf/test_decision_tasks.py b/tests/test_swf/test_decision_tasks.py index 06cdd4556..fe84f2ebd 100644 --- a/tests/test_swf/test_decision_tasks.py +++ b/tests/test_swf/test_decision_tasks.py @@ -2,8 +2,10 @@ import boto from sure import expect from moto import mock_swf +from moto.swf import swf_backend from moto.swf.exceptions import ( SWFUnknownResourceFault, + SWFValidationException, ) from .utils import mock_basic_workflow_type @@ -72,3 +74,63 @@ def test_count_pending_decision_tasks_on_non_existent_task_list(): conn = setup_workflow() resp = conn.count_pending_decision_tasks("test-domain", "non-existent") resp.should.equal({"count": 0, "truncated": False}) + + +# RespondDecisionTaskCompleted endpoint +@mock_swf +def test_respond_decision_task_completed_with_no_decision(): + conn = setup_workflow() + + resp = conn.poll_for_decision_task("test-domain", "queue") + task_token = resp["taskToken"] + + resp = conn.respond_decision_task_completed(task_token) + resp.should.be.none + + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + types = [evt["eventType"] for evt in resp["events"]] + types.should.equal([ + "WorkflowExecutionStarted", + "DecisionTaskScheduled", + "DecisionTaskStarted", + "DecisionTaskCompleted", + ]) + evt = resp["events"][-1] + evt["decisionTaskCompletedEventAttributes"].should.equal({ + "scheduledEventId": 2, + "startedEventId": 3, + }) + +@mock_swf +def test_respond_decision_task_completed_with_wrong_token(): + conn = setup_workflow() + resp = conn.poll_for_decision_task("test-domain", "queue") + conn.respond_decision_task_completed.when.called_with( + "not-a-correct-token" + ).should.throw(SWFValidationException) + +@mock_swf +def test_respond_decision_task_completed_on_close_workflow_execution(): + conn = setup_workflow() + resp = conn.poll_for_decision_task("test-domain", "queue") + task_token = resp["taskToken"] + + # bad: we're closing workflow execution manually, but endpoints are not coded for now.. + wfe = swf_backend.domains[0].workflow_executions.values()[0] + wfe.execution_status = "CLOSED" + # /bad + + conn.respond_decision_task_completed.when.called_with( + task_token + ).should.throw(SWFUnknownResourceFault) + +@mock_swf +def test_respond_decision_task_completed_with_task_already_completed(): + conn = setup_workflow() + resp = conn.poll_for_decision_task("test-domain", "queue") + task_token = resp["taskToken"] + conn.respond_decision_task_completed(task_token) + + conn.respond_decision_task_completed.when.called_with( + task_token + ).should.throw(SWFUnknownResourceFault) diff --git a/tests/test_swf/test_exceptions.py b/tests/test_swf/test_exceptions.py index 27d48c261..a98e16feb 100644 --- a/tests/test_swf/test_exceptions.py +++ b/tests/test_swf/test_exceptions.py @@ -10,6 +10,7 @@ from moto.swf.exceptions import ( SWFTypeDeprecatedFault, SWFWorkflowExecutionAlreadyStartedFault, SWFDefaultUndefinedFault, + SWFValidationException, ) from moto.swf.models import ( WorkflowType, @@ -35,6 +36,16 @@ def test_swf_unknown_resource_fault(): "message": "Unknown type: detail" }) +def test_swf_unknown_resource_fault_with_only_one_parameter(): + ex = SWFUnknownResourceFault("foo bar baz") + + ex.status.should.equal(400) + ex.error_code.should.equal("UnknownResourceFault") + ex.body.should.equal({ + "__type": "com.amazonaws.swf.base.model#UnknownResourceFault", + "message": "Unknown foo bar baz" + }) + def test_swf_domain_already_exists_fault(): ex = SWFDomainAlreadyExistsFault("domain-name") @@ -103,3 +114,13 @@ def test_swf_default_undefined_fault(): "__type": "com.amazonaws.swf.base.model#DefaultUndefinedFault", "message": "executionStartToCloseTimeout", }) + +def test_swf_validation_exception(): + ex = SWFValidationException("Invalid token") + + ex.status.should.equal(400) + ex.error_code.should.equal("ValidationException") + ex.body.should.equal({ + "__type": "com.amazon.coral.validate#ValidationException", + "message": "Invalid token", + })