Fix heartbeatTimeout of NONE and polling responses when there are no tasks (#3680)
* fix heartbeatTimeout of NONE resulting in ValueError and polling returning empty string taskToken when it shouldn't be returned * fix expected taskToken in impacted tests Co-authored-by: Clint Parham <cparham@aligntech.com>
This commit is contained in:
parent
f246d4162f
commit
b60de10c79
@ -73,7 +73,10 @@ class ActivityTask(BaseModel):
|
|||||||
def first_timeout(self):
|
def first_timeout(self):
|
||||||
if not self.open or not self.workflow_execution.open:
|
if not self.open or not self.workflow_execution.open:
|
||||||
return None
|
return None
|
||||||
# TODO: handle the "NONE" case
|
|
||||||
|
if self.timeouts["heartbeatTimeout"] == "NONE":
|
||||||
|
return None
|
||||||
|
|
||||||
heartbeat_timeout_at = self.last_heartbeat_timestamp + int(
|
heartbeat_timeout_at = self.last_heartbeat_timestamp + int(
|
||||||
self.timeouts["heartbeatTimeout"]
|
self.timeouts["heartbeatTimeout"]
|
||||||
)
|
)
|
||||||
|
@ -446,9 +446,7 @@ class SWFResponse(BaseResponse):
|
|||||||
if decision:
|
if decision:
|
||||||
return json.dumps(decision.to_full_dict(reverse_order=reverse_order))
|
return json.dumps(decision.to_full_dict(reverse_order=reverse_order))
|
||||||
else:
|
else:
|
||||||
return json.dumps(
|
return json.dumps({"previousStartedEventId": 0, "startedEventId": 0})
|
||||||
{"previousStartedEventId": 0, "startedEventId": 0, "taskToken": ""}
|
|
||||||
)
|
|
||||||
|
|
||||||
def count_pending_decision_tasks(self):
|
def count_pending_decision_tasks(self):
|
||||||
domain_name = self._params["domain"]
|
domain_name = self._params["domain"]
|
||||||
@ -482,7 +480,7 @@ class SWFResponse(BaseResponse):
|
|||||||
if activity_task:
|
if activity_task:
|
||||||
return json.dumps(activity_task.to_full_dict())
|
return json.dumps(activity_task.to_full_dict())
|
||||||
else:
|
else:
|
||||||
return json.dumps({"startedEventId": 0, "taskToken": ""})
|
return json.dumps({"startedEventId": 0})
|
||||||
|
|
||||||
def count_pending_activity_tasks(self):
|
def count_pending_activity_tasks(self):
|
||||||
domain_name = self._params["domain"]
|
domain_name = self._params["domain"]
|
||||||
|
@ -108,6 +108,24 @@ def test_activity_task_first_timeout():
|
|||||||
task.timeout_type.should.equal("HEARTBEAT")
|
task.timeout_type.should.equal("HEARTBEAT")
|
||||||
|
|
||||||
|
|
||||||
|
def test_activity_task_first_timeout_with_heartbeat_timeout_none():
|
||||||
|
wfe = make_workflow_execution()
|
||||||
|
|
||||||
|
activity_task_timeouts = ACTIVITY_TASK_TIMEOUTS.copy()
|
||||||
|
activity_task_timeouts["heartbeatTimeout"] = "NONE"
|
||||||
|
|
||||||
|
with freeze_time("2015-01-01 12:00:00"):
|
||||||
|
task = ActivityTask(
|
||||||
|
activity_id="my-activity-123",
|
||||||
|
activity_type="foo",
|
||||||
|
input="optional",
|
||||||
|
scheduled_event_id=117,
|
||||||
|
timeouts=activity_task_timeouts,
|
||||||
|
workflow_execution=wfe,
|
||||||
|
)
|
||||||
|
task.first_timeout().should.be.none
|
||||||
|
|
||||||
|
|
||||||
def test_activity_task_cannot_timeout_on_closed_workflow_execution():
|
def test_activity_task_cannot_timeout_on_closed_workflow_execution():
|
||||||
with freeze_time("2015-01-01 12:00:00"):
|
with freeze_time("2015-01-01 12:00:00"):
|
||||||
wfe = make_workflow_execution()
|
wfe = make_workflow_execution()
|
||||||
|
@ -35,14 +35,14 @@ def test_poll_for_activity_task_when_one():
|
|||||||
def test_poll_for_activity_task_when_none():
|
def test_poll_for_activity_task_when_none():
|
||||||
conn = setup_workflow()
|
conn = setup_workflow()
|
||||||
resp = conn.poll_for_activity_task("test-domain", "activity-task-list")
|
resp = conn.poll_for_activity_task("test-domain", "activity-task-list")
|
||||||
resp.should.equal({"startedEventId": 0, "taskToken": ""})
|
resp.should.equal({"startedEventId": 0})
|
||||||
|
|
||||||
|
|
||||||
@mock_swf_deprecated
|
@mock_swf_deprecated
|
||||||
def test_poll_for_activity_task_on_non_existent_queue():
|
def test_poll_for_activity_task_on_non_existent_queue():
|
||||||
conn = setup_workflow()
|
conn = setup_workflow()
|
||||||
resp = conn.poll_for_activity_task("test-domain", "non-existent-queue")
|
resp = conn.poll_for_activity_task("test-domain", "non-existent-queue")
|
||||||
resp.should.equal({"startedEventId": 0, "taskToken": ""})
|
resp.should.equal({"startedEventId": 0})
|
||||||
|
|
||||||
|
|
||||||
# CountPendingActivityTasks endpoint
|
# CountPendingActivityTasks endpoint
|
||||||
|
@ -62,18 +62,14 @@ def test_poll_for_decision_task_when_none():
|
|||||||
resp = conn.poll_for_decision_task("test-domain", "queue")
|
resp = conn.poll_for_decision_task("test-domain", "queue")
|
||||||
# this is the DecisionTask representation you get from the real SWF
|
# this is the DecisionTask representation you get from the real SWF
|
||||||
# after waiting 60s when there's no decision to be taken
|
# after waiting 60s when there's no decision to be taken
|
||||||
resp.should.equal(
|
resp.should.equal({"previousStartedEventId": 0, "startedEventId": 0})
|
||||||
{"previousStartedEventId": 0, "startedEventId": 0, "taskToken": ""}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@mock_swf_deprecated
|
@mock_swf_deprecated
|
||||||
def test_poll_for_decision_task_on_non_existent_queue():
|
def test_poll_for_decision_task_on_non_existent_queue():
|
||||||
conn = setup_workflow()
|
conn = setup_workflow()
|
||||||
resp = conn.poll_for_decision_task("test-domain", "non-existent-queue")
|
resp = conn.poll_for_decision_task("test-domain", "non-existent-queue")
|
||||||
resp.should.equal(
|
resp.should.equal({"previousStartedEventId": 0, "startedEventId": 0})
|
||||||
{"previousStartedEventId": 0, "startedEventId": 0, "taskToken": ""}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@mock_swf_deprecated
|
@mock_swf_deprecated
|
||||||
|
Loading…
Reference in New Issue
Block a user