From 90c8797abdb33793dbaf62139e2cccc13a852d83 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Barth Date: Tue, 3 Nov 2015 00:28:13 +0100 Subject: [PATCH] Implement heartbeat timeout on SWF activity tasks --- moto/swf/models/__init__.py | 27 +++++++++++++++ moto/swf/models/activity_task.py | 19 ++++++++++- moto/swf/models/history_event.py | 9 +++++ moto/swf/models/workflow_execution.py | 21 ++++++++++++ tests/test_swf/models/test_activity_task.py | 26 +++++++++++++- .../test_swf/responses/test_activity_tasks.py | 33 +++++++++++------- tests/test_swf/responses/test_timeouts.py | 34 +++++++++++++++++++ tests/test_swf/utils.py | 23 +++++++++++++ 8 files changed, 178 insertions(+), 14 deletions(-) create mode 100644 tests/test_swf/responses/test_timeouts.py diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index 37c7827d7..13566ffe5 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -61,6 +61,11 @@ class SWFBackend(BaseBackend): if not isinstance(i, basestring): raise SWFSerializationException(parameter) + def _process_timeouts(self): + for domain in self.domains: + for wfe in domain.workflow_executions: + wfe._process_timeouts() + def list_domains(self, status, reverse_order=None): self._check_string(status) domains = [domain for domain in self.domains @@ -159,12 +164,16 @@ class SWFBackend(BaseBackend): self._check_string(domain_name) self._check_string(run_id) self._check_string(workflow_id) + # process timeouts on all objects + self._process_timeouts() domain = self._get_domain(domain_name) return domain.get_workflow_execution(workflow_id, run_id=run_id) def poll_for_decision_task(self, domain_name, task_list, identity=None): self._check_string(domain_name) self._check_string(task_list) + # process timeouts on all objects + self._process_timeouts() domain = self._get_domain(domain_name) # Real SWF cases: # - case 1: there's a decision task to return, return it @@ -196,6 +205,8 @@ class SWFBackend(BaseBackend): def 
count_pending_decision_tasks(self, domain_name, task_list): self._check_string(domain_name) self._check_string(task_list) + # process timeouts on all objects + self._process_timeouts() domain = self._get_domain(domain_name) count = 0 for wfe in domain.workflow_executions: @@ -208,6 +219,8 @@ class SWFBackend(BaseBackend): execution_context=None): self._check_string(task_token) self._check_none_or_string(execution_context) + # process timeouts on all objects + self._process_timeouts() # let's find decision task decision_task = None for domain in self.domains: @@ -259,6 +272,8 @@ class SWFBackend(BaseBackend): def poll_for_activity_task(self, domain_name, task_list, identity=None): self._check_string(domain_name) self._check_string(task_list) + # process timeouts on all objects + self._process_timeouts() domain = self._get_domain(domain_name) # Real SWF cases: # - case 1: there's an activity task to return, return it @@ -290,6 +305,8 @@ class SWFBackend(BaseBackend): def count_pending_activity_tasks(self, domain_name, task_list): self._check_string(domain_name) self._check_string(task_list) + # process timeouts on all objects + self._process_timeouts() domain = self._get_domain(domain_name) count = 0 for _task_list, tasks in domain.activity_task_lists.iteritems(): @@ -339,6 +356,8 @@ class SWFBackend(BaseBackend): def respond_activity_task_completed(self, task_token, result=None): self._check_string(task_token) self._check_none_or_string(result) + # process timeouts on all objects + self._process_timeouts() activity_task = self._find_activity_task_from_token(task_token) wfe = activity_task.workflow_execution wfe.complete_activity_task(activity_task.task_token, result=result) @@ -348,6 +367,8 @@ class SWFBackend(BaseBackend): # TODO: implement length limits on reason and details (common pb with client libs) self._check_none_or_string(reason) self._check_none_or_string(details) + # process timeouts on all objects + self._process_timeouts() activity_task = 
self._find_activity_task_from_token(task_token) wfe = activity_task.workflow_execution wfe.fail_activity_task(activity_task.task_token, reason=reason, details=details) @@ -360,6 +381,8 @@ class SWFBackend(BaseBackend): self._check_none_or_string(details) self._check_none_or_string(reason) self._check_none_or_string(run_id) + # process timeouts on all objects + self._process_timeouts() domain = self._get_domain(domain_name) wfe = domain.get_workflow_execution(workflow_id, run_id=run_id, raise_if_closed=True) wfe.terminate(child_policy=child_policy, details=details, reason=reason) @@ -367,8 +390,12 @@ class SWFBackend(BaseBackend): def record_activity_task_heartbeat(self, task_token, details=None): self._check_string(task_token) self._check_none_or_string(details) + # process timeouts on all objects + self._process_timeouts() activity_task = self._find_activity_task_from_token(task_token) activity_task.reset_heartbeat_clock() + if details: + activity_task.details = details swf_backends = {} diff --git a/moto/swf/models/activity_task.py b/moto/swf/models/activity_task.py index b099bc677..635c371ca 100644 --- a/moto/swf/models/activity_task.py +++ b/moto/swf/models/activity_task.py @@ -7,20 +7,27 @@ from ..utils import now_timestamp class ActivityTask(object): def __init__(self, activity_id, activity_type, scheduled_event_id, - workflow_execution, input=None): + workflow_execution, timeouts, input=None): self.activity_id = activity_id self.activity_type = activity_type + self.details = None self.input = input self.last_heartbeat_timestamp = now_timestamp() self.scheduled_event_id = scheduled_event_id self.started_event_id = None self.state = "SCHEDULED" self.task_token = str(uuid.uuid4()) + self.timeouts = timeouts + self.timeout_type = None self.workflow_execution = workflow_execution # this is *not* necessarily coherent with workflow execution history, # but that shouldn't be a problem for tests self.scheduled_at = datetime.now() + @property + def open(self): + 
return self.state in ["SCHEDULED", "STARTED"] + def to_full_dict(self): hsh = { "activityId": self.activity_id, @@ -45,3 +52,13 @@ class ActivityTask(object): def reset_heartbeat_clock(self): self.last_heartbeat_timestamp = now_timestamp() + + def has_timedout(self): + heartbeat_timeout_at = self.last_heartbeat_timestamp + \ + int(self.timeouts["heartbeatTimeout"]) + return heartbeat_timeout_at < now_timestamp() + + def process_timeouts(self): + if self.has_timedout(): + self.state = "TIMED_OUT" + self.timeout_type = "HEARTBEAT" diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index aa5d09498..37bf9b62b 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -143,6 +143,15 @@ class HistoryEvent(object): if self.reason: hsh["reason"] = self.reason return hsh + elif self.event_type == "ActivityTaskTimedOut": + hsh = { + "scheduledEventId": self.scheduled_event_id, + "startedEventId": self.started_event_id, + "timeoutType": self.timeout_type, + } + if self.details: + hsh["details"] = self.details + return hsh else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 5674e892a..cb4cf89af 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -146,6 +146,26 @@ class WorkflowExecution(object): hsh["latestActivityTaskTimestamp"] = self.latest_activity_task_timestamp return hsh + def _process_timeouts(self): + self.should_schedule_decision_next = False + # TODO: process timeouts on workflow itself + # TODO: process timeouts on decision tasks + # activity tasks timeouts + for task in self.activity_tasks: + if task.open and task.has_timedout(): + self.should_schedule_decision_next = True + task.process_timeouts() + self._add_event( + "ActivityTaskTimedOut", + details=task.details, + 
scheduled_event_id=task.scheduled_event_id, + started_event_id=task.started_event_id, + timeout_type=task.timeout_type, + ) + # schedule decision task if needed + if self.should_schedule_decision_next: + self.schedule_decision_task() + def events(self, reverse_order=False): if reverse_order: return reversed(self._events) @@ -416,6 +436,7 @@ class WorkflowExecution(object): input=attributes.get("input"), scheduled_event_id=evt.event_id, workflow_execution=self, + timeouts=timeouts, ) self.domain.add_to_activity_task_list(task_list, task) self.open_counts["openActivityTasks"] += 1 diff --git a/tests/test_swf/models/test_activity_task.py b/tests/test_swf/models/test_activity_task.py index 5c47d091e..53e9c2cf8 100644 --- a/tests/test_swf/models/test_activity_task.py +++ b/tests/test_swf/models/test_activity_task.py @@ -6,7 +6,7 @@ from moto.swf.models import ( ActivityType, ) -from ..utils import make_workflow_execution +from ..utils import make_workflow_execution, ACTIVITY_TASK_TIMEOUTS def test_activity_task_creation(): @@ -17,6 +17,7 @@ def test_activity_task_creation(): input="optional", scheduled_event_id=117, workflow_execution=wfe, + timeouts=ACTIVITY_TASK_TIMEOUTS, ) task.workflow_execution.should.equal(wfe) task.state.should.equal("SCHEDULED") @@ -44,6 +45,7 @@ def test_activity_task_full_dict_representation(): activity_type=ActivityType("foo", "v1.0"), input="optional", scheduled_event_id=117, + timeouts=ACTIVITY_TASK_TIMEOUTS, workflow_execution=wfe, ) at.start(1234) @@ -69,6 +71,7 @@ def test_activity_task_reset_heartbeat_clock(): activity_type="foo", input="optional", scheduled_event_id=117, + timeouts=ACTIVITY_TASK_TIMEOUTS, workflow_execution=wfe, ) @@ -78,3 +81,24 @@ def test_activity_task_reset_heartbeat_clock(): task.reset_heartbeat_clock() task.last_heartbeat_timestamp.should.equal(1420113600.0) + +def test_activity_task_has_timedout(): + wfe = make_workflow_execution() + + with freeze_time("2015-01-01 12:00:00"): + task = ActivityTask( + 
activity_id="my-activity-123", + activity_type="foo", + input="optional", + scheduled_event_id=117, + timeouts=ACTIVITY_TASK_TIMEOUTS, + workflow_execution=wfe, + ) + task.has_timedout().should.equal(False) + + # activity task timeout is 300s == 5mins + with freeze_time("2015-01-01 12:06:00"): + task.has_timedout().should.equal(True) + task.process_timeouts() + task.state.should.equal("TIMED_OUT") + task.timeout_type.should.equal("HEARTBEAT") diff --git a/tests/test_swf/responses/test_activity_tasks.py b/tests/test_swf/responses/test_activity_tasks.py index 643c38f3f..6f84c663e 100644 --- a/tests/test_swf/responses/test_activity_tasks.py +++ b/tests/test_swf/responses/test_activity_tasks.py @@ -1,4 +1,5 @@ import boto +from freezegun import freeze_time from sure import expect from moto import mock_swf @@ -8,18 +9,9 @@ from moto.swf.exceptions import ( SWFUnknownResourceFault, ) -from ..utils import setup_workflow +from ..utils import setup_workflow, SCHEDULE_ACTIVITY_TASK_DECISION -SCHEDULE_ACTIVITY_TASK_DECISION = { - "decisionType": "ScheduleActivityTask", - "scheduleActivityTaskDecisionAttributes": { - "activityId": "my-activity-001", - "activityType": { "name": "test-activity", "version": "v1.1" }, - "taskList": { "name": "activity-task-list" }, - } -} - # PollForActivityTask endpoint @mock_swf def test_poll_for_activity_task_when_one(): @@ -183,8 +175,7 @@ def test_record_activity_task_heartbeat(): ]) activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] - resp = conn.record_activity_task_heartbeat(activity_token, details="some progress details") - # TODO: check that "details" are reflected in ActivityTaskTimedOut event when a timeout occurs + resp = conn.record_activity_task_heartbeat(activity_token) resp.should.equal({"cancelRequested": False}) @mock_swf @@ -199,3 +190,21 @@ def test_record_activity_task_heartbeat_with_wrong_token(): conn.record_activity_task_heartbeat.when.called_with( "bad-token", details="some 
progress details" ).should.throw(SWFValidationException) + +@mock_swf +def test_record_activity_task_heartbeat_sets_details_in_case_of_timeout(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + with freeze_time("2015-01-01 12:00:00"): + activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] + conn.record_activity_task_heartbeat(activity_token, details="some progress details") + + with freeze_time("2015-01-01 12:05:30"): + # => Activity Task Heartbeat timeout reached!! + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + resp["events"][-2]["eventType"].should.equal("ActivityTaskTimedOut") + attrs = resp["events"][-2]["activityTaskTimedOutEventAttributes"] + attrs["details"].should.equal("some progress details") diff --git a/tests/test_swf/responses/test_timeouts.py b/tests/test_swf/responses/test_timeouts.py new file mode 100644 index 000000000..ca2377795 --- /dev/null +++ b/tests/test_swf/responses/test_timeouts.py @@ -0,0 +1,34 @@ +import boto +from freezegun import freeze_time +from sure import expect + +from moto import mock_swf + +from ..utils import setup_workflow, SCHEDULE_ACTIVITY_TASK_DECISION + + +# Activity Task Heartbeat timeout +# Default value in workflow helpers: 5 mins +@mock_swf +def test_activity_task_heartbeat_timeout(): + with freeze_time("2015-01-01 12:00:00"): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + conn.poll_for_activity_task("test-domain", "activity-task-list", identity="surprise") + + with freeze_time("2015-01-01 12:04:30"): + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + 
resp["events"][-1]["eventType"].should.equal("ActivityTaskStarted") + + with freeze_time("2015-01-01 12:05:30"): + # => Activity Task Heartbeat timeout reached!! + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + + resp["events"][-2]["eventType"].should.equal("ActivityTaskTimedOut") + attrs = resp["events"][-2]["activityTaskTimedOutEventAttributes"] + attrs["timeoutType"].should.equal("HEARTBEAT") + + resp["events"][-1]["eventType"].should.equal("DecisionTaskScheduled") diff --git a/tests/test_swf/utils.py b/tests/test_swf/utils.py index 6fcec9d46..e6d73fe9a 100644 --- a/tests/test_swf/utils.py +++ b/tests/test_swf/utils.py @@ -9,6 +9,29 @@ from moto.swf.models import ( ) +# Some useful constants +# Here are some activity timeouts we use in moto/swf tests ; they're extracted +# from a semi-real world example, the goal is mostly to have predictable and +# intuitive behaviour in moto/swf own tests... +ACTIVITY_TASK_TIMEOUTS = { + "heartbeatTimeout": "300", # 5 mins + "scheduleToStartTimeout": "1800", # 30 mins + "startToCloseTimeout": "1800", # 30 mins + "scheduleToCloseTimeout": "2700", # 45 mins +} + +# Some useful decisions +SCHEDULE_ACTIVITY_TASK_DECISION = { + "decisionType": "ScheduleActivityTask", + "scheduleActivityTaskDecisionAttributes": { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity", "version": "v1.1" }, + "taskList": { "name": "activity-task-list" }, + } +} +for key, value in ACTIVITY_TASK_TIMEOUTS.iteritems(): + SCHEDULE_ACTIVITY_TASK_DECISION["scheduleActivityTaskDecisionAttributes"][key] = value + # A test Domain def get_basic_domain(): return Domain("test-domain", "90")