Implement "RecordMarker", "StartTimer", "CancelTimer" and "CancelWorkflowExecution" decisions (#4713)

This commit is contained in:
Jim Cockburn 2021-12-27 19:26:51 +00:00 committed by GitHub
parent 56c3eb6e51
commit 8cd8fc8296
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 420 additions and 8 deletions

View File

@ -16,6 +16,7 @@ from .domain import Domain # noqa
from .generic_type import GenericType # noqa
from .history_event import HistoryEvent # noqa
from .timeout import Timeout # noqa
from .timer import Timer # noqa
from .workflow_type import WorkflowType # noqa
from .workflow_execution import WorkflowExecution # noqa
from time import sleep

View File

@ -24,6 +24,14 @@ SUPPORTED_HISTORY_EVENT_TYPES = (
"DecisionTaskTimedOut",
"WorkflowExecutionTimedOut",
"WorkflowExecutionSignaled",
"MarkerRecorded",
"TimerStarted",
"TimerCancelled",
"TimerFired",
"CancelTimerFailed",
"StartTimerFailed",
"WorkflowExecutionCanceled",
"CancelWorkflowExecutionFailed",
)

16
moto/swf/models/timer.py Normal file
View File

@ -0,0 +1,16 @@
from moto.core import BaseModel
class Timer(BaseModel):
def __init__(self, background_timer, started_event_id):
self.background_timer = background_timer
self.started_event_id = started_event_id
def start(self):
return self.background_timer.start()
def is_alive(self):
return self.background_timer.is_alive()
def cancel(self):
return self.background_timer.cancel()

View File

@ -1,4 +1,5 @@
import uuid
from threading import Timer as ThreadingTimer, Lock
from moto.core import BaseModel
from moto.core.utils import camelcase_to_underscores, unix_time
@ -15,6 +16,7 @@ from .activity_type import ActivityType
from .decision_task import DecisionTask
from .history_event import HistoryEvent
from .timeout import Timeout
from .timer import Timer
# TODO: extract decision related logic into a Decision class
@ -82,6 +84,9 @@ class WorkflowExecution(BaseModel):
# child workflows
self.child_workflow_executions = []
self._previous_started_event_id = None
# timers/thread utils
self.threading_lock = Lock()
self._timers = {}
def __repr__(self):
return "WorkflowExecution(run_id: {0})".format(self.run_id)
@ -235,9 +240,12 @@ class WorkflowExecution(BaseModel):
return max(event_ids or [0]) + 1
def _add_event(self, *args, **kwargs):
evt = HistoryEvent(self.next_event_id(), *args, **kwargs)
self._events.append(evt)
return evt
# lock here because the fire_timer function is called
# async, and want to ensure uniqueness in event ids
with self.threading_lock:
evt = HistoryEvent(self.next_event_id(), *args, **kwargs)
self._events.append(evt)
return evt
def start(self):
self.start_timestamp = unix_time()
@ -406,17 +414,21 @@ class WorkflowExecution(BaseModel):
self.fail(event_id, attributes.get("details"), attributes.get("reason"))
elif decision_type == "ScheduleActivityTask":
self.schedule_activity_task(event_id, attributes)
elif decision_type == "RecordMarker":
self.record_marker(event_id, attributes)
elif decision_type == "StartTimer":
self.start_timer(event_id, attributes)
elif decision_type == "CancelTimer":
self.cancel_timer(event_id, attributes["timerId"])
elif decision_type == "CancelWorkflowExecution":
self.cancel(event_id, attributes.get("details"))
else:
# TODO: implement Decision type: CancelTimer
# TODO: implement Decision type: CancelWorkflowExecution
# TODO: implement Decision type: ContinueAsNewWorkflowExecution
# TODO: implement Decision type: RecordMarker
# TODO: implement Decision type: RequestCancelActivityTask
# TODO: implement Decision type: RequestCancelExternalWorkflowExecution
# TODO: implement Decision type: ScheduleLambdaFunction
# TODO: implement Decision type: SignalExternalWorkflowExecution
# TODO: implement Decision type: StartChildWorkflowExecution
# TODO: implement Decision type: StartTimer
raise NotImplementedError(
"Cannot handle decision: {0}".format(decision_type)
)
@ -446,6 +458,27 @@ class WorkflowExecution(BaseModel):
reason=reason,
)
def cancel(self, event_id, details=None):
# TODO: implement length constraints on details
self.cancel_requested = True
# Can only cancel if there are no other pending desicion tasks
if self.open_counts["openDecisionTasks"] != 1:
# TODO OPERATION_NOT_PERMITTED is a valid failure state
self._add_event(
"CancelWorkflowExecutionFailed",
decision_task_completed_event_id=event_id,
cause="UNHANDLED_DECISION",
)
return
self.execution_status = "CLOSED"
self.close_status = "CANCELED"
self.close_timestamp = unix_time()
self._add_event(
"WorkflowExecutionCanceled",
decision_task_completed_event_id=event_id,
details=details,
)
def schedule_activity_task(self, event_id, attributes):
# Helper function to avoid repeating ourselves in the next sections
def fail_schedule_activity_task(_type, _cause):
@ -651,6 +684,70 @@ class WorkflowExecution(BaseModel):
timeout_type=task.timeout_type,
)
def record_marker(self, event_id, attributes):
self._add_event(
"MarkerRecorded",
decision_task_completed_event_id=event_id,
details=attributes.get("details"),
marker_name=attributes["markerName"],
)
def start_timer(self, event_id, attributes):
timer_id = attributes["timerId"]
existing_timer = self._timers.get(timer_id)
if existing_timer and existing_timer.is_alive():
# TODO 4 fail states are possible
# TIMER_ID_ALREADY_IN_USE | OPEN_TIMERS_LIMIT_EXCEEDED | TIMER_CREATION_RATE_EXCEEDED | OPERATION_NOT_PERMITTED
self._add_event(
"StartTimerFailed",
cause="TIMER_ID_ALREADY_IN_USE",
decision_task_completed_event_id=event_id,
timer_id=timer_id,
)
return
time_to_wait = attributes["startToFireTimeout"]
started_event_id = self._add_event(
"TimerStarted",
control=attributes.get("control"),
decision_task_completed_event_id=event_id,
start_to_fire_timeout=time_to_wait,
timer_id=timer_id,
).event_id
background_timer = ThreadingTimer(
float(time_to_wait), self._fire_timer, args=(started_event_id, timer_id,)
)
workflow_timer = Timer(background_timer, started_event_id)
self._timers[timer_id] = workflow_timer
workflow_timer.start()
def _fire_timer(self, started_event_id, timer_id):
self._add_event(
"TimerFired", started_event_id=started_event_id, timer_id=timer_id
)
self._timers.pop(timer_id)
def cancel_timer(self, event_id, timer_id):
requested_timer = self._timers.get(timer_id)
if not requested_timer or not requested_timer.is_alive():
# TODO there are 2 failure states
# TIMER_ID_UNKNOWN | OPERATION_NOT_PERMITTED
self._add_event(
"CancelTimerFailed",
cause="TIMER_ID_UNKNOWN",
decision_task_completed_event_id=event_id,
)
return
requested_timer.cancel()
self._timers.pop(timer_id)
self._add_event(
"TimerCancelled",
decision_task_completed_event_id=event_id,
started_event_id=requested_timer.started_event_id,
timer_id=timer_id,
)
@property
def open(self):
return self.execution_status == "OPEN"

