Handle WorkflowExecution/WorkflowType options inheritance
... and potential resulting DefaultUndefinedFault errors.
This commit is contained in:
parent
6a8636ad21
commit
3ce5b29356
@ -63,3 +63,15 @@ class SWFWorkflowExecutionAlreadyStartedFault(JSONResponseError):
|
||||
400, "Bad Request",
|
||||
body={"__type": "com.amazonaws.swf.base.model#WorkflowExecutionAlreadyStartedFault"}
|
||||
)
|
||||
|
||||
|
||||
class SWFDefaultUndefinedFault(SWFClientError):
|
||||
def __init__(self, key):
|
||||
# TODO: move that into moto.core.utils maybe?
|
||||
words = key.split("_")
|
||||
key_camel_case = words.pop(0)
|
||||
for word in words:
|
||||
key_camel_case += word.capitalize()
|
||||
super(SWFDefaultUndefinedFault, self).__init__(
|
||||
key_camel_case, "com.amazonaws.swf.base.model#DefaultUndefinedFault"
|
||||
)
|
||||
|
@ -3,6 +3,8 @@ import uuid
|
||||
|
||||
from moto.core.utils import camelcase_to_underscores
|
||||
|
||||
from ..exceptions import SWFDefaultUndefinedFault
|
||||
|
||||
|
||||
class WorkflowExecution(object):
|
||||
def __init__(self, workflow_type, workflow_id, **kwargs):
|
||||
@ -11,10 +13,16 @@ class WorkflowExecution(object):
|
||||
self.run_id = uuid.uuid4().hex
|
||||
self.execution_status = "OPEN"
|
||||
self.cancel_requested = False
|
||||
#config
|
||||
for key, value in kwargs.iteritems():
|
||||
self.__setattr__(key, value)
|
||||
#counters
|
||||
# args processing
|
||||
# NB: the order follows boto/SWF order of exceptions appearance (if no
|
||||
# param is set, # SWF will raise DefaultUndefinedFault errors in the
|
||||
# same order as the few lines that follow)
|
||||
self._set_from_kwargs_or_workflow_type(kwargs, "execution_start_to_close_timeout")
|
||||
self._set_from_kwargs_or_workflow_type(kwargs, "task_list", "task_list")
|
||||
self._set_from_kwargs_or_workflow_type(kwargs, "task_start_to_close_timeout")
|
||||
self._set_from_kwargs_or_workflow_type(kwargs, "child_policy")
|
||||
self.input = kwargs.get("input")
|
||||
# counters
|
||||
self.open_counts = {
|
||||
"openTimers": 0,
|
||||
"openDecisionTasks": 0,
|
||||
@ -25,6 +33,16 @@ class WorkflowExecution(object):
|
||||
def __repr__(self):
|
||||
return "WorkflowExecution(run_id: {})".format(self.run_id)
|
||||
|
||||
def _set_from_kwargs_or_workflow_type(self, kwargs, local_key, workflow_type_key=None):
|
||||
if workflow_type_key is None:
|
||||
workflow_type_key = "default_"+local_key
|
||||
value = kwargs.get(local_key)
|
||||
if not value and hasattr(self.workflow_type, workflow_type_key):
|
||||
value = getattr(self.workflow_type, workflow_type_key)
|
||||
if not value:
|
||||
raise SWFDefaultUndefinedFault(local_key)
|
||||
setattr(self, local_key, value)
|
||||
|
||||
@property
|
||||
def _configuration_keys(self):
|
||||
return [
|
||||
|
@ -153,7 +153,7 @@ class SWFResponse(BaseResponse):
|
||||
task_list = None
|
||||
default_child_policy = self._params.get("defaultChildPolicy")
|
||||
default_task_start_to_close_timeout = self._params.get("defaultTaskStartToCloseTimeout")
|
||||
default_execution_start_to_close_timeout = self._params.get("defaultTaskExecutionStartToCloseTimeout")
|
||||
default_execution_start_to_close_timeout = self._params.get("defaultExecutionStartToCloseTimeout")
|
||||
description = self._params.get("description")
|
||||
# TODO: add defaultTaskPriority when boto gets to support it
|
||||
# TODO: add defaultLambdaRole when boto gets to support it
|
||||
|
@ -1,4 +1,5 @@
|
||||
from sure import expect
|
||||
from nose.tools import assert_raises
|
||||
|
||||
from moto.swf.models import (
|
||||
Domain,
|
||||
@ -6,8 +7,20 @@ from moto.swf.models import (
|
||||
WorkflowType,
|
||||
WorkflowExecution,
|
||||
)
|
||||
from moto.swf.exceptions import (
|
||||
SWFDefaultUndefinedFault,
|
||||
)
|
||||
|
||||
|
||||
# utils
|
||||
def test_workflow_type():
|
||||
return WorkflowType(
|
||||
"test-workflow", "v1.0",
|
||||
task_list="queue", default_child_policy="ABANDON",
|
||||
default_execution_start_to_close_timeout="300",
|
||||
default_task_start_to_close_timeout="300",
|
||||
)
|
||||
|
||||
# Domain
|
||||
def test_domain_short_dict_representation():
|
||||
domain = Domain("foo", "52")
|
||||
@ -78,21 +91,62 @@ def test_type_string_representation():
|
||||
|
||||
# WorkflowExecution
|
||||
def test_workflow_execution_creation():
|
||||
wfe = WorkflowExecution("workflow_type_whatever", "ab1234", child_policy="TERMINATE")
|
||||
wfe.workflow_type.should.equal("workflow_type_whatever")
|
||||
wft = test_workflow_type()
|
||||
wfe = WorkflowExecution(wft, "ab1234", child_policy="TERMINATE")
|
||||
wfe.workflow_type.should.equal(wft)
|
||||
wfe.child_policy.should.equal("TERMINATE")
|
||||
|
||||
def test_workflow_execution_creation_child_policy_logic():
|
||||
WorkflowExecution(
|
||||
WorkflowType(
|
||||
"test-workflow", "v1.0",
|
||||
task_list="queue", default_child_policy="ABANDON",
|
||||
default_execution_start_to_close_timeout="300",
|
||||
default_task_start_to_close_timeout="300",
|
||||
),
|
||||
"ab1234"
|
||||
).child_policy.should.equal("ABANDON")
|
||||
|
||||
WorkflowExecution(
|
||||
WorkflowType(
|
||||
"test-workflow", "v1.0", task_list="queue",
|
||||
default_execution_start_to_close_timeout="300",
|
||||
default_task_start_to_close_timeout="300",
|
||||
),
|
||||
"ab1234",
|
||||
child_policy="REQUEST_CANCEL"
|
||||
).child_policy.should.equal("REQUEST_CANCEL")
|
||||
|
||||
with assert_raises(SWFDefaultUndefinedFault) as err:
|
||||
WorkflowExecution(WorkflowType("test-workflow", "v1.0"), "ab1234")
|
||||
|
||||
ex = err.exception
|
||||
ex.status.should.equal(400)
|
||||
ex.error_code.should.equal("DefaultUndefinedFault")
|
||||
ex.body.should.equal({
|
||||
"__type": "com.amazonaws.swf.base.model#DefaultUndefinedFault",
|
||||
"message": "executionStartToCloseTimeout"
|
||||
})
|
||||
|
||||
|
||||
def test_workflow_execution_string_representation():
|
||||
wfe = WorkflowExecution("workflow_type_whatever", "ab1234", child_policy="TERMINATE")
|
||||
wft = test_workflow_type()
|
||||
wfe = WorkflowExecution(wft, "ab1234", child_policy="TERMINATE")
|
||||
str(wfe).should.match(r"^WorkflowExecution\(run_id: .*\)")
|
||||
|
||||
def test_workflow_execution_generates_a_random_run_id():
|
||||
wfe1 = WorkflowExecution("workflow_type_whatever", "ab1234")
|
||||
wfe2 = WorkflowExecution("workflow_type_whatever", "ab1235")
|
||||
wft = test_workflow_type()
|
||||
wfe1 = WorkflowExecution(wft, "ab1234", child_policy="TERMINATE")
|
||||
wfe2 = WorkflowExecution(wft, "ab1235", child_policy="TERMINATE")
|
||||
wfe1.run_id.should_not.equal(wfe2.run_id)
|
||||
|
||||
def test_workflow_execution_short_dict_representation():
|
||||
wf_type = WorkflowType("test-workflow", "v1.0")
|
||||
wf_type = WorkflowType(
|
||||
"test-workflow", "v1.0",
|
||||
task_list="queue", default_child_policy="ABANDON",
|
||||
default_execution_start_to_close_timeout="300",
|
||||
default_task_start_to_close_timeout="300",
|
||||
)
|
||||
wfe = WorkflowExecution(wf_type, "ab1234")
|
||||
|
||||
sd = wfe.to_short_dict()
|
||||
@ -100,7 +154,12 @@ def test_workflow_execution_short_dict_representation():
|
||||
sd.should.contain("runId")
|
||||
|
||||
def test_workflow_execution_medium_dict_representation():
|
||||
wf_type = WorkflowType("test-workflow", "v1.0")
|
||||
wf_type = WorkflowType(
|
||||
"test-workflow", "v1.0",
|
||||
task_list="queue", default_child_policy="ABANDON",
|
||||
default_execution_start_to_close_timeout="300",
|
||||
default_task_start_to_close_timeout="300",
|
||||
)
|
||||
wfe = WorkflowExecution(wf_type, "ab1234")
|
||||
|
||||
md = wfe.to_medium_dict()
|
||||
@ -116,7 +175,12 @@ def test_workflow_execution_medium_dict_representation():
|
||||
md["tagList"].should.equal(["foo", "bar", "baz"])
|
||||
|
||||
def test_workflow_execution_full_dict_representation():
|
||||
wf_type = WorkflowType("test-workflow", "v1.0")
|
||||
wf_type = WorkflowType(
|
||||
"test-workflow", "v1.0",
|
||||
task_list="queue", default_child_policy="ABANDON",
|
||||
default_execution_start_to_close_timeout="300",
|
||||
default_task_start_to_close_timeout="300",
|
||||
)
|
||||
wfe = WorkflowExecution(wf_type, "ab1234")
|
||||
|
||||
fd = wfe.to_full_dict()
|
||||
@ -124,10 +188,9 @@ def test_workflow_execution_full_dict_representation():
|
||||
fd["openCounts"]["openTimers"].should.equal(0)
|
||||
fd["openCounts"]["openDecisionTasks"].should.equal(0)
|
||||
fd["openCounts"]["openActivityTasks"].should.equal(0)
|
||||
fd["executionConfiguration"].should.equal({})
|
||||
|
||||
wfe.task_list = "special"
|
||||
wfe.task_start_to_close_timeout = "45"
|
||||
fd = wfe.to_full_dict()
|
||||
fd["executionConfiguration"]["taskList"]["name"].should.equal("special")
|
||||
fd["executionConfiguration"]["taskStartToCloseTimeout"].should.equal("45")
|
||||
fd["executionConfiguration"].should.equal({
|
||||
"childPolicy": "ABANDON",
|
||||
"executionStartToCloseTimeout": "300",
|
||||
"taskList": {"name": "queue"},
|
||||
"taskStartToCloseTimeout": "300",
|
||||
})
|
||||
|
@ -15,7 +15,12 @@ from moto.swf.exceptions import (
|
||||
def setup_swf_environment():
|
||||
conn = boto.connect_swf("the_key", "the_secret")
|
||||
conn.register_domain("test-domain", "60", description="A test domain")
|
||||
conn.register_workflow_type("test-domain", "test-workflow", "v1.0")
|
||||
conn.register_workflow_type(
|
||||
"test-domain", "test-workflow", "v1.0",
|
||||
task_list="queue", default_child_policy="TERMINATE",
|
||||
default_execution_start_to_close_timeout="300",
|
||||
default_task_start_to_close_timeout="300",
|
||||
)
|
||||
conn.register_activity_type("test-domain", "test-activity", "v1.1")
|
||||
return conn
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user