Merge pull request #2768 from EpicWink/swf-previous-started-event-id
Keep track of previous started event ID in SWF executions
This commit is contained in:
commit
fd4d42557f
@ -15,7 +15,7 @@ class DecisionTask(BaseModel):
|
|||||||
self.workflow_type = workflow_execution.workflow_type
|
self.workflow_type = workflow_execution.workflow_type
|
||||||
self.task_token = str(uuid.uuid4())
|
self.task_token = str(uuid.uuid4())
|
||||||
self.scheduled_event_id = scheduled_event_id
|
self.scheduled_event_id = scheduled_event_id
|
||||||
self.previous_started_event_id = 0
|
self.previous_started_event_id = None
|
||||||
self.started_event_id = None
|
self.started_event_id = None
|
||||||
self.started_timestamp = None
|
self.started_timestamp = None
|
||||||
self.start_to_close_timeout = (
|
self.start_to_close_timeout = (
|
||||||
@ -40,18 +40,20 @@ class DecisionTask(BaseModel):
|
|||||||
hsh = {
|
hsh = {
|
||||||
"events": [evt.to_dict() for evt in events],
|
"events": [evt.to_dict() for evt in events],
|
||||||
"taskToken": self.task_token,
|
"taskToken": self.task_token,
|
||||||
"previousStartedEventId": self.previous_started_event_id,
|
|
||||||
"workflowExecution": self.workflow_execution.to_short_dict(),
|
"workflowExecution": self.workflow_execution.to_short_dict(),
|
||||||
"workflowType": self.workflow_type.to_short_dict(),
|
"workflowType": self.workflow_type.to_short_dict(),
|
||||||
}
|
}
|
||||||
|
if self.previous_started_event_id is not None:
|
||||||
|
hsh["previousStartedEventId"] = self.previous_started_event_id
|
||||||
if self.started_event_id:
|
if self.started_event_id:
|
||||||
hsh["startedEventId"] = self.started_event_id
|
hsh["startedEventId"] = self.started_event_id
|
||||||
return hsh
|
return hsh
|
||||||
|
|
||||||
def start(self, started_event_id):
|
def start(self, started_event_id, previous_started_event_id=None):
|
||||||
self.state = "STARTED"
|
self.state = "STARTED"
|
||||||
self.started_timestamp = unix_time()
|
self.started_timestamp = unix_time()
|
||||||
self.started_event_id = started_event_id
|
self.started_event_id = started_event_id
|
||||||
|
self.previous_started_event_id = previous_started_event_id
|
||||||
|
|
||||||
def complete(self):
|
def complete(self):
|
||||||
self._check_workflow_execution_open()
|
self._check_workflow_execution_open()
|
||||||
|
@ -82,6 +82,7 @@ class WorkflowExecution(BaseModel):
|
|||||||
self._events = []
|
self._events = []
|
||||||
# child workflows
|
# child workflows
|
||||||
self.child_workflow_executions = []
|
self.child_workflow_executions = []
|
||||||
|
self._previous_started_event_id = None
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "WorkflowExecution(run_id: {0})".format(self.run_id)
|
return "WorkflowExecution(run_id: {0})".format(self.run_id)
|
||||||
@ -295,7 +296,8 @@ class WorkflowExecution(BaseModel):
|
|||||||
scheduled_event_id=dt.scheduled_event_id,
|
scheduled_event_id=dt.scheduled_event_id,
|
||||||
identity=identity,
|
identity=identity,
|
||||||
)
|
)
|
||||||
dt.start(evt.event_id)
|
dt.start(evt.event_id, self._previous_started_event_id)
|
||||||
|
self._previous_started_event_id = evt.event_id
|
||||||
|
|
||||||
def complete_decision_task(
|
def complete_decision_task(
|
||||||
self, task_token, decisions=None, execution_context=None
|
self, task_token, decisions=None, execution_context=None
|
||||||
|
@ -24,15 +24,16 @@ def test_decision_task_full_dict_representation():
|
|||||||
|
|
||||||
fd = dt.to_full_dict()
|
fd = dt.to_full_dict()
|
||||||
fd["events"].should.be.a("list")
|
fd["events"].should.be.a("list")
|
||||||
fd["previousStartedEventId"].should.equal(0)
|
fd.should_not.contain("previousStartedEventId")
|
||||||
fd.should_not.contain("startedEventId")
|
fd.should_not.contain("startedEventId")
|
||||||
fd.should.contain("taskToken")
|
fd.should.contain("taskToken")
|
||||||
fd["workflowExecution"].should.equal(wfe.to_short_dict())
|
fd["workflowExecution"].should.equal(wfe.to_short_dict())
|
||||||
fd["workflowType"].should.equal(wft.to_short_dict())
|
fd["workflowType"].should.equal(wft.to_short_dict())
|
||||||
|
|
||||||
dt.start(1234)
|
dt.start(1234, 1230)
|
||||||
fd = dt.to_full_dict()
|
fd = dt.to_full_dict()
|
||||||
fd["startedEventId"].should.equal(1234)
|
fd["startedEventId"].should.equal(1234)
|
||||||
|
fd["previousStartedEventId"].should.equal(1230)
|
||||||
|
|
||||||
|
|
||||||
def test_decision_task_first_timeout():
|
def test_decision_task_first_timeout():
|
||||||
|
@ -30,6 +30,30 @@ def test_poll_for_decision_task_when_one():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_swf_deprecated
|
||||||
|
def test_poll_for_decision_task_previous_started_event_id():
|
||||||
|
conn = setup_workflow()
|
||||||
|
|
||||||
|
resp = conn.poll_for_decision_task("test-domain", "queue")
|
||||||
|
assert resp["workflowExecution"]["runId"] == conn.run_id
|
||||||
|
assert "previousStartedEventId" not in resp
|
||||||
|
|
||||||
|
# Require a failing decision, in this case a non-existant activity type
|
||||||
|
attrs = {
|
||||||
|
"activityId": "spam",
|
||||||
|
"activityType": {"name": "test-activity", "version": "v1.42"},
|
||||||
|
"taskList": "eggs",
|
||||||
|
}
|
||||||
|
decision = {
|
||||||
|
"decisionType": "ScheduleActivityTask",
|
||||||
|
"scheduleActivityTaskDecisionAttributes": attrs,
|
||||||
|
}
|
||||||
|
conn.respond_decision_task_completed(resp["taskToken"], decisions=[decision])
|
||||||
|
resp = conn.poll_for_decision_task("test-domain", "queue")
|
||||||
|
assert resp["workflowExecution"]["runId"] == conn.run_id
|
||||||
|
assert resp["previousStartedEventId"] == 3
|
||||||
|
|
||||||
|
|
||||||
@mock_swf_deprecated
|
@mock_swf_deprecated
|
||||||
def test_poll_for_decision_task_when_none():
|
def test_poll_for_decision_task_when_none():
|
||||||
conn = setup_workflow()
|
conn = setup_workflow()
|
||||||
|
Loading…
Reference in New Issue
Block a user