diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index c47d704a2..b1c00738d 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -13,6 +13,7 @@ from ..exceptions import ( SWFTypeDeprecatedFault, ) from .activity_type import ActivityType +from .decision_task import DecisionTask from .domain import Domain from .generic_type import GenericType from .history_event import HistoryEvent @@ -162,6 +163,36 @@ class SWFBackend(BaseBackend): domain = self._get_domain(domain_name) return domain.get_workflow_execution(run_id, workflow_id) + def poll_for_decision_task(self, domain_name, task_list, identity=None): + self._check_string(domain_name) + self._check_string(task_list) + domain = self._get_domain(domain_name) + # Real SWF cases: + # - case 1: there's a decision task to return, return it + # - case 2: there's no decision task to return, so wait for timeout + # and if a new decision is schedule, start and return it + # - case 3: timeout reached, no decision, return an empty decision + # (e.g. a decision with an empty "taskToken") + # + # For the sake of simplicity, we forget case 2 for now, so either + # there's a DecisionTask to return, either we return a blank one. + # + # SWF client libraries should cope with that easily as long as tests + # aren't distributed. + # + # TODO: handle long polling (case 2) for decision tasks + decision_candidates = [] + for wf_id, wf_execution in domain.workflow_executions.iteritems(): + decision_candidates += wf_execution.scheduled_decision_tasks + if any(decision_candidates): + # TODO: handle task priorities (but not supported by boto for now) + decision = min(decision_candidates, key=lambda d: d.scheduled_at) + wfe = decision.workflow_execution + wfe.start_decision_task(decision.task_token, identity=identity) + return decision + else: + return None + swf_backends = {} for region in boto.swf.regions(): diff --git a/moto/swf/models/decision_task.py b/moto/swf/models/decision_task.py new file mode 100644 index 000000000..60d0f7131 --- /dev/null +++ b/moto/swf/models/decision_task.py @@ -0,0 +1,35 @@ +from __future__ import unicode_literals +from datetime import datetime +import uuid + + +class DecisionTask(object): + def __init__(self, workflow_execution, scheduled_event_id): + self.workflow_execution = workflow_execution + self.workflow_type = workflow_execution.workflow_type + self.task_token = str(uuid.uuid4()) + self.scheduled_event_id = scheduled_event_id + self.previous_started_event_id = 0 + self.started_event_id = None + self.state = "SCHEDULED" + # this is *not* necessarily coherent with workflow execution history, + # but that shouldn't be a problem for tests + self.scheduled_at = datetime.now() + + def to_full_dict(self): + hsh = { + "events": [ + evt.to_dict() for evt in self.workflow_execution.events + ], + "taskToken": self.task_token, + "previousStartedEventId": self.previous_started_event_id, + "workflowExecution": self.workflow_execution.to_short_dict(), + "workflowType": self.workflow_type.to_short_dict(), + } + if self.started_event_id: + hsh["startedEventId"] = self.started_event_id + return hsh + + def start(self, started_event_id): + self.state = "STARTED" + self.started_event_id = started_event_id diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index c88f9fbe9..93d0cfe41 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -50,9 +50,12 @@ class HistoryEvent(object): "taskList": {"name": wfe.task_list} } elif self.event_type == "DecisionTaskStarted": - return { + hsh = { "scheduledEventId": self.scheduled_event_id } + if hasattr(self, "identity") and self.identity: + hsh["identity"] = self.identity + return hsh else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index fa6d28dd0..110001641 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -4,6 +4,7 @@ import uuid from moto.core.utils import camelcase_to_underscores from ..exceptions import SWFDefaultUndefinedFault +from .decision_task import DecisionTask from .history_event import HistoryEvent @@ -32,6 +33,10 @@ class WorkflowExecution(object): } # events self.events = [] + # tasks + self.decision_tasks = [] + self.activity_tasks = [] + self.child_workflow_executions = [] def __repr__(self): return "WorkflowExecution(run_id: {})".format(self.run_id) @@ -99,13 +104,44 @@ class WorkflowExecution(object): def _add_event(self, *args, **kwargs): evt = HistoryEvent(self.next_event_id(), *args, **kwargs) self.events.append(evt) + return evt def start(self): self._add_event( "WorkflowExecutionStarted", workflow_execution=self, ) - self._add_event( + self.schedule_decision_task() + + def schedule_decision_task(self): + self.open_counts["openDecisionTasks"] += 1 + evt = self._add_event( "DecisionTaskScheduled", workflow_execution=self, ) + self.decision_tasks.append(DecisionTask(self, evt.event_id)) + + @property + def scheduled_decision_tasks(self): + return filter( + lambda t: t.state == "SCHEDULED", + self.decision_tasks + ) + + def _find_decision_task(self, task_token): + for dt in self.decision_tasks: + if dt.task_token == task_token: + return dt + raise ValueError( + "No decision task with token: {}".format(task_token) + ) + + def start_decision_task(self, task_token, identity=None): + dt = self._find_decision_task(task_token) + evt = self._add_event( + "DecisionTaskStarted", + workflow_execution=self, + scheduled_event_id=dt.scheduled_event_id, + identity=identity + ) + dt.start(evt.event_id) diff --git a/moto/swf/responses.py b/moto/swf/responses.py index 8f7aa0344..cd2b99d6c 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -221,3 +221,16 @@ class SWFResponse(BaseResponse): return json.dumps({ "events": [evt.to_dict() for evt in wfe.events] }) + + def poll_for_decision_task(self): + domain_name = self._params["domain"] + task_list = self._params["taskList"]["name"] + identity = self._params.get("identity") + # TODO: implement reverseOrder + decision = self.swf_backend.poll_for_decision_task( + domain_name, task_list, identity=identity + ) + if decision: + return json.dumps(decision.to_full_dict()) + else: + return json.dumps({"previousStartedEventId": 0, "startedEventId": 0}) diff --git a/tests/test_swf/test_decision_tasks.py b/tests/test_swf/test_decision_tasks.py new file mode 100644 index 000000000..133b830ab --- /dev/null +++ b/tests/test_swf/test_decision_tasks.py @@ -0,0 +1,46 @@ +import boto +from sure import expect + +from moto import mock_swf +from moto.swf.exceptions import ( + SWFUnknownResourceFault, +) + +from .utils import mock_basic_workflow_type + + +@mock_swf +def setup_workflow(): + conn = boto.connect_swf("the_key", "the_secret") + conn.register_domain("test-domain", "60", description="A test domain") + conn = mock_basic_workflow_type("test-domain", conn) + conn.register_activity_type("test-domain", "test-activity", "v1.1") + wfe = conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0") + conn.run_id = wfe["runId"] + return conn + + +# PollForDecisionTask endpoint +@mock_swf +def test_poll_for_decision_task_when_one(): + conn = setup_workflow() + + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + types = [evt["eventType"] for evt in resp["events"]] + types.should.equal(["WorkflowExecutionStarted", "DecisionTaskScheduled"]) + + resp = conn.poll_for_decision_task("test-domain", "queue", identity="srv01") + types = [evt["eventType"] for evt in resp["events"]] + types.should.equal(["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted"]) + + resp["events"][-1]["decisionTaskStartedEventAttributes"]["identity"].should.equal("srv01") + +@mock_swf +def test_poll_for_decision_task_when_none(): + conn = setup_workflow() + conn.poll_for_decision_task("test-domain", "queue") + + resp = conn.poll_for_decision_task("test-domain", "queue") + # this is the DecisionTask representation you get from the real SWF + # after waiting 60s when there's no decision to be taken + resp.should.equal({"previousStartedEventId": 0, "startedEventId": 0}) diff --git a/tests/test_swf/test_models.py b/tests/test_swf/test_models.py index 813afc3ca..e7153898f 100644 --- a/tests/test_swf/test_models.py +++ b/tests/test_swf/test_models.py @@ -2,6 +2,7 @@ from sure import expect from freezegun import freeze_time from moto.swf.models import ( + DecisionTask, Domain, GenericType, HistoryEvent, @@ -115,7 +116,6 @@ def test_workflow_execution_creation_child_policy_logic(): WorkflowType("test-workflow", "v1.0"), "ab1234" ).should.throw(SWFDefaultUndefinedFault) - def test_workflow_execution_string_representation(): wft = get_basic_workflow_type() wfe = WorkflowExecution(wft, "ab1234", child_policy="TERMINATE") @@ -182,6 +182,24 @@ def test_workflow_execution_full_dict_representation(): "taskStartToCloseTimeout": "300", }) +def test_workflow_execution_schedule_decision_task(): + wft = get_basic_workflow_type() + wfe = WorkflowExecution(wft, "ab1234") + wfe.open_counts["openDecisionTasks"].should.equal(0) + wfe.schedule_decision_task() + wfe.open_counts["openDecisionTasks"].should.equal(1) + +def test_workflow_execution_start_decision_task(): + wft = get_basic_workflow_type() + wfe = WorkflowExecution(wft, "ab1234") + wfe.schedule_decision_task() + dt = wfe.decision_tasks[0] + wfe.start_decision_task(dt.task_token, identity="srv01") + dt = wfe.decision_tasks[0] + dt.state.should.equal("STARTED") + wfe.events[-1].event_type.should.equal("DecisionTaskStarted") + wfe.events[-1].identity.should.equal("srv01") + # HistoryEvent @freeze_time("2015-01-01 12:00:00") @@ -207,3 +225,31 @@ def test_history_event_breaks_on_initialization_if_not_implemented(): HistoryEvent.when.called_with( 123, "UnknownHistoryEvent" ).should.throw(NotImplementedError) + + +# DecisionTask +def test_decision_task_creation(): + wft = get_basic_workflow_type() + wfe = WorkflowExecution(wft, "ab1234") + dt = DecisionTask(wfe, 123) + dt.workflow_execution.should.equal(wfe) + dt.state.should.equal("SCHEDULED") + dt.task_token.should_not.be.empty + dt.started_event_id.should.be.none + +def test_decision_task_full_dict_representation(): + wft = get_basic_workflow_type() + wfe = WorkflowExecution(wft, "ab1234") + dt = DecisionTask(wfe, 123) + + fd = dt.to_full_dict() + fd["events"].should.be.a("list") + fd["previousStartedEventId"].should.equal(0) + fd.should_not.contain("startedEventId") + fd.should.contain("taskToken") + fd["workflowExecution"].should.equal(wfe.to_short_dict()) + fd["workflowType"].should.equal(wft.to_short_dict()) + + dt.start(1234) + fd = dt.to_full_dict() + fd["startedEventId"].should.equal(1234)