From f576f3765c03e25dd56516171cbcf9036061c702 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Barth Date: Mon, 2 Nov 2015 10:26:40 +0100 Subject: [PATCH] Add SWF endpoint RecordActivityTaskHeartbeat --- moto/swf/models/__init__.py | 6 ++++ moto/swf/models/activity_task.py | 6 ++++ moto/swf/models/history_event.py | 4 +-- moto/swf/models/workflow_execution.py | 14 ++++------ moto/swf/responses.py | 9 ++++++ moto/swf/utils.py | 7 +++++ tests/test_swf/models/test_activity_task.py | 20 +++++++++++++ .../test_swf/responses/test_activity_tasks.py | 28 +++++++++++++++++++ tests/test_swf/test_utils.py | 12 +++++++- 9 files changed, 94 insertions(+), 12 deletions(-) diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index 27f56a711..37c7827d7 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -364,6 +364,12 @@ class SWFBackend(BaseBackend): wfe = domain.get_workflow_execution(workflow_id, run_id=run_id, raise_if_closed=True) wfe.terminate(child_policy=child_policy, details=details, reason=reason) + def record_activity_task_heartbeat(self, task_token, details=None): + self._check_string(task_token) + self._check_none_or_string(details) + activity_task = self._find_activity_task_from_token(task_token) + activity_task.reset_heartbeat_clock() + swf_backends = {} for region in boto.swf.regions(): diff --git a/moto/swf/models/activity_task.py b/moto/swf/models/activity_task.py index 6baa01b5d..b099bc677 100644 --- a/moto/swf/models/activity_task.py +++ b/moto/swf/models/activity_task.py @@ -2,6 +2,8 @@ from __future__ import unicode_literals from datetime import datetime import uuid +from ..utils import now_timestamp + class ActivityTask(object): def __init__(self, activity_id, activity_type, scheduled_event_id, @@ -9,6 +11,7 @@ class ActivityTask(object): self.activity_id = activity_id self.activity_type = activity_type self.input = input + self.last_heartbeat_timestamp = now_timestamp() self.scheduled_event_id = scheduled_event_id self.started_event_id = None self.state = "SCHEDULED" @@ -39,3 +42,6 @@ class ActivityTask(object): def fail(self): self.state = "FAILED" + + def reset_heartbeat_clock(self): + self.last_heartbeat_timestamp = now_timestamp() diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index eb3c1f795..aa5d09498 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -2,14 +2,14 @@ from __future__ import unicode_literals from datetime import datetime from time import mktime -from ..utils import decapitalize +from ..utils import decapitalize, now_timestamp class HistoryEvent(object): def __init__(self, event_id, event_type, **kwargs): self.event_id = event_id self.event_type = event_type - self.event_timestamp = float(mktime(datetime.now().timetuple())) + self.event_timestamp = now_timestamp() for key, value in kwargs.iteritems(): self.__setattr__(key, value) # break soon if attributes are not valid diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 10d134dca..5674e892a 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -13,7 +13,7 @@ from ..exceptions import ( SWFValidationException, SWFDecisionValidationException, ) -from ..utils import decapitalize +from ..utils import decapitalize, now_timestamp from .activity_task import ActivityTask from .activity_type import ActivityType from .decision_task import DecisionTask @@ -161,12 +161,8 @@ class WorkflowExecution(object): self._events.append(evt) return evt - # TODO: move it in utils - def _now_timestamp(self): - return float(mktime(datetime.now().timetuple())) - def start(self): - self.start_timestamp = self._now_timestamp() + self.start_timestamp = now_timestamp() self._add_event( "WorkflowExecutionStarted", workflow_execution=self, @@ -333,7 +329,7 @@ class WorkflowExecution(object): def complete(self, event_id, result=None): self.execution_status = "CLOSED" self.close_status = "COMPLETED" - self.close_timestamp = self._now_timestamp() + self.close_timestamp = now_timestamp() evt = self._add_event( "WorkflowExecutionCompleted", decision_task_completed_event_id=event_id, @@ -344,7 +340,7 @@ class WorkflowExecution(object): # TODO: implement lenght constraints on details/reason self.execution_status = "CLOSED" self.close_status = "FAILED" - self.close_timestamp = self._now_timestamp() + self.close_timestamp = now_timestamp() evt = self._add_event( "WorkflowExecutionFailed", decision_task_completed_event_id=event_id, @@ -423,7 +419,7 @@ class WorkflowExecution(object): ) self.domain.add_to_activity_task_list(task_list, task) self.open_counts["openActivityTasks"] += 1 - self.latest_activity_task_timestamp = self._now_timestamp() + self.latest_activity_task_timestamp = now_timestamp() def _find_activity_task(self, task_token): for task in self.activity_tasks: diff --git a/moto/swf/responses.py b/moto/swf/responses.py index ffadc73f2..3c4d6ec88 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -300,3 +300,12 @@ class SWFResponse(BaseResponse): details=details, reason=reason, run_id=run_id ) return "" + + def record_activity_task_heartbeat(self): + task_token = self._params["taskToken"] + details = self._params.get("details") + self.swf_backend.record_activity_task_heartbeat( + task_token, details=details + ) + # TODO: make it dynamic when we implement activity tasks cancellation + return json.dumps({"cancelRequested": False}) diff --git a/moto/swf/utils.py b/moto/swf/utils.py index 1b85f4ca9..02603bea9 100644 --- a/moto/swf/utils.py +++ b/moto/swf/utils.py @@ -1,2 +1,9 @@ +from datetime import datetime +from time import mktime + + def decapitalize(key): return key[0].lower() + key[1:] + +def now_timestamp(): + return float(mktime(datetime.now().timetuple())) diff --git a/tests/test_swf/models/test_activity_task.py b/tests/test_swf/models/test_activity_task.py index 93c842c8e..5c47d091e 100644 --- a/tests/test_swf/models/test_activity_task.py +++ b/tests/test_swf/models/test_activity_task.py @@ -1,3 +1,4 @@ +from freezegun import freeze_time from sure import expect from moto.swf.models import ( @@ -58,3 +59,22 @@ def test_activity_task_full_dict_representation(): at.start(1234) fd = at.to_full_dict() fd["startedEventId"].should.equal(1234) + +def test_activity_task_reset_heartbeat_clock(): + wfe = make_workflow_execution() + + with freeze_time("2015-01-01 12:00:00"): + task = ActivityTask( + activity_id="my-activity-123", + activity_type="foo", + input="optional", + scheduled_event_id=117, + workflow_execution=wfe, + ) + + task.last_heartbeat_timestamp.should.equal(1420110000.0) + + with freeze_time("2015-01-01 13:00:00"): + task.reset_heartbeat_clock() + + task.last_heartbeat_timestamp.should.equal(1420113600.0) diff --git a/tests/test_swf/responses/test_activity_tasks.py b/tests/test_swf/responses/test_activity_tasks.py index 3c7f82b5e..643c38f3f 100644 --- a/tests/test_swf/responses/test_activity_tasks.py +++ b/tests/test_swf/responses/test_activity_tasks.py @@ -171,3 +171,31 @@ def test_respond_activity_task_completed_with_wrong_token(): conn.respond_activity_task_failed.when.called_with( "not-a-correct-token" ).should.throw(SWFValidationException, "Invalid token") + + +# RecordActivityTaskHeartbeat endpoint +@mock_swf +def test_record_activity_task_heartbeat(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] + + resp = conn.record_activity_task_heartbeat(activity_token, details="some progress details") + # TODO: check that "details" are reflected in ActivityTaskTimedOut event when a timeout occurs + resp.should.equal({"cancelRequested": False}) + +@mock_swf +def test_record_activity_task_heartbeat_with_wrong_token(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + SCHEDULE_ACTIVITY_TASK_DECISION + ]) + conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"] + + conn.record_activity_task_heartbeat.when.called_with( + "bad-token", details="some progress details" + ).should.throw(SWFValidationException) diff --git a/tests/test_swf/test_utils.py b/tests/test_swf/test_utils.py index 6d11ba5fc..33bb8ada6 100644 --- a/tests/test_swf/test_utils.py +++ b/tests/test_swf/test_utils.py @@ -1,5 +1,11 @@ +from freezegun import freeze_time from sure import expect -from moto.swf.utils import decapitalize + +from moto.swf.utils import ( + decapitalize, + now_timestamp, +) + def test_decapitalize(): cases = { @@ -9,3 +15,7 @@ def test_decapitalize(): } for before, after in cases.iteritems(): decapitalize(before).should.equal(after) + +@freeze_time("2015-01-01 12:00:00") +def test_now_timestamp(): + now_timestamp().should.equal(1420110000.0)