diff --git a/moto/swf/models/domain.py b/moto/swf/models/domain.py index 4b30d3932..4ed914528 100644 --- a/moto/swf/models/domain.py +++ b/moto/swf/models/domain.py @@ -23,6 +23,7 @@ class Domain(object): # of "workflow_id (client determined)" => WorkflowExecution() # here. self.workflow_executions = {} + self.task_lists = defaultdict(list) def __repr__(self): return "Domain(name: %(name)s, status: %(status)s)" % self.__dict__ @@ -83,3 +84,6 @@ class Domain(object): ) ) return wfe + + def add_to_task_list(self, task_list, obj): + self.task_lists[task_list].append(obj) diff --git a/moto/swf/models/generic_type.py b/moto/swf/models/generic_type.py index 334382ecd..904296ab3 100644 --- a/moto/swf/models/generic_type.py +++ b/moto/swf/models/generic_type.py @@ -12,6 +12,13 @@ class GenericType(object): self.description = kwargs.pop("description") for key, value in kwargs.iteritems(): self.__setattr__(key, value) + # default values set to none + for key in self._configuration_keys: + attr = camelcase_to_underscores(key) + if not hasattr(self, attr): + self.__setattr__(attr, None) + if not hasattr(self, "task_list"): + self.task_list = None def __repr__(self): cls = self.__class__.__name__ @@ -49,12 +56,10 @@ class GenericType(object): "typeInfo": self.to_medium_dict(), "configuration": {} } - if hasattr(self, "task_list"): + if self.task_list: hsh["configuration"]["defaultTaskList"] = {"name": self.task_list} for key in self._configuration_keys: attr = camelcase_to_underscores(key) - if not hasattr(self, attr): - continue if not getattr(self, attr): continue hsh["configuration"][key] = getattr(self, attr) diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index 0cb8e83eb..eca00fb0b 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -81,6 +81,30 @@ class HistoryEvent(object): if hasattr(self, "reason") and self.reason: hsh["reason"] = self.reason return hsh + elif self.event_type == "ActivityTaskScheduled": + hsh = { + "activityId": self.attributes["activityId"], + "activityType": self.activity_type.to_short_dict(), + "decisionTaskCompletedEventId": self.decision_task_completed_event_id, + "taskList": { + "name": self.task_list, + }, + } + for attr in ["control", "heartbeatTimeout", "input", "scheduleToCloseTimeout", + "scheduleToStartTimeout", "startToCloseTimeout", "taskPriority"]: + if self.attributes.get(attr): + hsh[attr] = self.attributes[attr] + return hsh + elif self.event_type == "ScheduleActivityTaskFailed": + # TODO: implement other possible failure mode: OPEN_ACTIVITIES_LIMIT_EXCEEDED + # NB: some failure modes are not implemented and probably won't be implemented in the + # future, such as ACTIVITY_CREATION_RATE_EXCEEDED or OPERATION_NOT_PERMITTED + return { + "activityId": self.activity_id, + "activityType": self.activity_type.to_short_dict(), + "cause": self.cause, + "decisionTaskCompletedEventId": self.decision_task_completed_event_id, + } else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 5445f9a71..df75a99b8 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -14,6 +14,8 @@ from ..exceptions import ( SWFDecisionValidationException, ) from ..utils import decapitalize +from .activity_task import ActivityTask +from .activity_type import ActivityType from .decision_task import DecisionTask from .history_event import HistoryEvent @@ -209,7 +211,10 @@ class WorkflowExecution(object): execution_context=execution_context, ) dt.complete() + self.should_schedule_decision_next = False self.handle_decisions(evt.event_id, decisions) + if self.should_schedule_decision_next: + self.schedule_decision_task() def _check_decision_attributes(self, kind, value, decision_id): problems = [] @@ -293,6 +298,8 @@ class WorkflowExecution(object): self.complete(event_id, attributes.get("result")) elif decision_type == "FailWorkflowExecution": self.fail(event_id, attributes.get("details"), attributes.get("reason")) + elif decision_type == "ScheduleActivityTask": + self.schedule_activity_task(event_id, attributes) else: # TODO: implement Decision type: CancelTimer # TODO: implement Decision type: CancelWorkflowExecution @@ -300,7 +307,6 @@ class WorkflowExecution(object): # TODO: implement Decision type: RecordMarker # TODO: implement Decision type: RequestCancelActivityTask # TODO: implement Decision type: RequestCancelExternalWorkflowExecution - # TODO: implement Decision type: ScheduleActivityTask # TODO: implement Decision type: ScheduleLambdaFunction # TODO: implement Decision type: SignalExternalWorkflowExecution # TODO: implement Decision type: StartChildWorkflowExecution @@ -331,3 +337,96 @@ class WorkflowExecution(object): details=details, reason=reason, ) + + def schedule_activity_task(self, event_id, attributes): + activity_type = self.domain.get_type( + "activity", + attributes["activityType"]["name"], + attributes["activityType"]["version"], + ignore_empty=True, + ) + if not activity_type: + fake_type = ActivityType(attributes["activityType"]["name"], + attributes["activityType"]["version"]) + self._add_event( + "ScheduleActivityTaskFailed", + activity_id=attributes["activityId"], + activity_type=fake_type, + cause="ACTIVITY_TYPE_DOES_NOT_EXIST", + decision_task_completed_event_id=event_id, + ) + self.should_schedule_decision_next = True + return + if activity_type.status == "DEPRECATED": + self._add_event( + "ScheduleActivityTaskFailed", + activity_id=attributes["activityId"], + activity_type=activity_type, + cause="ACTIVITY_TYPE_DEPRECATED", + decision_task_completed_event_id=event_id, + ) + self.should_schedule_decision_next = True + return + if any(at for at in self.activity_tasks + if at.activity_id == attributes["activityId"]): + self._add_event( + "ScheduleActivityTaskFailed", + activity_id=attributes["activityId"], + activity_type=activity_type, + cause="ACTIVITY_ID_ALREADY_IN_USE", + decision_task_completed_event_id=event_id, + ) + self.should_schedule_decision_next = True + return + + # find task list or default task list, else fail + task_list = attributes.get("taskList", {}).get("name") + if not task_list and activity_type.task_list: + task_list = activity_type.task_list + if not task_list: + self._add_event( + "ScheduleActivityTaskFailed", + activity_id=attributes["activityId"], + activity_type=activity_type, + cause="DEFAULT_TASK_LIST_UNDEFINED", + decision_task_completed_event_id=event_id, + ) + self.should_schedule_decision_next = True + return + + # find timeouts or default timeout, else fail + timeouts = {} + for _type in ["scheduleToStartTimeout", "scheduleToCloseTimeout", "startToCloseTimeout", "heartbeatTimeout"]: + default_key = "default_task_"+camelcase_to_underscores(_type) + default_value = getattr(activity_type, default_key) + timeouts[_type] = attributes.get(_type, default_value) + if not timeouts[_type]: + error_key = default_key.replace("default_task_", "default_") + self._add_event( + "ScheduleActivityTaskFailed", + activity_id=attributes["activityId"], + activity_type=activity_type, + cause="{}_UNDEFINED".format(error_key.upper()), + decision_task_completed_event_id=event_id, + ) + self.should_schedule_decision_next = True + return + + task = ActivityTask( + activity_id=attributes["activityId"], + activity_type=activity_type, + input=attributes.get("input"), + workflow_execution=self, + ) + # Only add event and increment counters if nothing went wrong + # TODO: don't store activity tasks in 2 places... + self.activity_tasks.append(task) + self.domain.add_to_task_list(task_list, task) + self._add_event( + "ActivityTaskScheduled", + decision_task_completed_event_id=event_id, + activity_type=activity_type, + attributes=attributes, + task_list=task_list, + ) + self.open_counts["openActivityTasks"] += 1 diff --git a/tests/test_swf/test_decision_tasks.py b/tests/test_swf/test_decision_tasks.py index 568d68c2d..ecf59223f 100644 --- a/tests/test_swf/test_decision_tasks.py +++ b/tests/test_swf/test_decision_tasks.py @@ -17,7 +17,13 @@ def setup_workflow(): conn = boto.connect_swf("the_key", "the_secret") conn.register_domain("test-domain", "60", description="A test domain") conn = mock_basic_workflow_type("test-domain", conn) - conn.register_activity_type("test-domain", "test-activity", "v1.1") + conn.register_activity_type( + "test-domain", "test-activity", "v1.1", + default_task_heartbeat_timeout="600", + default_task_schedule_to_close_timeout="600", + default_task_schedule_to_start_timeout="600", + default_task_start_to_close_timeout="600", + ) wfe = conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0") conn.run_id = wfe["runId"] return conn @@ -266,3 +272,50 @@ def test_respond_decision_task_completed_with_fail_workflow_execution(): attrs = resp["events"][-1]["workflowExecutionFailedEventAttributes"] attrs["reason"].should.equal("my rules") attrs["details"].should.equal("foo") + +@mock_swf +def test_respond_decision_task_completed_with_schedule_activity_task(): + conn = setup_workflow() + resp = conn.poll_for_decision_task("test-domain", "queue") + task_token = resp["taskToken"] + + decisions = [{ + "decisionType": "ScheduleActivityTask", + "scheduleActivityTaskDecisionAttributes": { + "activityId": "my-activity-001", + "activityType": { + "name": "test-activity", + "version": "v1.1" + }, + "heartbeatTimeout": "60", + "input": "123", + "taskList": { + "name": "my-task-list" + }, + } + }] + resp = conn.respond_decision_task_completed(task_token, decisions=decisions) + resp.should.be.none + + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + types = [evt["eventType"] for evt in resp["events"]] + types.should.equal([ + "WorkflowExecutionStarted", + "DecisionTaskScheduled", + "DecisionTaskStarted", + "DecisionTaskCompleted", + "ActivityTaskScheduled", + ]) + resp["events"][-1]["activityTaskScheduledEventAttributes"].should.equal({ + "decisionTaskCompletedEventId": 4, + "activityId": "my-activity-001", + "activityType": { + "name": "test-activity", + "version": "v1.1", + }, + "heartbeatTimeout": "60", + "input": "123", + "taskList": { + "name": "my-task-list" + }, + }) diff --git a/tests/test_swf/test_models.py b/tests/test_swf/test_models.py index 45e541e75..a298139e8 100644 --- a/tests/test_swf/test_models.py +++ b/tests/test_swf/test_models.py @@ -3,6 +3,7 @@ from freezegun import freeze_time from moto.swf.models import ( ActivityTask, + ActivityType, DecisionTask, Domain, GenericType, @@ -24,6 +25,7 @@ from .utils import ( # TODO: move them in utils def make_workflow_execution(**kwargs): domain = get_basic_domain() + domain.add_type(ActivityType("test-activity", "v1.1")) wft = get_basic_workflow_type() return WorkflowExecution(domain, wft, "ab1234", **kwargs) @@ -47,6 +49,13 @@ def test_domain_string_representation(): domain = Domain("my-domain", "60") str(domain).should.equal("Domain(name: my-domain, status: REGISTERED)") +def test_domain_add_to_task_list(): + domain = Domain("my-domain", "60") + domain.add_to_task_list("foo", "bar") + dict(domain.task_lists).should.equal({ + "foo": ["bar"] + }) + # GenericType (ActivityType, WorkflowType) class FooType(GenericType): @@ -265,6 +274,178 @@ def test_workflow_execution_fail(): wfe.events()[-1].details.should.equal("some details") wfe.events()[-1].reason.should.equal("my rules") +def test_workflow_execution_schedule_activity_task(): + wfe = make_workflow_execution() + wfe.schedule_activity_task(123, { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity", "version": "v1.1" }, + "taskList": { "name": "task-list-name" }, + "scheduleToStartTimeout": "600", + "scheduleToCloseTimeout": "600", + "startToCloseTimeout": "600", + "heartbeatTimeout": "300", + }) + + wfe.open_counts["openActivityTasks"].should.equal(1) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ActivityTaskScheduled") + last_event.decision_task_completed_event_id.should.equal(123) + last_event.task_list.should.equal("task-list-name") + + wfe.activity_tasks.should.have.length_of(1) + task = wfe.activity_tasks[0] + task.activity_id.should.equal("my-activity-001") + task.activity_type.name.should.equal("test-activity") + wfe.domain.task_lists["task-list-name"].should.contain(task) + +def test_workflow_execution_schedule_activity_task_without_task_list_should_take_default(): + wfe = make_workflow_execution() + wfe.domain.add_type( + ActivityType("test-activity", "v1.2", task_list="foobar") + ) + wfe.schedule_activity_task(123, { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity", "version": "v1.2" }, + "scheduleToStartTimeout": "600", + "scheduleToCloseTimeout": "600", + "startToCloseTimeout": "600", + "heartbeatTimeout": "300", + }) + + wfe.open_counts["openActivityTasks"].should.equal(1) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ActivityTaskScheduled") + last_event.task_list.should.equal("foobar") + + task = wfe.activity_tasks[0] + wfe.domain.task_lists["foobar"].should.contain(task) + +def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attributes(): + wfe = make_workflow_execution() + at = ActivityType("test-activity", "v1.1") + at.status = "DEPRECATED" + wfe.domain.add_type(at) + wfe.domain.add_type(ActivityType("test-activity", "v1.2")) + + hsh = { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity-does-not-exists", "version": "v1.1" }, + } + + wfe.schedule_activity_task(123, hsh) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ScheduleActivityTaskFailed") + last_event.cause.should.equal("ACTIVITY_TYPE_DOES_NOT_EXIST") + + hsh["activityType"]["name"] = "test-activity" + wfe.schedule_activity_task(123, hsh) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ScheduleActivityTaskFailed") + last_event.cause.should.equal("ACTIVITY_TYPE_DEPRECATED") + + hsh["activityType"]["version"] = "v1.2" + wfe.schedule_activity_task(123, hsh) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ScheduleActivityTaskFailed") + last_event.cause.should.equal("DEFAULT_TASK_LIST_UNDEFINED") + + hsh["taskList"] = { "name": "foobar" } + wfe.schedule_activity_task(123, hsh) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ScheduleActivityTaskFailed") + last_event.cause.should.equal("DEFAULT_SCHEDULE_TO_START_TIMEOUT_UNDEFINED") + + hsh["scheduleToStartTimeout"] = "600" + wfe.schedule_activity_task(123, hsh) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ScheduleActivityTaskFailed") + last_event.cause.should.equal("DEFAULT_SCHEDULE_TO_CLOSE_TIMEOUT_UNDEFINED") + + hsh["scheduleToCloseTimeout"] = "600" + wfe.schedule_activity_task(123, hsh) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ScheduleActivityTaskFailed") + last_event.cause.should.equal("DEFAULT_START_TO_CLOSE_TIMEOUT_UNDEFINED") + + hsh["startToCloseTimeout"] = "600" + wfe.schedule_activity_task(123, hsh) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ScheduleActivityTaskFailed") + last_event.cause.should.equal("DEFAULT_HEARTBEAT_TIMEOUT_UNDEFINED") + + wfe.open_counts["openActivityTasks"].should.equal(0) + wfe.activity_tasks.should.have.length_of(0) + wfe.domain.task_lists.should.have.length_of(0) + + hsh["heartbeatTimeout"] = "300" + wfe.schedule_activity_task(123, hsh) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ActivityTaskScheduled") + + task = wfe.activity_tasks[0] + wfe.domain.task_lists["foobar"].should.contain(task) + wfe.open_counts["openDecisionTasks"].should.equal(0) + wfe.open_counts["openActivityTasks"].should.equal(1) + +def test_workflow_execution_schedule_activity_task_failure_triggers_new_decision(): + wfe = make_workflow_execution() + wfe.start() + task_token = wfe.decision_tasks[-1].task_token + wfe.start_decision_task(task_token) + wfe.complete_decision_task(task_token, decisions=[ + { + "decisionType": "ScheduleActivityTask", + "scheduleActivityTaskDecisionAttributes": { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity-does-not-exist", "version": "v1.2" }, + } + }, + { + "decisionType": "ScheduleActivityTask", + "scheduleActivityTaskDecisionAttributes": { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity-does-not-exist", "version": "v1.2" }, + } + }, + ]) + + wfe.open_counts["openActivityTasks"].should.equal(0) + wfe.open_counts["openDecisionTasks"].should.equal(1) + last_events = wfe.events()[-3:] + last_events[0].event_type.should.equal("ScheduleActivityTaskFailed") + last_events[1].event_type.should.equal("ScheduleActivityTaskFailed") + last_events[2].event_type.should.equal("DecisionTaskScheduled") + +def test_workflow_execution_schedule_activity_task_with_same_activity_id(): + wfe = make_workflow_execution() + + wfe.schedule_activity_task(123, { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity", "version": "v1.1" }, + "taskList": { "name": "task-list-name" }, + "scheduleToStartTimeout": "600", + "scheduleToCloseTimeout": "600", + "startToCloseTimeout": "600", + "heartbeatTimeout": "300", + }) + wfe.open_counts["openActivityTasks"].should.equal(1) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ActivityTaskScheduled") + + wfe.schedule_activity_task(123, { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity", "version": "v1.1" }, + "taskList": { "name": "task-list-name" }, + "scheduleToStartTimeout": "600", + "scheduleToCloseTimeout": "600", + "startToCloseTimeout": "600", + "heartbeatTimeout": "300", + }) + wfe.open_counts["openActivityTasks"].should.equal(1) + last_event = wfe.events()[-1] + last_event.event_type.should.equal("ScheduleActivityTaskFailed") + last_event.cause.should.equal("ACTIVITY_ID_ALREADY_IN_USE") + # HistoryEvent @freeze_time("2015-01-01 12:00:00")