Ensure activity and decision tasks cannot progress on a closed workflow
This is a second barrier because I'm a little nervous about this and I don't want moto/swf to make any activity progress while in the real world service, it's strictly impossible once the execution is closed. Python doesn't seem to have any nice way of freezing an object so here we go with a manual boundary...
This commit is contained in:
parent
65c95ab5bc
commit
61bb550052
@ -119,3 +119,8 @@ class SWFDecisionValidationException(SWFClientError):
|
|||||||
prefix.format(count) + "; ".join(messages),
|
prefix.format(count) + "; ".join(messages),
|
||||||
"com.amazon.coral.validate#ValidationException"
|
"com.amazon.coral.validate#ValidationException"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class SWFWorkflowExecutionClosedError(Exception):
|
||||||
|
def __str__(self):
|
||||||
|
return repr("Cannot change this object because the WorkflowExecution is closed")
|
||||||
|
@ -2,6 +2,7 @@ from __future__ import unicode_literals
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from ..exceptions import SWFWorkflowExecutionClosedError
|
||||||
from ..utils import now_timestamp
|
from ..utils import now_timestamp
|
||||||
|
|
||||||
|
|
||||||
@ -24,6 +25,10 @@ class ActivityTask(object):
|
|||||||
# but that shouldn't be a problem for tests
|
# but that shouldn't be a problem for tests
|
||||||
self.scheduled_at = datetime.now()
|
self.scheduled_at = datetime.now()
|
||||||
|
|
||||||
|
def _check_workflow_execution_open(self):
|
||||||
|
if not self.workflow_execution.open:
|
||||||
|
raise SWFWorkflowExecutionClosedError()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def open(self):
|
def open(self):
|
||||||
return self.state in ["SCHEDULED", "STARTED"]
|
return self.state in ["SCHEDULED", "STARTED"]
|
||||||
@ -45,9 +50,11 @@ class ActivityTask(object):
|
|||||||
self.started_event_id = started_event_id
|
self.started_event_id = started_event_id
|
||||||
|
|
||||||
def complete(self):
|
def complete(self):
|
||||||
|
self._check_workflow_execution_open()
|
||||||
self.state = "COMPLETED"
|
self.state = "COMPLETED"
|
||||||
|
|
||||||
def fail(self):
|
def fail(self):
|
||||||
|
self._check_workflow_execution_open()
|
||||||
self.state = "FAILED"
|
self.state = "FAILED"
|
||||||
|
|
||||||
def reset_heartbeat_clock(self):
|
def reset_heartbeat_clock(self):
|
||||||
@ -63,5 +70,9 @@ class ActivityTask(object):
|
|||||||
|
|
||||||
def process_timeouts(self):
|
def process_timeouts(self):
|
||||||
if self.has_timedout():
|
if self.has_timedout():
|
||||||
|
self.timeout()
|
||||||
|
|
||||||
|
def timeout(self):
|
||||||
|
self._check_workflow_execution_open()
|
||||||
self.state = "TIMED_OUT"
|
self.state = "TIMED_OUT"
|
||||||
self.timeout_type = "HEARTBEAT"
|
self.timeout_type = "HEARTBEAT"
|
||||||
|
@ -2,6 +2,7 @@ from __future__ import unicode_literals
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from ..exceptions import SWFWorkflowExecutionClosedError
|
||||||
from ..utils import now_timestamp
|
from ..utils import now_timestamp
|
||||||
|
|
||||||
|
|
||||||
@ -21,6 +22,10 @@ class DecisionTask(object):
|
|||||||
self.scheduled_at = datetime.now()
|
self.scheduled_at = datetime.now()
|
||||||
self.timeout_type = None
|
self.timeout_type = None
|
||||||
|
|
||||||
|
def _check_workflow_execution_open(self):
|
||||||
|
if not self.workflow_execution.open:
|
||||||
|
raise SWFWorkflowExecutionClosedError()
|
||||||
|
|
||||||
def to_full_dict(self, reverse_order=False):
|
def to_full_dict(self, reverse_order=False):
|
||||||
events = self.workflow_execution.events(reverse_order=reverse_order)
|
events = self.workflow_execution.events(reverse_order=reverse_order)
|
||||||
hsh = {
|
hsh = {
|
||||||
@ -42,6 +47,7 @@ class DecisionTask(object):
|
|||||||
self.started_event_id = started_event_id
|
self.started_event_id = started_event_id
|
||||||
|
|
||||||
def complete(self):
|
def complete(self):
|
||||||
|
self._check_workflow_execution_open()
|
||||||
self.state = "COMPLETED"
|
self.state = "COMPLETED"
|
||||||
|
|
||||||
def has_timedout(self):
|
def has_timedout(self):
|
||||||
@ -54,5 +60,9 @@ class DecisionTask(object):
|
|||||||
|
|
||||||
def process_timeouts(self):
|
def process_timeouts(self):
|
||||||
if self.has_timedout():
|
if self.has_timedout():
|
||||||
|
self.timeout()
|
||||||
|
|
||||||
|
def timeout(self):
|
||||||
|
self._check_workflow_execution_open()
|
||||||
self.state = "TIMED_OUT"
|
self.state = "TIMED_OUT"
|
||||||
self.timeout_type = "START_TO_CLOSE"
|
self.timeout_type = "START_TO_CLOSE"
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
from freezegun import freeze_time
|
from freezegun import freeze_time
|
||||||
from sure import expect
|
from sure import expect
|
||||||
|
|
||||||
|
from moto.swf.exceptions import SWFWorkflowExecutionClosedError
|
||||||
from moto.swf.models import (
|
from moto.swf.models import (
|
||||||
ActivityTask,
|
ActivityTask,
|
||||||
ActivityType,
|
ActivityType,
|
||||||
@ -123,3 +124,21 @@ def test_activity_task_cannot_timeout_on_closed_workflow_execution():
|
|||||||
wfe.has_timedout().should.equal(True)
|
wfe.has_timedout().should.equal(True)
|
||||||
wfe.process_timeouts()
|
wfe.process_timeouts()
|
||||||
task.has_timedout().should.equal(False)
|
task.has_timedout().should.equal(False)
|
||||||
|
|
||||||
|
def test_activity_task_cannot_change_state_on_closed_workflow_execution():
|
||||||
|
wfe = make_workflow_execution()
|
||||||
|
wfe.start()
|
||||||
|
|
||||||
|
task = ActivityTask(
|
||||||
|
activity_id="my-activity-123",
|
||||||
|
activity_type="foo",
|
||||||
|
input="optional",
|
||||||
|
scheduled_event_id=117,
|
||||||
|
timeouts=ACTIVITY_TASK_TIMEOUTS,
|
||||||
|
workflow_execution=wfe,
|
||||||
|
)
|
||||||
|
wfe.complete(123)
|
||||||
|
|
||||||
|
task.timeout.when.called_with().should.throw(SWFWorkflowExecutionClosedError)
|
||||||
|
task.complete.when.called_with().should.throw(SWFWorkflowExecutionClosedError)
|
||||||
|
task.fail.when.called_with().should.throw(SWFWorkflowExecutionClosedError)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
from freezegun import freeze_time
|
from freezegun import freeze_time
|
||||||
from sure import expect
|
from sure import expect
|
||||||
|
|
||||||
|
from moto.swf.exceptions import SWFWorkflowExecutionClosedError
|
||||||
from moto.swf.models import DecisionTask
|
from moto.swf.models import DecisionTask
|
||||||
|
|
||||||
from ..utils import make_workflow_execution
|
from ..utils import make_workflow_execution
|
||||||
@ -61,3 +62,13 @@ def test_decision_task_cannot_timeout_on_closed_workflow_execution():
|
|||||||
wfe.has_timedout().should.equal(True)
|
wfe.has_timedout().should.equal(True)
|
||||||
wfe.process_timeouts()
|
wfe.process_timeouts()
|
||||||
dt.has_timedout().should.equal(False)
|
dt.has_timedout().should.equal(False)
|
||||||
|
|
||||||
|
def test_decision_task_cannot_change_state_on_closed_workflow_execution():
|
||||||
|
wfe = make_workflow_execution()
|
||||||
|
wfe.start()
|
||||||
|
task = DecisionTask(wfe, 123)
|
||||||
|
|
||||||
|
wfe.complete(123)
|
||||||
|
|
||||||
|
task.timeout.when.called_with().should.throw(SWFWorkflowExecutionClosedError)
|
||||||
|
task.complete.when.called_with().should.throw(SWFWorkflowExecutionClosedError)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user