View File

@ -0,0 +1,42 @@
from threading import Timer as ThreadingTimer
from moto.swf.models import Timer
from unittest.mock import Mock
def test_timer_creation():
background_timer = ThreadingTimer(30.0, lambda x: x)
under_test = Timer(background_timer, "abc123")
assert under_test.background_timer == background_timer
assert under_test.started_event_id == "abc123"
def test_timer_start_delegates_to_wrapped_timer():
background_timer = ThreadingTimer(30.0, lambda x: x)
background_timer.start = Mock()
under_test = Timer(background_timer, "abc123")
under_test.start()
background_timer.start.assert_called_once()
def test_timer_aliveness_delegates_to_wrapped_timer():
background_timer = ThreadingTimer(30.0, lambda x: x)
background_timer.is_alive = Mock()
under_test = Timer(background_timer, "abc123")
under_test.is_alive()
background_timer.is_alive.assert_called_once()
def test_timer_cancel_delegates_to_wrapped_timer():
background_timer = ThreadingTimer(30.0, lambda x: x)
background_timer.cancel = Mock()
under_test = Timer(background_timer, "abc123")
under_test.cancel()
background_timer.cancel.assert_called_once()

View File

@ -1,7 +1,17 @@
from threading import Timer as ThreadingTimer
from time import sleep
from freezegun import freeze_time
from unittest.mock import Mock, patch
import sure # noqa # pylint: disable=unused-import
from moto.swf.models import ActivityType, Timeout, WorkflowType, WorkflowExecution
from moto.swf.models import (
ActivityType,
Timeout,
Timer,
WorkflowType,
WorkflowExecution,
)
from moto.swf.exceptions import SWFDefaultUndefinedFault
from ..utils import (
auto_start_decision_tasks,
@ -541,3 +551,113 @@ def test_timeouts_are_processed_in_order_and_reevaluated():
"WorkflowExecutionTimedOut",
]
)
def test_record_marker():
wfe = make_workflow_execution()
MARKER_EVENT_ATTRIBUTES = {"markerName": "example_marker"}
wfe.record_marker(123, MARKER_EVENT_ATTRIBUTES)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("MarkerRecorded")
last_event.event_attributes["markerName"].should.equal("example_marker")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)
def test_start_timer():
wfe = make_workflow_execution()
START_TIMER_EVENT_ATTRIBUTES = {"startToFireTimeout": "10", "timerId": "abc123"}
with patch("moto.swf.models.workflow_execution.ThreadingTimer"):
wfe.start_timer(123, START_TIMER_EVENT_ATTRIBUTES)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("TimerStarted")
last_event.event_attributes["startToFireTimeout"].should.equal("10")
last_event.event_attributes["timerId"].should.equal("abc123")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)
def test_start_timer_correctly_fires_timer_later():
wfe = make_workflow_execution()
START_TIMER_EVENT_ATTRIBUTES = {"startToFireTimeout": "60", "timerId": "abc123"}
# Patch thread's event with one that immediately resolves
with patch("threading.Event.wait"):
wfe.start_timer(123, START_TIMER_EVENT_ATTRIBUTES)
# Small wait to let both events populate
sleep(0.5)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("TimerFired")
last_event.event_attributes["timerId"].should.equal("abc123")
last_event.event_attributes["startedEventId"].should.equal(1)
def test_start_timer_fails_if_timer_already_started():
wfe = make_workflow_execution()
existing_timer = Mock(spec=ThreadingTimer)
existing_timer.is_alive.return_value = True
wfe._timers["abc123"] = Timer(existing_timer, 1)
START_TIMER_EVENT_ATTRIBUTES = {"startToFireTimeout": "10", "timerId": "abc123"}
wfe.start_timer(123, START_TIMER_EVENT_ATTRIBUTES)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("StartTimerFailed")
last_event.event_attributes["cause"].should.equal("TIMER_ID_ALREADY_IN_USE")
last_event.event_attributes["timerId"].should.equal("abc123")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)
def test_cancel_timer():
wfe = make_workflow_execution()
existing_timer = Mock(spec=ThreadingTimer)
existing_timer.is_alive.return_value = True
wfe._timers["abc123"] = Timer(existing_timer, 1)
wfe.cancel_timer(123, "abc123")
last_event = wfe.events()[-1]
last_event.event_type.should.equal("TimerCancelled")
last_event.event_attributes["startedEventId"].should.equal(1)
last_event.event_attributes["timerId"].should.equal("abc123")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)
existing_timer.cancel.assert_called_once()
assert not wfe._timers.get("abc123")
def test_cancel_timer_fails_if_timer_not_found():
wfe = make_workflow_execution()
wfe.cancel_timer(123, "abc123")
last_event = wfe.events()[-1]
last_event.event_type.should.equal("CancelTimerFailed")
last_event.event_attributes["cause"].should.equal("TIMER_ID_UNKNOWN")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)
def test_cancel_workflow():
wfe = make_workflow_execution()
wfe.open_counts["openDecisionTasks"] = 1
wfe.cancel(123, "I want to cancel")
last_event = wfe.events()[-1]
last_event.event_type.should.equal("WorkflowExecutionCanceled")
last_event.event_attributes["details"].should.equal("I want to cancel")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)
def test_cancel_workflow_fails_if_open_decision():
wfe = make_workflow_execution()
wfe.open_counts["openDecisionTasks"] = 2
wfe.cancel(123, "I want to cancel")
last_event = wfe.events()[-1]
last_event.event_type.should.equal("CancelWorkflowExecutionFailed")
last_event.event_attributes["cause"].should.equal("UNHANDLED_DECISION")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)

View File

@ -2,6 +2,7 @@ from boto.swf.exceptions import SWFResponseError
from botocore.exceptions import ClientError
from datetime import datetime
from freezegun import freeze_time
from time import sleep
import sure # noqa # pylint: disable=unused-import
import pytest
@ -815,3 +816,130 @@ def test_respond_decision_task_completed_with_schedule_activity_task_boto3():
if not settings.TEST_SERVER_MODE:
ts = resp["latestActivityTaskTimestamp"].strftime("%Y-%m-%d %H:%M:%S")
ts.should.equal("2015-01-01 12:00:00")
@mock_swf
def test_record_marker_decision():
client = setup_workflow_boto3()
resp = client.poll_for_decision_task(
domain="test-domain", taskList={"name": "queue"}
)
task_token = resp["taskToken"]
decisions = [
{
"decisionType": "RecordMarker",
"recordMarkerDecisionAttributes": {"markerName": "TheMarker",},
}
]
client.respond_decision_task_completed(taskToken=task_token, decisions=decisions)
resp = client.get_workflow_execution_history(
domain="test-domain",
execution={"runId": client.run_id, "workflowId": "uid-abcd1234"},
)
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(
[
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"MarkerRecorded",
]
)
resp["events"][-1]["markerRecordedEventAttributes"].should.equal(
{"decisionTaskCompletedEventId": 4, "markerName": "TheMarker"}
)
@mock_swf
def test_start_and_fire_timer_decision():
client = setup_workflow_boto3()
resp = client.poll_for_decision_task(
domain="test-domain", taskList={"name": "queue"}
)
task_token = resp["taskToken"]
decisions = [
{
"decisionType": "StartTimer",
"startTimerDecisionAttributes": {
"startToFireTimeout": "1",
"timerId": "timer1",
},
}
]
client.respond_decision_task_completed(taskToken=task_token, decisions=decisions)
sleep(1.1)
resp = client.get_workflow_execution_history(
domain="test-domain",
execution={"runId": client.run_id, "workflowId": "uid-abcd1234"},
)
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(
[
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"TimerStarted",
"TimerFired",
]
)
resp["events"][-2]["timerStartedEventAttributes"].should.equal(
{
"decisionTaskCompletedEventId": 4,
"startToFireTimeout": "1",
"timerId": "timer1",
}
)
resp["events"][-1]["timerFiredEventAttributes"].should.equal(
{"startedEventId": 5, "timerId": "timer1"}
)
@mock_swf
def test_cancel_workflow_decision():
client = setup_workflow_boto3()
resp = client.poll_for_decision_task(
domain="test-domain", taskList={"name": "queue"}
)
task_token = resp["taskToken"]
decisions = [
{
"decisionType": "CancelWorkflowExecution",
"cancelWorkflowExecutionDecisionAttributes": {
"details": "decide to cancel"
},
}
]
client.respond_decision_task_completed(taskToken=task_token, decisions=decisions)
resp = client.get_workflow_execution_history(
domain="test-domain",
execution={"runId": client.run_id, "workflowId": "uid-abcd1234"},
)
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(
[
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"WorkflowExecutionCanceled",
]
)
resp["events"][-1]["workflowExecutionCanceledEventAttributes"].should.equal(
{"decisionTaskCompletedEventId": 4, "details": "decide to cancel"}
)
workflow_result = client.describe_workflow_execution(
domain="test-domain",
execution={"runId": client.run_id, "workflowId": "uid-abcd1234"},
)["executionInfo"]
workflow_result.should.contain("closeTimestamp")
workflow_result["executionStatus"].should.equal("CLOSED")
workflow_result["closeStatus"].should.equal("CANCELED")
workflow_result["cancelRequested"].should.equal(True)