Merge pull request #451 from botify-labs/feature/add-swf

[wip] Add SWF support
This commit is contained in:
Steve Pulec 2015-11-26 09:50:01 -05:00
commit eabcb3d39c
40 changed files with 4369 additions and 0 deletions

View File

@ -41,3 +41,4 @@ Moto is written by Steve Pulec with contributions from:
* [Zack Kourouma](https://github.com/zkourouma)
* [Pior Bastida](https://github.com/pior)
* [Dustin J. Mitchell](https://github.com/djmitche)
* [Jean-Baptiste Barth](https://github.com/jbbarth)

View File

@ -28,3 +28,4 @@ from .sns import mock_sns # flake8: noqa
from .sqs import mock_sqs # flake8: noqa
from .sts import mock_sts # flake8: noqa
from .route53 import mock_route53 # flake8: noqa
from .swf import mock_swf # flake8: noqa

View File

@ -23,6 +23,22 @@ def camelcase_to_underscores(argument):
return result
def underscores_to_camelcase(argument):
''' Converts a camelcase param like the_new_attribute to the equivalent
camelcase version like theNewAttribute. Note that the first letter is
NOT capitalized by this function '''
result = ''
previous_was_underscore = False
for char in argument:
if char != '_':
if previous_was_underscore:
result += char.upper()
else:
result += char
previous_was_underscore = char == '_'
return result
def method_names_from_class(clazz):
# On Python 2, methods are different from functions, and the `inspect`
# predicates distinguish between them. On Python 3, methods are just

12
moto/swf/__init__.py Normal file
View File

@ -0,0 +1,12 @@
from __future__ import unicode_literals
from .models import swf_backends
from ..core.models import MockAWS
swf_backend = swf_backends['us-east-1']
def mock_swf(func=None):
if func:
return MockAWS(swf_backends)(func)
else:
return MockAWS(swf_backends)

85
moto/swf/constants.py Normal file
View File

@ -0,0 +1,85 @@
# List decision fields and if they're required or not
#
# See http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RespondDecisionTaskCompleted.html
# and subsequent docs for each decision type.
DECISIONS_FIELDS = {
"cancelTimerDecisionAttributes": {
"timerId": { "type": "string", "required": True }
},
"cancelWorkflowExecutionDecisionAttributes": {
"details": { "type": "string", "required": False }
},
"completeWorkflowExecutionDecisionAttributes": {
"result": { "type": "string", "required": False }
},
"continueAsNewWorkflowExecutionDecisionAttributes": {
"childPolicy": { "type": "string", "required": False },
"executionStartToCloseTimeout": { "type": "string", "required": False },
"input": { "type": "string", "required": False },
"lambdaRole": { "type": "string", "required": False },
"tagList": { "type": "string", "array": True, "required": False },
"taskList": { "type": "TaskList", "required": False },
"taskPriority": { "type": "string", "required": False },
"taskStartToCloseTimeout": { "type": "string", "required": False },
"workflowTypeVersion": { "type": "string", "required": False }
},
"failWorkflowExecutionDecisionAttributes": {
"details": { "type": "string", "required": False },
"reason": { "type": "string", "required": False }
},
"recordMarkerDecisionAttributes": {
"details": { "type": "string", "required": False },
"markerName": { "type": "string", "required": True }
},
"requestCancelActivityTaskDecisionAttributes": {
"activityId": { "type": "string", "required": True }
},
"requestCancelExternalWorkflowExecutionDecisionAttributes": {
"control": { "type": "string", "required": False },
"runId": { "type": "string", "required": False },
"workflowId": { "type": "string", "required": True }
},
"scheduleActivityTaskDecisionAttributes": {
"activityId": { "type": "string", "required": True },
"activityType": { "type": "ActivityType", "required": True },
"control": { "type": "string", "required": False },
"heartbeatTimeout": { "type": "string", "required": False },
"input": { "type": "string", "required": False },
"scheduleToCloseTimeout": { "type": "string", "required": False },
"scheduleToStartTimeout": { "type": "string", "required": False },
"startToCloseTimeout": { "type": "string", "required": False },
"taskList": { "type": "TaskList", "required": False },
"taskPriority": { "type": "string", "required": False }
},
"scheduleLambdaFunctionDecisionAttributes": {
"id": { "type": "string", "required": True },
"input": { "type": "string", "required": False },
"name": { "type": "string", "required": True },
"startToCloseTimeout": { "type": "string", "required": False }
},
"signalExternalWorkflowExecutionDecisionAttributes": {
"control": { "type": "string", "required": False },
"input": { "type": "string", "required": False },
"runId": { "type": "string", "required": False },
"signalName": { "type": "string", "required": True },
"workflowId": { "type": "string", "required": True }
},
"startChildWorkflowExecutionDecisionAttributes": {
"childPolicy": { "type": "string", "required": False },
"control": { "type": "string", "required": False },
"executionStartToCloseTimeout": { "type": "string", "required": False },
"input": { "type": "string", "required": False },
"lambdaRole": { "type": "string", "required": False },
"tagList": { "type": "string", "array": True, "required": False },
"taskList": { "type": "TaskList", "required": False },
"taskPriority": { "type": "string", "required": False },
"taskStartToCloseTimeout": { "type": "string", "required": False },
"workflowId": { "type": "string", "required": True },
"workflowType": { "type": "WorkflowType", "required": True }
},
"startTimerDecisionAttributes": {
"control": { "type": "string", "required": False },
"startToFireTimeout": { "type": "string", "required": True },
"timerId": { "type": "string", "required": True }
}
}

126
moto/swf/exceptions.py Normal file
View File

@ -0,0 +1,126 @@
from __future__ import unicode_literals
from boto.exception import JSONResponseError
class SWFClientError(JSONResponseError):
def __init__(self, message, __type):
super(SWFClientError, self).__init__(
400, "Bad Request",
body={"message": message, "__type": __type}
)
class SWFUnknownResourceFault(SWFClientError):
def __init__(self, resource_type, resource_name=None):
if resource_name:
message = "Unknown {0}: {1}".format(resource_type, resource_name)
else:
message = "Unknown {0}".format(resource_type)
super(SWFUnknownResourceFault, self).__init__(
message,
"com.amazonaws.swf.base.model#UnknownResourceFault")
class SWFDomainAlreadyExistsFault(SWFClientError):
def __init__(self, domain_name):
super(SWFDomainAlreadyExistsFault, self).__init__(
domain_name,
"com.amazonaws.swf.base.model#DomainAlreadyExistsFault")
class SWFDomainDeprecatedFault(SWFClientError):
def __init__(self, domain_name):
super(SWFDomainDeprecatedFault, self).__init__(
domain_name,
"com.amazonaws.swf.base.model#DomainDeprecatedFault")
class SWFSerializationException(JSONResponseError):
def __init__(self, value):
message = "class java.lang.Foo can not be converted to an String "
message += " (not a real SWF exception ; happened on: {0})".format(value)
__type = "com.amazonaws.swf.base.model#SerializationException"
super(SWFSerializationException, self).__init__(
400, "Bad Request",
body={"Message": message, "__type": __type}
)
class SWFTypeAlreadyExistsFault(SWFClientError):
def __init__(self, _type):
super(SWFTypeAlreadyExistsFault, self).__init__(
"{0}=[name={1}, version={2}]".format(_type.__class__.__name__, _type.name, _type.version),
"com.amazonaws.swf.base.model#TypeAlreadyExistsFault")
class SWFTypeDeprecatedFault(SWFClientError):
def __init__(self, _type):
super(SWFTypeDeprecatedFault, self).__init__(
"{0}=[name={1}, version={2}]".format(_type.__class__.__name__, _type.name, _type.version),
"com.amazonaws.swf.base.model#TypeDeprecatedFault")
class SWFWorkflowExecutionAlreadyStartedFault(JSONResponseError):
def __init__(self):
super(SWFWorkflowExecutionAlreadyStartedFault, self).__init__(
400, "Bad Request",
body={"__type": "com.amazonaws.swf.base.model#WorkflowExecutionAlreadyStartedFault"}
)
class SWFDefaultUndefinedFault(SWFClientError):
def __init__(self, key):
# TODO: move that into moto.core.utils maybe?
words = key.split("_")
key_camel_case = words.pop(0)
for word in words:
key_camel_case += word.capitalize()
super(SWFDefaultUndefinedFault, self).__init__(
key_camel_case, "com.amazonaws.swf.base.model#DefaultUndefinedFault"
)
class SWFValidationException(SWFClientError):
def __init__(self, message):
super(SWFValidationException, self).__init__(
message,
"com.amazon.coral.validate#ValidationException"
)
class SWFDecisionValidationException(SWFClientError):
def __init__(self, problems):
# messages
messages = []
for pb in problems:
if pb["type"] == "null_value":
messages.append(
"Value null at '%(where)s' failed to satisfy constraint: "\
"Member must not be null" % pb
)
elif pb["type"] == "bad_decision_type":
messages.append(
"Value '%(value)s' at '%(where)s' failed to satisfy constraint: " \
"Member must satisfy enum value set: " \
"[%(possible_values)s]" % pb
)
else:
raise ValueError(
"Unhandled decision constraint type: {0}".format(pb["type"])
)
# prefix
count = len(problems)
if count < 2:
prefix = "{0} validation error detected: "
else:
prefix = "{0} validation errors detected: "
super(SWFDecisionValidationException, self).__init__(
prefix.format(count) + "; ".join(messages),
"com.amazon.coral.validate#ValidationException"
)
class SWFWorkflowExecutionClosedError(Exception):
def __str__(self):
return repr("Cannot change this object because the WorkflowExecution is closed")

337
moto/swf/models/__init__.py Normal file
View File

@ -0,0 +1,337 @@
from __future__ import unicode_literals
import six
import boto.swf
from moto.core import BaseBackend
from ..exceptions import (
SWFUnknownResourceFault,
SWFDomainAlreadyExistsFault,
SWFDomainDeprecatedFault,
SWFTypeAlreadyExistsFault,
SWFTypeDeprecatedFault,
SWFValidationException,
)
from .activity_task import ActivityTask
from .activity_type import ActivityType
from .decision_task import DecisionTask
from .domain import Domain
from .generic_type import GenericType
from .history_event import HistoryEvent
from .timeout import Timeout
from .workflow_type import WorkflowType
from .workflow_execution import WorkflowExecution
KNOWN_SWF_TYPES = {
"activity": ActivityType,
"workflow": WorkflowType,
}
class SWFBackend(BaseBackend):
def __init__(self, region_name):
self.region_name = region_name
self.domains = []
super(SWFBackend, self).__init__()
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def _get_domain(self, name, ignore_empty=False):
matching = [domain for domain in self.domains if domain.name == name]
if not matching and not ignore_empty:
raise SWFUnknownResourceFault("domain", name)
if matching:
return matching[0]
return None
def _process_timeouts(self):
for domain in self.domains:
for wfe in domain.workflow_executions:
wfe._process_timeouts()
def list_domains(self, status, reverse_order=None):
domains = [domain for domain in self.domains
if domain.status == status]
domains = sorted(domains, key=lambda domain: domain.name)
if reverse_order:
domains = reversed(domains)
return domains
def register_domain(self, name, workflow_execution_retention_period_in_days,
description=None):
if self._get_domain(name, ignore_empty=True):
raise SWFDomainAlreadyExistsFault(name)
domain = Domain(name, workflow_execution_retention_period_in_days,
description)
self.domains.append(domain)
def deprecate_domain(self, name):
domain = self._get_domain(name)
if domain.status == "DEPRECATED":
raise SWFDomainDeprecatedFault(name)
domain.status = "DEPRECATED"
def describe_domain(self, name):
return self._get_domain(name)
def list_types(self, kind, domain_name, status, reverse_order=None):
domain = self._get_domain(domain_name)
_types = domain.find_types(kind, status)
_types = sorted(_types, key=lambda domain: domain.name)
if reverse_order:
_types = reversed(_types)
return _types
def register_type(self, kind, domain_name, name, version, **kwargs):
domain = self._get_domain(domain_name)
_type = domain.get_type(kind, name, version, ignore_empty=True)
if _type:
raise SWFTypeAlreadyExistsFault(_type)
_class = KNOWN_SWF_TYPES[kind]
_type = _class(name, version, **kwargs)
domain.add_type(_type)
def deprecate_type(self, kind, domain_name, name, version):
domain = self._get_domain(domain_name)
_type = domain.get_type(kind, name, version)
if _type.status == "DEPRECATED":
raise SWFTypeDeprecatedFault(_type)
_type.status = "DEPRECATED"
def describe_type(self, kind, domain_name, name, version):
domain = self._get_domain(domain_name)
return domain.get_type(kind, name, version)
def start_workflow_execution(self, domain_name, workflow_id,
workflow_name, workflow_version,
tag_list=None, **kwargs):
domain = self._get_domain(domain_name)
wf_type = domain.get_type("workflow", workflow_name, workflow_version)
if wf_type.status == "DEPRECATED":
raise SWFTypeDeprecatedFault(wf_type)
wfe = WorkflowExecution(domain, wf_type, workflow_id,
tag_list=tag_list, **kwargs)
domain.add_workflow_execution(wfe)
wfe.start()
return wfe
def describe_workflow_execution(self, domain_name, run_id, workflow_id):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
return domain.get_workflow_execution(workflow_id, run_id=run_id)
def poll_for_decision_task(self, domain_name, task_list, identity=None):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
# Real SWF cases:
# - case 1: there's a decision task to return, return it
# - case 2: there's no decision task to return, so wait for timeout
# and if a new decision is schedule, start and return it
# - case 3: timeout reached, no decision, return an empty decision
# (e.g. a decision with an empty "taskToken")
#
# For the sake of simplicity, we forget case 2 for now, so either
# there's a DecisionTask to return, either we return a blank one.
#
# SWF client libraries should cope with that easily as long as tests
# aren't distributed.
#
# TODO: handle long polling (case 2) for decision tasks
candidates = []
for _task_list, tasks in domain.decision_task_lists.items():
if _task_list == task_list:
candidates += [t for t in tasks if t.state == "SCHEDULED"]
if any(candidates):
# TODO: handle task priorities (but not supported by boto for now)
task = min(candidates, key=lambda d: d.scheduled_at)
wfe = task.workflow_execution
wfe.start_decision_task(task.task_token, identity=identity)
return task
else:
return None
def count_pending_decision_tasks(self, domain_name, task_list):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
count = 0
for wfe in domain.workflow_executions:
if wfe.task_list == task_list:
count += wfe.open_counts["openDecisionTasks"]
return count
def respond_decision_task_completed(self, task_token,
decisions=None,
execution_context=None):
# process timeouts on all objects
self._process_timeouts()
# let's find decision task
decision_task = None
for domain in self.domains:
for wfe in domain.workflow_executions:
for dt in wfe.decision_tasks:
if dt.task_token == task_token:
decision_task = dt
# no decision task found
if not decision_task:
# In the real world, SWF distinguishes an obviously invalid token and a
# token that has no corresponding decision task. For the latter it seems
# to wait until a task with that token comes up (which looks like a smart
# choice in an eventually-consistent system). The call doesn't seem to
# timeout shortly, it takes 3 or 4 minutes to result in:
# BotoServerError: 500 Internal Server Error
# {"__type":"com.amazon.coral.service#InternalFailure"}
# This behavior is not documented clearly in SWF docs and we'll ignore it
# in moto, as there is no obvious reason to rely on it in tests.
raise SWFValidationException("Invalid token")
# decision task found, but WorflowExecution is CLOSED
wfe = decision_task.workflow_execution
if not wfe.open:
raise SWFUnknownResourceFault(
"execution",
"WorkflowExecution=[workflowId={0}, runId={1}]".format(
wfe.workflow_id, wfe.run_id
)
)
# decision task found, but already completed
if decision_task.state != "STARTED":
if decision_task.state == "COMPLETED":
raise SWFUnknownResourceFault(
"decision task, scheduledEventId = {0}".format(decision_task.scheduled_event_id)
)
else:
raise ValueError(
"This shouldn't happen: you have to PollForDecisionTask to get a token, "
"which changes DecisionTask status to 'STARTED' ; then it can only change "
"to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably "
"a bug in moto, please report it, thanks!"
)
# everything's good
if decision_task:
wfe = decision_task.workflow_execution
wfe.complete_decision_task(decision_task.task_token,
decisions=decisions,
execution_context=execution_context)
def poll_for_activity_task(self, domain_name, task_list, identity=None):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
# Real SWF cases:
# - case 1: there's an activity task to return, return it
# - case 2: there's no activity task to return, so wait for timeout
# and if a new activity is scheduled, return it
# - case 3: timeout reached, no activity task, return an empty response
# (e.g. a response with an empty "taskToken")
#
# For the sake of simplicity, we forget case 2 for now, so either
# there's an ActivityTask to return, either we return a blank one.
#
# SWF client libraries should cope with that easily as long as tests
# aren't distributed.
#
# TODO: handle long polling (case 2) for activity tasks
candidates = []
for _task_list, tasks in domain.activity_task_lists.items():
if _task_list == task_list:
candidates += [t for t in tasks if t.state == "SCHEDULED"]
if any(candidates):
# TODO: handle task priorities (but not supported by boto for now)
task = min(candidates, key=lambda d: d.scheduled_at)
wfe = task.workflow_execution
wfe.start_activity_task(task.task_token, identity=identity)
return task
else:
return None
def count_pending_activity_tasks(self, domain_name, task_list):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
count = 0
for _task_list, tasks in domain.activity_task_lists.items():
if _task_list == task_list:
pending = [t for t in tasks if t.state in ["SCHEDULED", "STARTED"]]
count += len(pending)
return count
def _find_activity_task_from_token(self, task_token):
activity_task = None
for domain in self.domains:
for wfe in domain.workflow_executions:
for task in wfe.activity_tasks:
if task.task_token == task_token:
activity_task = task
# no task found
if not activity_task:
# Same as for decision tasks, we raise an invalid token BOTH for clearly
# wrong SWF tokens and OK tokens but not used correctly. This should not
# be a problem in moto.
raise SWFValidationException("Invalid token")
# activity task found, but WorflowExecution is CLOSED
wfe = activity_task.workflow_execution
if not wfe.open:
raise SWFUnknownResourceFault(
"execution",
"WorkflowExecution=[workflowId={0}, runId={1}]".format(
wfe.workflow_id, wfe.run_id
)
)
# activity task found, but already completed
if activity_task.state != "STARTED":
if activity_task.state == "COMPLETED":
raise SWFUnknownResourceFault(
"activity, scheduledEventId = {0}".format(activity_task.scheduled_event_id)
)
else:
raise ValueError(
"This shouldn't happen: you have to PollForActivityTask to get a token, "
"which changes ActivityTask status to 'STARTED' ; then it can only change "
"to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably "
"a bug in moto, please report it, thanks!"
)
# everything's good
return activity_task
def respond_activity_task_completed(self, task_token, result=None):
# process timeouts on all objects
self._process_timeouts()
activity_task = self._find_activity_task_from_token(task_token)
wfe = activity_task.workflow_execution
wfe.complete_activity_task(activity_task.task_token, result=result)
def respond_activity_task_failed(self, task_token, reason=None, details=None):
# process timeouts on all objects
self._process_timeouts()
activity_task = self._find_activity_task_from_token(task_token)
wfe = activity_task.workflow_execution
wfe.fail_activity_task(activity_task.task_token, reason=reason, details=details)
def terminate_workflow_execution(self, domain_name, workflow_id, child_policy=None,
details=None, reason=None, run_id=None):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
wfe = domain.get_workflow_execution(workflow_id, run_id=run_id, raise_if_closed=True)
wfe.terminate(child_policy=child_policy, details=details, reason=reason)
def record_activity_task_heartbeat(self, task_token, details=None):
# process timeouts on all objects
self._process_timeouts()
activity_task = self._find_activity_task_from_token(task_token)
activity_task.reset_heartbeat_clock()
if details:
activity_task.details = details
swf_backends = {}
for region in boto.swf.regions():
swf_backends[region.name] = SWFBackend(region.name)

View File

@ -0,0 +1,84 @@
from __future__ import unicode_literals
from datetime import datetime
import uuid
from ..exceptions import SWFWorkflowExecutionClosedError
from ..utils import now_timestamp
from .timeout import Timeout
class ActivityTask(object):
def __init__(self, activity_id, activity_type, scheduled_event_id,
workflow_execution, timeouts, input=None):
self.activity_id = activity_id
self.activity_type = activity_type
self.details = None
self.input = input
self.last_heartbeat_timestamp = now_timestamp()
self.scheduled_event_id = scheduled_event_id
self.started_event_id = None
self.state = "SCHEDULED"
self.task_token = str(uuid.uuid4())
self.timeouts = timeouts
self.timeout_type = None
self.workflow_execution = workflow_execution
# this is *not* necessarily coherent with workflow execution history,
# but that shouldn't be a problem for tests
self.scheduled_at = datetime.utcnow()
def _check_workflow_execution_open(self):
if not self.workflow_execution.open:
raise SWFWorkflowExecutionClosedError()
@property
def open(self):
return self.state in ["SCHEDULED", "STARTED"]
def to_full_dict(self):
hsh = {
"activityId": self.activity_id,
"activityType": self.activity_type.to_short_dict(),
"taskToken": self.task_token,
"startedEventId": self.started_event_id,
"workflowExecution": self.workflow_execution.to_short_dict(),
}
if self.input:
hsh["input"] = self.input
return hsh
def start(self, started_event_id):
self.state = "STARTED"
self.started_event_id = started_event_id
def complete(self):
self._check_workflow_execution_open()
self.state = "COMPLETED"
def fail(self):
self._check_workflow_execution_open()
self.state = "FAILED"
def reset_heartbeat_clock(self):
self.last_heartbeat_timestamp = now_timestamp()
def first_timeout(self):
if not self.open or not self.workflow_execution.open:
return None
# TODO: handle the "NONE" case
heartbeat_timeout_at = self.last_heartbeat_timestamp + \
int(self.timeouts["heartbeatTimeout"])
_timeout = Timeout(self, heartbeat_timeout_at, "HEARTBEAT")
if _timeout.reached:
return _timeout
def process_timeouts(self):
_timeout = self.first_timeout()
if _timeout:
self.timeout(_timeout)
def timeout(self, _timeout):
self._check_workflow_execution_open()
self.state = "TIMED_OUT"
self.timeout_type = _timeout.kind

View File

@ -0,0 +1,16 @@
from .generic_type import GenericType
class ActivityType(GenericType):
@property
def _configuration_keys(self):
return [
"defaultTaskHeartbeatTimeout",
"defaultTaskScheduleToCloseTimeout",
"defaultTaskScheduleToStartTimeout",
"defaultTaskStartToCloseTimeout",
]
@property
def kind(self):
return "activity"

View File

@ -0,0 +1,76 @@
from __future__ import unicode_literals
from datetime import datetime
import uuid
from ..exceptions import SWFWorkflowExecutionClosedError
from ..utils import now_timestamp
from .timeout import Timeout
class DecisionTask(object):
def __init__(self, workflow_execution, scheduled_event_id):
self.workflow_execution = workflow_execution
self.workflow_type = workflow_execution.workflow_type
self.task_token = str(uuid.uuid4())
self.scheduled_event_id = scheduled_event_id
self.previous_started_event_id = 0
self.started_event_id = None
self.started_timestamp = None
self.start_to_close_timeout = self.workflow_execution.task_start_to_close_timeout
self.state = "SCHEDULED"
# this is *not* necessarily coherent with workflow execution history,
# but that shouldn't be a problem for tests
self.scheduled_at = datetime.utcnow()
self.timeout_type = None
@property
def started(self):
return self.state == "STARTED"
def _check_workflow_execution_open(self):
if not self.workflow_execution.open:
raise SWFWorkflowExecutionClosedError()
def to_full_dict(self, reverse_order=False):
events = self.workflow_execution.events(reverse_order=reverse_order)
hsh = {
"events": [
evt.to_dict() for evt in events
],
"taskToken": self.task_token,
"previousStartedEventId": self.previous_started_event_id,
"workflowExecution": self.workflow_execution.to_short_dict(),
"workflowType": self.workflow_type.to_short_dict(),
}
if self.started_event_id:
hsh["startedEventId"] = self.started_event_id
return hsh
def start(self, started_event_id):
self.state = "STARTED"
self.started_timestamp = now_timestamp()
self.started_event_id = started_event_id
def complete(self):
self._check_workflow_execution_open()
self.state = "COMPLETED"
def first_timeout(self):
if not self.started or not self.workflow_execution.open:
return None
# TODO: handle the "NONE" case
start_to_close_at = self.started_timestamp + int(self.start_to_close_timeout)
_timeout = Timeout(self, start_to_close_at, "START_TO_CLOSE")
if _timeout.reached:
return _timeout
def process_timeouts(self):
_timeout = self.first_timeout()
if _timeout:
self.timeout(_timeout)
def timeout(self, _timeout):
self._check_workflow_execution_open()
self.state = "TIMED_OUT"
self.timeout_type = _timeout.kind

124
moto/swf/models/domain.py Normal file
View File

@ -0,0 +1,124 @@
from __future__ import unicode_literals
from collections import defaultdict
from ..exceptions import (
SWFUnknownResourceFault,
SWFWorkflowExecutionAlreadyStartedFault,
)
class Domain(object):
def __init__(self, name, retention, description=None):
self.name = name
self.retention = retention
self.description = description
self.status = "REGISTERED"
self.types = {
"activity": defaultdict(dict),
"workflow": defaultdict(dict),
}
# Workflow executions have an id, which unicity is guaranteed
# at domain level (not super clear in the docs, but I checked
# that against SWF API) ; hence the storage method as a dict
# of "workflow_id (client determined)" => WorkflowExecution()
# here.
self.workflow_executions = []
self.activity_task_lists = {}
self.decision_task_lists = {}
def __repr__(self):
return "Domain(name: %(name)s, status: %(status)s)" % self.__dict__
def to_short_dict(self):
hsh = {
"name": self.name,
"status": self.status,
}
if self.description:
hsh["description"] = self.description
return hsh
def to_full_dict(self):
return {
"domainInfo": self.to_short_dict(),
"configuration": {
"workflowExecutionRetentionPeriodInDays": self.retention,
}
}
def get_type(self, kind, name, version, ignore_empty=False):
try:
return self.types[kind][name][version]
except KeyError:
if not ignore_empty:
raise SWFUnknownResourceFault(
"type",
"{0}Type=[name={1}, version={2}]".format(
kind.capitalize(), name, version
)
)
def add_type(self, _type):
self.types[_type.kind][_type.name][_type.version] = _type
def find_types(self, kind, status):
_all = []
for family in self.types[kind].values():
for _type in family.values():
if _type.status == status:
_all.append(_type)
return _all
def add_workflow_execution(self, workflow_execution):
_id = workflow_execution.workflow_id
if self.get_workflow_execution(_id, raise_if_none=False):
raise SWFWorkflowExecutionAlreadyStartedFault()
self.workflow_executions.append(workflow_execution)
def get_workflow_execution(self, workflow_id, run_id=None,
raise_if_none=True, raise_if_closed=False):
# query
if run_id:
_all = [w for w in self.workflow_executions
if w.workflow_id == workflow_id and w.run_id == run_id]
else:
_all = [w for w in self.workflow_executions
if w.workflow_id == workflow_id and w.open]
# reduce
wfe = _all[0] if _all else None
# raise if closed / none
if raise_if_closed and wfe and wfe.execution_status == "CLOSED":
wfe = None
if not wfe and raise_if_none:
if run_id:
args = ["execution", "WorkflowExecution=[workflowId={0}, runId={1}]".format(
workflow_id, run_id)]
else:
args = ["execution, workflowId = {0}".format(workflow_id)]
raise SWFUnknownResourceFault(*args)
# at last return workflow execution
return wfe
def add_to_activity_task_list(self, task_list, obj):
if not task_list in self.activity_task_lists:
self.activity_task_lists[task_list] = []
self.activity_task_lists[task_list].append(obj)
@property
def activity_tasks(self):
_all = []
for tasks in self.activity_task_lists.values():
_all += tasks
return _all
def add_to_decision_task_list(self, task_list, obj):
if not task_list in self.decision_task_lists:
self.decision_task_lists[task_list] = []
self.decision_task_lists[task_list].append(obj)
@property
def decision_tasks(self):
_all = []
for tasks in self.decision_task_lists.values():
_all += tasks
return _all

View File

@ -0,0 +1,66 @@
from __future__ import unicode_literals
from moto.core.utils import camelcase_to_underscores
class GenericType(object):
def __init__(self, name, version, **kwargs):
self.name = name
self.version = version
self.status = "REGISTERED"
if "description" in kwargs:
self.description = kwargs.pop("description")
for key, value in kwargs.items():
self.__setattr__(key, value)
# default values set to none
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not hasattr(self, attr):
self.__setattr__(attr, None)
if not hasattr(self, "task_list"):
self.task_list = None
def __repr__(self):
cls = self.__class__.__name__
attrs = "name: %(name)s, version: %(version)s, status: %(status)s" % self.__dict__
return "{0}({1})".format(cls, attrs)
@property
def kind(self):
raise NotImplementedError()
@property
def _configuration_keys(self):
raise NotImplementedError()
def to_short_dict(self):
return {
"name": self.name,
"version": self.version,
}
def to_medium_dict(self):
hsh = {
"{0}Type".format(self.kind): self.to_short_dict(),
"creationDate": 1420066800,
"status": self.status,
}
if self.status == "DEPRECATED":
hsh["deprecationDate"] = 1422745200
if hasattr(self, "description"):
hsh["description"] = self.description
return hsh
def to_full_dict(self):
hsh = {
"typeInfo": self.to_medium_dict(),
"configuration": {}
}
if self.task_list:
hsh["configuration"]["defaultTaskList"] = {"name": self.task_list}
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not getattr(self, attr):
continue
hsh["configuration"][key] = getattr(self, attr)
return hsh

View File

@ -0,0 +1,66 @@
from __future__ import unicode_literals
from datetime import datetime
from time import mktime
from moto.core.utils import underscores_to_camelcase
from ..utils import decapitalize, now_timestamp
# We keep track of which history event types we support
# so that we'll be able to catch specific formatting
# for new events if needed.
SUPPORTED_HISTORY_EVENT_TYPES = (
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"WorkflowExecutionCompleted",
"WorkflowExecutionFailed",
"ActivityTaskScheduled",
"ScheduleActivityTaskFailed",
"ActivityTaskStarted",
"ActivityTaskCompleted",
"ActivityTaskFailed",
"WorkflowExecutionTerminated",
"ActivityTaskTimedOut",
"DecisionTaskTimedOut",
"WorkflowExecutionTimedOut",
)
class HistoryEvent(object):
def __init__(self, event_id, event_type, event_timestamp=None, **kwargs):
if event_type not in SUPPORTED_HISTORY_EVENT_TYPES:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{0}'".format(event_type)
)
self.event_id = event_id
self.event_type = event_type
if event_timestamp:
self.event_timestamp = event_timestamp
else:
self.event_timestamp = now_timestamp()
# pre-populate a dict: {"camelCaseKey": value}
self.event_attributes = {}
for key, value in kwargs.items():
if value:
camel_key = underscores_to_camelcase(key)
if key == "task_list":
value = { "name": value }
elif key == "workflow_type":
value = { "name": value.name, "version": value.version }
elif key == "activity_type":
value = value.to_short_dict()
self.event_attributes[camel_key] = value
def to_dict(self):
return {
"eventId": self.event_id,
"eventType": self.event_type,
"eventTimestamp": self.event_timestamp,
self._attributes_key(): self.event_attributes
}
def _attributes_key(self):
key = "{0}EventAttributes".format(self.event_type)
return decapitalize(key)

View File

@ -0,0 +1,12 @@
from ..utils import now_timestamp
class Timeout(object):
def __init__(self, obj, timestamp, kind):
self.obj = obj
self.timestamp = timestamp
self.kind = kind
@property
def reached(self):
return now_timestamp() >= self.timestamp

View File

@ -0,0 +1,616 @@
from __future__ import unicode_literals
from datetime import datetime
from time import mktime
import uuid
from moto.core.utils import camelcase_to_underscores
from ..constants import (
DECISIONS_FIELDS,
)
from ..exceptions import (
SWFDefaultUndefinedFault,
SWFValidationException,
SWFDecisionValidationException,
)
from ..utils import decapitalize, now_timestamp
from .activity_task import ActivityTask
from .activity_type import ActivityType
from .decision_task import DecisionTask
from .history_event import HistoryEvent
from .timeout import Timeout
# TODO: extract decision related logic into a Decision class
class WorkflowExecution(object):
# NB: the list is ordered exactly as in SWF validation exceptions so we can
# mimic error messages closely ; don't reorder it without checking SWF.
KNOWN_DECISION_TYPES = [
"CompleteWorkflowExecution",
"StartTimer",
"RequestCancelExternalWorkflowExecution",
"SignalExternalWorkflowExecution",
"CancelTimer",
"RecordMarker",
"ScheduleActivityTask",
"ContinueAsNewWorkflowExecution",
"ScheduleLambdaFunction",
"FailWorkflowExecution",
"RequestCancelActivityTask",
"StartChildWorkflowExecution",
"CancelWorkflowExecution"
]
def __init__(self, domain, workflow_type, workflow_id, **kwargs):
self.domain = domain
self.workflow_id = workflow_id
self.run_id = uuid.uuid4().hex
# WorkflowExecutionInfo
self.cancel_requested = False
# TODO: check valid values among:
# COMPLETED | FAILED | CANCELED | TERMINATED | CONTINUED_AS_NEW | TIMED_OUT
# TODO: implement them all
self.close_cause = None
self.close_status = None
self.close_timestamp = None
self.execution_status = "OPEN"
self.latest_activity_task_timestamp = None
self.latest_execution_context = None
self.parent = None
self.start_timestamp = None
self.tag_list = [] # TODO
self.timeout_type = None
self.workflow_type = workflow_type
# args processing
# NB: the order follows boto/SWF order of exceptions appearance (if no
# param is set, # SWF will raise DefaultUndefinedFault errors in the
# same order as the few lines that follow)
self._set_from_kwargs_or_workflow_type(kwargs, "execution_start_to_close_timeout")
self._set_from_kwargs_or_workflow_type(kwargs, "task_list", "task_list")
self._set_from_kwargs_or_workflow_type(kwargs, "task_start_to_close_timeout")
self._set_from_kwargs_or_workflow_type(kwargs, "child_policy")
self.input = kwargs.get("input")
# counters
self.open_counts = {
"openTimers": 0,
"openDecisionTasks": 0,
"openActivityTasks": 0,
"openChildWorkflowExecutions": 0,
"openLambdaFunctions": 0,
}
# events
self._events = []
# child workflows
self.child_workflow_executions = []
def __repr__(self):
return "WorkflowExecution(run_id: {0})".format(self.run_id)
def _set_from_kwargs_or_workflow_type(self, kwargs, local_key, workflow_type_key=None):
if workflow_type_key is None:
workflow_type_key = "default_"+local_key
value = kwargs.get(local_key)
if not value and hasattr(self.workflow_type, workflow_type_key):
value = getattr(self.workflow_type, workflow_type_key)
if not value:
raise SWFDefaultUndefinedFault(local_key)
setattr(self, local_key, value)
@property
def _configuration_keys(self):
return [
"executionStartToCloseTimeout",
"childPolicy",
"taskPriority",
"taskStartToCloseTimeout",
]
def to_short_dict(self):
return {
"workflowId": self.workflow_id,
"runId": self.run_id
}
def to_medium_dict(self):
hsh = {
"execution": self.to_short_dict(),
"workflowType": self.workflow_type.to_short_dict(),
"startTimestamp": 1420066800.123,
"executionStatus": self.execution_status,
"cancelRequested": self.cancel_requested,
}
if hasattr(self, "tag_list") and self.tag_list:
hsh["tagList"] = self.tag_list
return hsh
def to_full_dict(self):
hsh = {
"executionInfo": self.to_medium_dict(),
"executionConfiguration": {
"taskList": {"name": self.task_list}
}
}
#configuration
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not hasattr(self, attr):
continue
if not getattr(self, attr):
continue
hsh["executionConfiguration"][key] = getattr(self, attr)
#counters
hsh["openCounts"] = self.open_counts
#latest things
if self.latest_execution_context:
hsh["latestExecutionContext"] = self.latest_execution_context
if self.latest_activity_task_timestamp:
hsh["latestActivityTaskTimestamp"] = self.latest_activity_task_timestamp
return hsh
def _process_timeouts(self):
"""
SWF timeouts can happen on different objects (workflow executions,
activity tasks, decision tasks) and should be processed in order.
A specific timeout can change the workflow execution state and have an
impact on other timeouts: for instance, if the workflow execution
timeouts, subsequent timeouts on activity or decision tasks are
irrelevant ; if an activity task timeouts, other timeouts on this task
are irrelevant, and a new decision is fired, which could well timeout
before the end of the workflow.
So the idea here is to find the earliest timeout that would have been
triggered, process it, then make the workflow state progress and repeat
the whole process.
"""
timeout_candidates = []
# workflow execution timeout
timeout_candidates.append(self.first_timeout())
# decision tasks timeouts
for task in self.decision_tasks:
timeout_candidates.append(task.first_timeout())
# activity tasks timeouts
for task in self.activity_tasks:
timeout_candidates.append(task.first_timeout())
# remove blank values (foo.first_timeout() is a Timeout or None)
timeout_candidates = list(filter(None, timeout_candidates))
# now find the first timeout to process
first_timeout = None
if timeout_candidates:
first_timeout = min(
timeout_candidates,
key=lambda t: t.timestamp
)
if first_timeout:
should_schedule_decision_next = False
if isinstance(first_timeout.obj, WorkflowExecution):
self.timeout(first_timeout)
elif isinstance(first_timeout.obj, DecisionTask):
self.timeout_decision_task(first_timeout)
should_schedule_decision_next = True
elif isinstance(first_timeout.obj, ActivityTask):
self.timeout_activity_task(first_timeout)
should_schedule_decision_next = True
else:
raise NotImplementedError("Unhandled timeout object")
# schedule decision task if needed
if should_schedule_decision_next:
self.schedule_decision_task()
# the workflow execution progressed, let's see if another
# timeout should be processed
self._process_timeouts()
def events(self, reverse_order=False):
if reverse_order:
return reversed(self._events)
else:
return self._events
def next_event_id(self):
event_ids = [evt.event_id for evt in self._events]
return max(event_ids or [0]) + 1
def _add_event(self, *args, **kwargs):
evt = HistoryEvent(self.next_event_id(), *args, **kwargs)
self._events.append(evt)
return evt
def start(self):
self.start_timestamp = now_timestamp()
self._add_event(
"WorkflowExecutionStarted",
child_policy=self.child_policy,
execution_start_to_close_timeout=self.execution_start_to_close_timeout,
# TODO: fix this hardcoded value
parent_initiated_event_id=0,
task_list=self.task_list,
task_start_to_close_timeout=self.task_start_to_close_timeout,
workflow_type=self.workflow_type,
)
self.schedule_decision_task()
def _schedule_decision_task(self):
evt = self._add_event(
"DecisionTaskScheduled",
start_to_close_timeout=self.task_start_to_close_timeout,
task_list=self.task_list,
)
self.domain.add_to_decision_task_list(
self.task_list,
DecisionTask(self, evt.event_id),
)
self.open_counts["openDecisionTasks"] += 1
def schedule_decision_task(self):
self._schedule_decision_task()
# Shortcut for tests: helps having auto-starting decision tasks when needed
def schedule_and_start_decision_task(self, identity=None):
self._schedule_decision_task()
decision_task = self.decision_tasks[-1]
self.start_decision_task(decision_task.task_token, identity=identity)
@property
def decision_tasks(self):
return [t for t in self.domain.decision_tasks
if t.workflow_execution == self]
@property
def activity_tasks(self):
return [t for t in self.domain.activity_tasks
if t.workflow_execution == self]
def _find_decision_task(self, task_token):
for dt in self.decision_tasks:
if dt.task_token == task_token:
return dt
raise ValueError(
"No decision task with token: {0}".format(task_token)
)
def start_decision_task(self, task_token, identity=None):
dt = self._find_decision_task(task_token)
evt = self._add_event(
"DecisionTaskStarted",
scheduled_event_id=dt.scheduled_event_id,
identity=identity
)
dt.start(evt.event_id)
def complete_decision_task(self, task_token, decisions=None, execution_context=None):
# 'decisions' can be None per boto.swf defaults, so replace it with something iterable
if not decisions:
decisions = []
# In case of a malformed or invalid decision task, SWF will raise an error and
# it won't perform any of the decisions in the decision set.
self.validate_decisions(decisions)
dt = self._find_decision_task(task_token)
evt = self._add_event(
"DecisionTaskCompleted",
scheduled_event_id=dt.scheduled_event_id,
started_event_id=dt.started_event_id,
execution_context=execution_context,
)
dt.complete()
self.should_schedule_decision_next = False
self.handle_decisions(evt.event_id, decisions)
if self.should_schedule_decision_next:
self.schedule_decision_task()
self.latest_execution_context = execution_context
def _check_decision_attributes(self, kind, value, decision_id):
problems = []
constraints = DECISIONS_FIELDS.get(kind, {})
for key, constraint in constraints.items():
if constraint["required"] and not value.get(key):
problems.append({
"type": "null_value",
"where": "decisions.{0}.member.{1}.{2}".format(
decision_id, kind, key
)
})
return problems
def validate_decisions(self, decisions):
"""
Performs some basic validations on decisions. The real SWF service
seems to break early and *not* process any decision if there's a
validation problem, such as a malformed decision for instance. I didn't
find an explicit documentation for that though, so criticisms welcome.
"""
problems = []
# check close decision is last
# (the real SWF service also works that way if you provide 2 close decision tasks)
for dcs in decisions[:-1]:
close_decision_types = [
"CompleteWorkflowExecution",
"FailWorkflowExecution",
"CancelWorkflowExecution",
]
if dcs["decisionType"] in close_decision_types:
raise SWFValidationException(
"Close must be last decision in list"
)
decision_number = 0
for dcs in decisions:
decision_number += 1
# check decision types mandatory attributes
# NB: the real SWF service seems to check attributes even for attributes list
# that are not in line with the decisionType, so we do the same
attrs_to_check = [d for d in dcs.keys() if d.endswith("DecisionAttributes")]
if dcs["decisionType"] in self.KNOWN_DECISION_TYPES:
decision_type = dcs["decisionType"]
decision_attr = "{0}DecisionAttributes".format(decapitalize(decision_type))
attrs_to_check.append(decision_attr)
for attr in attrs_to_check:
problems += self._check_decision_attributes(attr, dcs.get(attr, {}), decision_number)
# check decision type is correct
if dcs["decisionType"] not in self.KNOWN_DECISION_TYPES:
problems.append({
"type": "bad_decision_type",
"value": dcs["decisionType"],
"where": "decisions.{0}.member.decisionType".format(decision_number),
"possible_values": ", ".join(self.KNOWN_DECISION_TYPES),
})
# raise if any problem
if any(problems):
raise SWFDecisionValidationException(problems)
def handle_decisions(self, event_id, decisions):
"""
Handles a Decision according to SWF docs.
See: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_Decision.html
"""
# handle each decision separately, in order
for decision in decisions:
decision_type = decision["decisionType"]
attributes_key = "{0}DecisionAttributes".format(decapitalize(decision_type))
attributes = decision.get(attributes_key, {})
if decision_type == "CompleteWorkflowExecution":
self.complete(event_id, attributes.get("result"))
elif decision_type == "FailWorkflowExecution":
self.fail(event_id, attributes.get("details"), attributes.get("reason"))
elif decision_type == "ScheduleActivityTask":
self.schedule_activity_task(event_id, attributes)
else:
# TODO: implement Decision type: CancelTimer
# TODO: implement Decision type: CancelWorkflowExecution
# TODO: implement Decision type: ContinueAsNewWorkflowExecution
# TODO: implement Decision type: RecordMarker
# TODO: implement Decision type: RequestCancelActivityTask
# TODO: implement Decision type: RequestCancelExternalWorkflowExecution
# TODO: implement Decision type: ScheduleLambdaFunction
# TODO: implement Decision type: SignalExternalWorkflowExecution
# TODO: implement Decision type: StartChildWorkflowExecution
# TODO: implement Decision type: StartTimer
raise NotImplementedError("Cannot handle decision: {0}".format(decision_type))
# finally decrement counter if and only if everything went well
self.open_counts["openDecisionTasks"] -= 1
def complete(self, event_id, result=None):
self.execution_status = "CLOSED"
self.close_status = "COMPLETED"
self.close_timestamp = now_timestamp()
self._add_event(
"WorkflowExecutionCompleted",
decision_task_completed_event_id=event_id,
result=result,
)
def fail(self, event_id, details=None, reason=None):
# TODO: implement lenght constraints on details/reason
self.execution_status = "CLOSED"
self.close_status = "FAILED"
self.close_timestamp = now_timestamp()
self._add_event(
"WorkflowExecutionFailed",
decision_task_completed_event_id=event_id,
details=details,
reason=reason,
)
def schedule_activity_task(self, event_id, attributes):
# Helper function to avoid repeating ourselves in the next sections
def fail_schedule_activity_task(_type, _cause):
# TODO: implement other possible failure mode: OPEN_ACTIVITIES_LIMIT_EXCEEDED
# NB: some failure modes are not implemented and probably won't be implemented in
# the future, such as ACTIVITY_CREATION_RATE_EXCEEDED or OPERATION_NOT_PERMITTED
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=_type,
cause=_cause,
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
activity_type = self.domain.get_type(
"activity",
attributes["activityType"]["name"],
attributes["activityType"]["version"],
ignore_empty=True,
)
if not activity_type:
fake_type = ActivityType(attributes["activityType"]["name"],
attributes["activityType"]["version"])
fail_schedule_activity_task(fake_type,
"ACTIVITY_TYPE_DOES_NOT_EXIST")
return
if activity_type.status == "DEPRECATED":
fail_schedule_activity_task(activity_type,
"ACTIVITY_TYPE_DEPRECATED")
return
if any(at for at in self.activity_tasks if at.activity_id == attributes["activityId"]):
fail_schedule_activity_task(activity_type,
"ACTIVITY_ID_ALREADY_IN_USE")
return
# find task list or default task list, else fail
task_list = attributes.get("taskList", {}).get("name")
if not task_list and activity_type.task_list:
task_list = activity_type.task_list
if not task_list:
fail_schedule_activity_task(activity_type,
"DEFAULT_TASK_LIST_UNDEFINED")
return
# find timeouts or default timeout, else fail
timeouts = {}
for _type in ["scheduleToStartTimeout", "scheduleToCloseTimeout", "startToCloseTimeout", "heartbeatTimeout"]:
default_key = "default_task_"+camelcase_to_underscores(_type)
default_value = getattr(activity_type, default_key)
timeouts[_type] = attributes.get(_type, default_value)
if not timeouts[_type]:
error_key = default_key.replace("default_task_", "default_")
fail_schedule_activity_task(activity_type,
"{0}_UNDEFINED".format(error_key.upper()))
return
# Only add event and increment counters now that nothing went wrong
evt = self._add_event(
"ActivityTaskScheduled",
activity_id=attributes["activityId"],
activity_type=activity_type,
control=attributes.get("control"),
decision_task_completed_event_id=event_id,
heartbeat_timeout=attributes.get("heartbeatTimeout"),
input=attributes.get("input"),
schedule_to_close_timeout=attributes.get("scheduleToCloseTimeout"),
schedule_to_start_timeout=attributes.get("scheduleToStartTimeout"),
start_to_close_timeout=attributes.get("startToCloseTimeout"),
task_list=task_list,
task_priority=attributes.get("taskPriority"),
)
task = ActivityTask(
activity_id=attributes["activityId"],
activity_type=activity_type,
input=attributes.get("input"),
scheduled_event_id=evt.event_id,
workflow_execution=self,
timeouts=timeouts,
)
self.domain.add_to_activity_task_list(task_list, task)
self.open_counts["openActivityTasks"] += 1
self.latest_activity_task_timestamp = now_timestamp()
def _find_activity_task(self, task_token):
for task in self.activity_tasks:
if task.task_token == task_token:
return task
raise ValueError(
"No activity task with token: {0}".format(task_token)
)
def start_activity_task(self, task_token, identity=None):
task = self._find_activity_task(task_token)
evt = self._add_event(
"ActivityTaskStarted",
scheduled_event_id=task.scheduled_event_id,
identity=identity
)
task.start(evt.event_id)
def complete_activity_task(self, task_token, result=None):
task = self._find_activity_task(task_token)
self._add_event(
"ActivityTaskCompleted",
scheduled_event_id=task.scheduled_event_id,
started_event_id=task.started_event_id,
result=result,
)
task.complete()
self.open_counts["openActivityTasks"] -= 1
# TODO: ensure we don't schedule multiple decisions at the same time!
self.schedule_decision_task()
def fail_activity_task(self, task_token, reason=None, details=None):
task = self._find_activity_task(task_token)
self._add_event(
"ActivityTaskFailed",
scheduled_event_id=task.scheduled_event_id,
started_event_id=task.started_event_id,
reason=reason,
details=details,
)
task.fail()
self.open_counts["openActivityTasks"] -= 1
# TODO: ensure we don't schedule multiple decisions at the same time!
self.schedule_decision_task()
def terminate(self, child_policy=None, details=None, reason=None):
# TODO: handle child policy for child workflows here
# TODO: handle cause="CHILD_POLICY_APPLIED"
# Until this, we set cause manually to "OPERATOR_INITIATED"
cause = "OPERATOR_INITIATED"
if not child_policy:
child_policy = self.child_policy
self._add_event(
"WorkflowExecutionTerminated",
cause=cause,
child_policy=child_policy,
details=details,
reason=reason,
)
self.execution_status = "CLOSED"
self.close_status = "TERMINATED"
self.close_cause = "OPERATOR_INITIATED"
def first_timeout(self):
if not self.open or not self.start_timestamp:
return None
start_to_close_at = self.start_timestamp + int(self.execution_start_to_close_timeout)
_timeout = Timeout(self, start_to_close_at, "START_TO_CLOSE")
if _timeout.reached:
return _timeout
def timeout(self, timeout):
# TODO: process child policy on child workflows here or in the triggering function
self.execution_status = "CLOSED"
self.close_status = "TIMED_OUT"
self.timeout_type = timeout.kind
self._add_event(
"WorkflowExecutionTimedOut",
child_policy=self.child_policy,
event_timestamp=timeout.timestamp,
timeout_type=self.timeout_type,
)
def timeout_decision_task(self, _timeout):
task = _timeout.obj
task.timeout(_timeout)
self._add_event(
"DecisionTaskTimedOut",
event_timestamp=_timeout.timestamp,
scheduled_event_id=task.scheduled_event_id,
started_event_id=task.started_event_id,
timeout_type=task.timeout_type,
)
def timeout_activity_task(self, _timeout):
task = _timeout.obj
task.timeout(_timeout)
self._add_event(
"ActivityTaskTimedOut",
details=task.details,
event_timestamp=_timeout.timestamp,
scheduled_event_id=task.scheduled_event_id,
started_event_id=task.started_event_id,
timeout_type=task.timeout_type,
)
@property
def open(self):
return self.execution_status == "OPEN"

View File

@ -0,0 +1,15 @@
from .generic_type import GenericType
class WorkflowType(GenericType):
@property
def _configuration_keys(self):
return [
"defaultChildPolicy",
"defaultExecutionStartToCloseTimeout",
"defaultTaskStartToCloseTimeout",
]
@property
def kind(self):
return "workflow"

377
moto/swf/responses.py Normal file
View File

@ -0,0 +1,377 @@
import json
import six
from moto.core.responses import BaseResponse
from werkzeug.exceptions import HTTPException
from moto.core.utils import camelcase_to_underscores, method_names_from_class
from .exceptions import SWFSerializationException
from .models import swf_backends
class SWFResponse(BaseResponse):
@property
def swf_backend(self):
return swf_backends[self.region]
# SWF parameters are passed through a JSON body, so let's ease retrieval
@property
def _params(self):
return json.loads(self.body.decode("utf-8"))
def _check_none_or_string(self, parameter):
if parameter is not None:
self._check_string(parameter)
def _check_string(self, parameter):
if not isinstance(parameter, six.string_types):
raise SWFSerializationException(parameter)
def _check_none_or_list_of_strings(self, parameter):
if parameter is not None:
self._check_list_of_strings(parameter)
def _check_list_of_strings(self, parameter):
if not isinstance(parameter, list):
raise SWFSerializationException(parameter)
for i in parameter:
if not isinstance(i, six.string_types):
raise SWFSerializationException(parameter)
def _list_types(self, kind):
domain_name = self._params["domain"]
status = self._params["registrationStatus"]
reverse_order = self._params.get("reverseOrder", None)
self._check_string(domain_name)
self._check_string(status)
types = self.swf_backend.list_types(kind, domain_name, status, reverse_order=reverse_order)
return json.dumps({
"typeInfos": [_type.to_medium_dict() for _type in types]
})
def _describe_type(self, kind):
domain = self._params["domain"]
_type_args = self._params["{0}Type".format(kind)]
name = _type_args["name"]
version = _type_args["version"]
self._check_string(domain)
self._check_string(name)
self._check_string(version)
_type = self.swf_backend.describe_type(kind, domain, name, version)
return json.dumps(_type.to_full_dict())
def _deprecate_type(self, kind):
domain = self._params["domain"]
_type_args = self._params["{0}Type".format(kind)]
name = _type_args["name"]
version = _type_args["version"]
self._check_string(domain)
self._check_string(name)
self._check_string(version)
self.swf_backend.deprecate_type(kind, domain, name, version)
return ""
# TODO: implement pagination
def list_domains(self):
status = self._params["registrationStatus"]
self._check_string(status)
reverse_order = self._params.get("reverseOrder", None)
domains = self.swf_backend.list_domains(status, reverse_order=reverse_order)
return json.dumps({
"domainInfos": [domain.to_short_dict() for domain in domains]
})
def register_domain(self):
name = self._params["name"]
retention = self._params["workflowExecutionRetentionPeriodInDays"]
description = self._params.get("description")
self._check_string(retention)
self._check_string(name)
self._check_none_or_string(description)
domain = self.swf_backend.register_domain(name, retention,
description=description)
return ""
def deprecate_domain(self):
name = self._params["name"]
self._check_string(name)
domain = self.swf_backend.deprecate_domain(name)
return ""
def describe_domain(self):
name = self._params["name"]
self._check_string(name)
domain = self.swf_backend.describe_domain(name)
return json.dumps(domain.to_full_dict())
# TODO: implement pagination
def list_activity_types(self):
return self._list_types("activity")
def register_activity_type(self):
domain = self._params["domain"]
name = self._params["name"]
version = self._params["version"]
default_task_list = self._params.get("defaultTaskList")
if default_task_list:
task_list = default_task_list.get("name")
else:
task_list = None
default_task_heartbeat_timeout = self._params.get("defaultTaskHeartbeatTimeout")
default_task_schedule_to_close_timeout = self._params.get("defaultTaskScheduleToCloseTimeout")
default_task_schedule_to_start_timeout = self._params.get("defaultTaskScheduleToStartTimeout")
default_task_start_to_close_timeout = self._params.get("defaultTaskStartToCloseTimeout")
description = self._params.get("description")
self._check_string(domain)
self._check_string(name)
self._check_string(version)
self._check_none_or_string(task_list)
self._check_none_or_string(default_task_heartbeat_timeout)
self._check_none_or_string(default_task_schedule_to_close_timeout)
self._check_none_or_string(default_task_schedule_to_start_timeout)
self._check_none_or_string(default_task_start_to_close_timeout)
self._check_none_or_string(description)
# TODO: add defaultTaskPriority when boto gets to support it
activity_type = self.swf_backend.register_type(
"activity", domain, name, version, task_list=task_list,
default_task_heartbeat_timeout=default_task_heartbeat_timeout,
default_task_schedule_to_close_timeout=default_task_schedule_to_close_timeout,
default_task_schedule_to_start_timeout=default_task_schedule_to_start_timeout,
default_task_start_to_close_timeout=default_task_start_to_close_timeout,
description=description,
)
return ""
def deprecate_activity_type(self):
return self._deprecate_type("activity")
def describe_activity_type(self):
return self._describe_type("activity")
def list_workflow_types(self):
return self._list_types("workflow")
def register_workflow_type(self):
domain = self._params["domain"]
name = self._params["name"]
version = self._params["version"]
default_task_list = self._params.get("defaultTaskList")
if default_task_list:
task_list = default_task_list.get("name")
else:
task_list = None
default_child_policy = self._params.get("defaultChildPolicy")
default_task_start_to_close_timeout = self._params.get("defaultTaskStartToCloseTimeout")
default_execution_start_to_close_timeout = self._params.get("defaultExecutionStartToCloseTimeout")
description = self._params.get("description")
self._check_string(domain)
self._check_string(name)
self._check_string(version)
self._check_none_or_string(task_list)
self._check_none_or_string(default_child_policy)
self._check_none_or_string(default_task_start_to_close_timeout)
self._check_none_or_string(default_execution_start_to_close_timeout)
self._check_none_or_string(description)
# TODO: add defaultTaskPriority when boto gets to support it
# TODO: add defaultLambdaRole when boto gets to support it
workflow_type = self.swf_backend.register_type(
"workflow", domain, name, version, task_list=task_list,
default_child_policy=default_child_policy,
default_task_start_to_close_timeout=default_task_start_to_close_timeout,
default_execution_start_to_close_timeout=default_execution_start_to_close_timeout,
description=description,
)
return ""
def deprecate_workflow_type(self):
return self._deprecate_type("workflow")
def describe_workflow_type(self):
return self._describe_type("workflow")
def start_workflow_execution(self):
domain = self._params["domain"]
workflow_id = self._params["workflowId"]
_workflow_type = self._params["workflowType"]
workflow_name = _workflow_type["name"]
workflow_version = _workflow_type["version"]
_default_task_list = self._params.get("defaultTaskList")
if _default_task_list:
task_list = _default_task_list.get("name")
else:
task_list = None
child_policy = self._params.get("childPolicy")
execution_start_to_close_timeout = self._params.get("executionStartToCloseTimeout")
input_ = self._params.get("input")
tag_list = self._params.get("tagList")
task_start_to_close_timeout = self._params.get("taskStartToCloseTimeout")
self._check_string(domain)
self._check_string(workflow_id)
self._check_string(workflow_name)
self._check_string(workflow_version)
self._check_none_or_string(task_list)
self._check_none_or_string(child_policy)
self._check_none_or_string(execution_start_to_close_timeout)
self._check_none_or_string(input_)
self._check_none_or_list_of_strings(tag_list)
self._check_none_or_string(task_start_to_close_timeout)
wfe = self.swf_backend.start_workflow_execution(
domain, workflow_id, workflow_name, workflow_version,
task_list=task_list, child_policy=child_policy,
execution_start_to_close_timeout=execution_start_to_close_timeout,
input=input_, tag_list=tag_list,
task_start_to_close_timeout=task_start_to_close_timeout
)
return json.dumps({
"runId": wfe.run_id
})
def describe_workflow_execution(self):
domain_name = self._params["domain"]
_workflow_execution = self._params["execution"]
run_id = _workflow_execution["runId"]
workflow_id = _workflow_execution["workflowId"]
self._check_string(domain_name)
self._check_string(run_id)
self._check_string(workflow_id)
wfe = self.swf_backend.describe_workflow_execution(domain_name, run_id, workflow_id)
return json.dumps(wfe.to_full_dict())
def get_workflow_execution_history(self):
domain_name = self._params["domain"]
_workflow_execution = self._params["execution"]
run_id = _workflow_execution["runId"]
workflow_id = _workflow_execution["workflowId"]
reverse_order = self._params.get("reverseOrder", None)
wfe = self.swf_backend.describe_workflow_execution(domain_name, run_id, workflow_id)
events = wfe.events(reverse_order=reverse_order)
return json.dumps({
"events": [evt.to_dict() for evt in events]
})
def poll_for_decision_task(self):
domain_name = self._params["domain"]
task_list = self._params["taskList"]["name"]
identity = self._params.get("identity")
reverse_order = self._params.get("reverseOrder", None)
self._check_string(domain_name)
self._check_string(task_list)
decision = self.swf_backend.poll_for_decision_task(
domain_name, task_list, identity=identity
)
if decision:
return json.dumps(
decision.to_full_dict(reverse_order=reverse_order)
)
else:
return json.dumps({"previousStartedEventId": 0, "startedEventId": 0})
def count_pending_decision_tasks(self):
domain_name = self._params["domain"]
task_list = self._params["taskList"]["name"]
self._check_string(domain_name)
self._check_string(task_list)
count = self.swf_backend.count_pending_decision_tasks(domain_name, task_list)
return json.dumps({"count": count, "truncated": False})
def respond_decision_task_completed(self):
task_token = self._params["taskToken"]
execution_context = self._params.get("executionContext")
decisions = self._params.get("decisions")
self._check_string(task_token)
self._check_none_or_string(execution_context)
self.swf_backend.respond_decision_task_completed(
task_token, decisions=decisions, execution_context=execution_context
)
return ""
def poll_for_activity_task(self):
domain_name = self._params["domain"]
task_list = self._params["taskList"]["name"]
identity = self._params.get("identity")
self._check_string(domain_name)
self._check_string(task_list)
self._check_none_or_string(identity)
activity_task = self.swf_backend.poll_for_activity_task(
domain_name, task_list, identity=identity
)
if activity_task:
return json.dumps(
activity_task.to_full_dict()
)
else:
return json.dumps({"startedEventId": 0})
def count_pending_activity_tasks(self):
domain_name = self._params["domain"]
task_list = self._params["taskList"]["name"]
self._check_string(domain_name)
self._check_string(task_list)
count = self.swf_backend.count_pending_activity_tasks(domain_name, task_list)
return json.dumps({"count": count, "truncated": False})
def respond_activity_task_completed(self):
task_token = self._params["taskToken"]
result = self._params.get("result")
self._check_string(task_token)
self._check_none_or_string(result)
self.swf_backend.respond_activity_task_completed(
task_token, result=result
)
return ""
def respond_activity_task_failed(self):
task_token = self._params["taskToken"]
reason = self._params.get("reason")
details = self._params.get("details")
self._check_string(task_token)
# TODO: implement length limits on reason and details (common pb with client libs)
self._check_none_or_string(reason)
self._check_none_or_string(details)
self.swf_backend.respond_activity_task_failed(
task_token, reason=reason, details=details
)
return ""
def terminate_workflow_execution(self):
domain_name = self._params["domain"]
workflow_id = self._params["workflowId"]
child_policy = self._params.get("childPolicy")
details = self._params.get("details")
reason = self._params.get("reason")
run_id = self._params.get("runId")
self._check_string(domain_name)
self._check_string(workflow_id)
self._check_none_or_string(child_policy)
self._check_none_or_string(details)
self._check_none_or_string(reason)
self._check_none_or_string(run_id)
self.swf_backend.terminate_workflow_execution(
domain_name, workflow_id, child_policy=child_policy,
details=details, reason=reason, run_id=run_id
)
return ""
def record_activity_task_heartbeat(self):
task_token = self._params["taskToken"]
details = self._params.get("details")
self._check_string(task_token)
self._check_none_or_string(details)
self.swf_backend.record_activity_task_heartbeat(
task_token, details=details
)
# TODO: make it dynamic when we implement activity tasks cancellation
return json.dumps({"cancelRequested": False})

9
moto/swf/urls.py Normal file
View File

@ -0,0 +1,9 @@
from .responses import SWFResponse
url_bases = [
"https?://swf.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': SWFResponse.dispatch,
}

9
moto/swf/utils.py Normal file
View File

@ -0,0 +1,9 @@
from datetime import datetime
from time import mktime
def decapitalize(key):
return key[0].lower() + key[1:]
def now_timestamp():
return float(mktime(datetime.utcnow().timetuple()))

View File

@ -0,0 +1,22 @@
from __future__ import unicode_literals
import sure
from moto.core.utils import camelcase_to_underscores, underscores_to_camelcase
def test_camelcase_to_underscores():
cases = {
"theNewAttribute": "the_new_attribute",
"attri bute With Space": "attribute_with_space",
"FirstLetterCapital": "first_letter_capital",
}
for arg, expected in cases.items():
camelcase_to_underscores(arg).should.equal(expected)
def test_underscores_to_camelcase():
cases = {
"the_new_attribute": "theNewAttribute",
}
for arg, expected in cases.items():
underscores_to_camelcase(arg).should.equal(expected)

View File

View File

View File

@ -0,0 +1,149 @@
from freezegun import freeze_time
from sure import expect
from moto.swf.exceptions import SWFWorkflowExecutionClosedError
from moto.swf.models import (
ActivityTask,
ActivityType,
Timeout,
)
from ..utils import (
ACTIVITY_TASK_TIMEOUTS,
make_workflow_execution,
process_first_timeout,
)
def test_activity_task_creation():
wfe = make_workflow_execution()
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
input="optional",
scheduled_event_id=117,
workflow_execution=wfe,
timeouts=ACTIVITY_TASK_TIMEOUTS,
)
task.workflow_execution.should.equal(wfe)
task.state.should.equal("SCHEDULED")
task.task_token.should_not.be.empty
task.started_event_id.should.be.none
task.start(123)
task.state.should.equal("STARTED")
task.started_event_id.should.equal(123)
task.complete()
task.state.should.equal("COMPLETED")
# NB: this doesn't make any sense for SWF, a task shouldn't go from a
# "COMPLETED" state to a "FAILED" one, but this is an internal state on our
# side and we don't care about invalid state transitions for now.
task.fail()
task.state.should.equal("FAILED")
def test_activity_task_full_dict_representation():
wfe = make_workflow_execution()
wft = wfe.workflow_type
at = ActivityTask(
activity_id="my-activity-123",
activity_type=ActivityType("foo", "v1.0"),
input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
at.start(1234)
fd = at.to_full_dict()
fd["activityId"].should.equal("my-activity-123")
fd["activityType"]["version"].should.equal("v1.0")
fd["input"].should.equal("optional")
fd["startedEventId"].should.equal(1234)
fd.should.contain("taskToken")
fd["workflowExecution"].should.equal(wfe.to_short_dict())
at.start(1234)
fd = at.to_full_dict()
fd["startedEventId"].should.equal(1234)
def test_activity_task_reset_heartbeat_clock():
wfe = make_workflow_execution()
with freeze_time("2015-01-01 12:00:00"):
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
task.last_heartbeat_timestamp.should.equal(1420113600.0)
with freeze_time("2015-01-01 13:00:00"):
task.reset_heartbeat_clock()
task.last_heartbeat_timestamp.should.equal(1420117200.0)
def test_activity_task_first_timeout():
wfe = make_workflow_execution()
with freeze_time("2015-01-01 12:00:00"):
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
task.first_timeout().should.be.none
# activity task timeout is 300s == 5mins
with freeze_time("2015-01-01 12:06:00"):
task.first_timeout().should.be.a(Timeout)
process_first_timeout(task)
task.state.should.equal("TIMED_OUT")
task.timeout_type.should.equal("HEARTBEAT")
def test_activity_task_cannot_timeout_on_closed_workflow_execution():
with freeze_time("2015-01-01 12:00:00"):
wfe = make_workflow_execution()
wfe.start()
with freeze_time("2015-01-01 13:58:00"):
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
with freeze_time("2015-01-01 14:10:00"):
task.first_timeout().should.be.a(Timeout)
wfe.first_timeout().should.be.a(Timeout)
process_first_timeout(wfe)
task.first_timeout().should.be.none
def test_activity_task_cannot_change_state_on_closed_workflow_execution():
wfe = make_workflow_execution()
wfe.start()
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
wfe.complete(123)
task.timeout.when.called_with(Timeout(task, 0, "foo")).should.throw(SWFWorkflowExecutionClosedError)
task.complete.when.called_with().should.throw(SWFWorkflowExecutionClosedError)
task.fail.when.called_with().should.throw(SWFWorkflowExecutionClosedError)

View File

@ -0,0 +1,74 @@
from freezegun import freeze_time
from sure import expect
from moto.swf.exceptions import SWFWorkflowExecutionClosedError
from moto.swf.models import DecisionTask, Timeout
from ..utils import make_workflow_execution, process_first_timeout
def test_decision_task_creation():
wfe = make_workflow_execution()
dt = DecisionTask(wfe, 123)
dt.workflow_execution.should.equal(wfe)
dt.state.should.equal("SCHEDULED")
dt.task_token.should_not.be.empty
dt.started_event_id.should.be.none
def test_decision_task_full_dict_representation():
wfe = make_workflow_execution()
wft = wfe.workflow_type
dt = DecisionTask(wfe, 123)
fd = dt.to_full_dict()
fd["events"].should.be.a("list")
fd["previousStartedEventId"].should.equal(0)
fd.should_not.contain("startedEventId")
fd.should.contain("taskToken")
fd["workflowExecution"].should.equal(wfe.to_short_dict())
fd["workflowType"].should.equal(wft.to_short_dict())
dt.start(1234)
fd = dt.to_full_dict()
fd["startedEventId"].should.equal(1234)
def test_decision_task_first_timeout():
wfe = make_workflow_execution()
dt = DecisionTask(wfe, 123)
dt.first_timeout().should.be.none
with freeze_time("2015-01-01 12:00:00"):
dt.start(1234)
dt.first_timeout().should.be.none
# activity task timeout is 300s == 5mins
with freeze_time("2015-01-01 12:06:00"):
dt.first_timeout().should.be.a(Timeout)
dt.complete()
dt.first_timeout().should.be.none
def test_decision_task_cannot_timeout_on_closed_workflow_execution():
with freeze_time("2015-01-01 12:00:00"):
wfe = make_workflow_execution()
wfe.start()
with freeze_time("2015-01-01 13:55:00"):
dt = DecisionTask(wfe, 123)
dt.start(1234)
with freeze_time("2015-01-01 14:10:00"):
dt.first_timeout().should.be.a(Timeout)
wfe.first_timeout().should.be.a(Timeout)
process_first_timeout(wfe)
dt.first_timeout().should.be.none
def test_decision_task_cannot_change_state_on_closed_workflow_execution():
wfe = make_workflow_execution()
wfe.start()
task = DecisionTask(wfe, 123)
wfe.complete(123)
task.timeout.when.called_with(Timeout(task, 0, "foo")).should.throw(SWFWorkflowExecutionClosedError)
task.complete.when.called_with().should.throw(SWFWorkflowExecutionClosedError)

View File

@ -0,0 +1,102 @@
from collections import namedtuple
from sure import expect
from moto.swf.exceptions import SWFUnknownResourceFault
from moto.swf.models import Domain
# Fake WorkflowExecution for tests purposes
WorkflowExecution = namedtuple(
"WorkflowExecution",
["workflow_id", "run_id", "execution_status", "open"]
)
def test_domain_short_dict_representation():
domain = Domain("foo", "52")
domain.to_short_dict().should.equal({"name":"foo", "status":"REGISTERED"})
domain.description = "foo bar"
domain.to_short_dict()["description"].should.equal("foo bar")
def test_domain_full_dict_representation():
domain = Domain("foo", "52")
domain.to_full_dict()["domainInfo"].should.equal(domain.to_short_dict())
_config = domain.to_full_dict()["configuration"]
_config["workflowExecutionRetentionPeriodInDays"].should.equal("52")
def test_domain_string_representation():
domain = Domain("my-domain", "60")
str(domain).should.equal("Domain(name: my-domain, status: REGISTERED)")
def test_domain_add_to_activity_task_list():
domain = Domain("my-domain", "60")
domain.add_to_activity_task_list("foo", "bar")
domain.activity_task_lists.should.equal({
"foo": ["bar"]
})
def test_domain_activity_tasks():
domain = Domain("my-domain", "60")
domain.add_to_activity_task_list("foo", "bar")
domain.add_to_activity_task_list("other", "baz")
sorted(domain.activity_tasks).should.equal(["bar", "baz"])
def test_domain_add_to_decision_task_list():
domain = Domain("my-domain", "60")
domain.add_to_decision_task_list("foo", "bar")
domain.decision_task_lists.should.equal({
"foo": ["bar"]
})
def test_domain_decision_tasks():
domain = Domain("my-domain", "60")
domain.add_to_decision_task_list("foo", "bar")
domain.add_to_decision_task_list("other", "baz")
sorted(domain.decision_tasks).should.equal(["bar", "baz"])
def test_domain_get_workflow_execution():
domain = Domain("my-domain", "60")
wfe1 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-1", execution_status="OPEN", open=True)
wfe2 = WorkflowExecution(workflow_id="wf-id-1", run_id="run-id-2", execution_status="CLOSED", open=False)
wfe3 = WorkflowExecution(workflow_id="wf-id-2", run_id="run-id-3", execution_status="OPEN", open=True)
wfe4 = WorkflowExecution(workflow_id="wf-id-3", run_id="run-id-4", execution_status="CLOSED", open=False)
domain.workflow_executions = [wfe1, wfe2, wfe3, wfe4]
# get workflow execution through workflow_id and run_id
domain.get_workflow_execution("wf-id-1", run_id="run-id-1").should.equal(wfe1)
domain.get_workflow_execution("wf-id-1", run_id="run-id-2").should.equal(wfe2)
domain.get_workflow_execution("wf-id-3", run_id="run-id-4").should.equal(wfe4)
domain.get_workflow_execution.when.called_with(
"wf-id-1", run_id="non-existent"
).should.throw(
SWFUnknownResourceFault,
"Unknown execution: WorkflowExecution=[workflowId=wf-id-1, runId=non-existent]"
)
# get OPEN workflow execution by default if no run_id
domain.get_workflow_execution("wf-id-1").should.equal(wfe1)
domain.get_workflow_execution.when.called_with(
"wf-id-3"
).should.throw(
SWFUnknownResourceFault, "Unknown execution, workflowId = wf-id-3"
)
domain.get_workflow_execution.when.called_with(
"wf-id-non-existent"
).should.throw(
SWFUnknownResourceFault, "Unknown execution, workflowId = wf-id-non-existent"
)
# raise_if_closed attribute
domain.get_workflow_execution("wf-id-1", run_id="run-id-1", raise_if_closed=True).should.equal(wfe1)
domain.get_workflow_execution.when.called_with(
"wf-id-3", run_id="run-id-4", raise_if_closed=True
).should.throw(
SWFUnknownResourceFault,
"Unknown execution: WorkflowExecution=[workflowId=wf-id-3, runId=run-id-4]"
)
# raise_if_none attribute
domain.get_workflow_execution("foo", raise_if_none=False).should.be.none

View File

@ -0,0 +1,52 @@
from sure import expect
from moto.swf.models import GenericType
# Tests for GenericType (ActivityType, WorkflowType)
class FooType(GenericType):
@property
def kind(self):
return "foo"
@property
def _configuration_keys(self):
return ["justAnExampleTimeout"]
def test_type_short_dict_representation():
_type = FooType("test-foo", "v1.0")
_type.to_short_dict().should.equal({"name": "test-foo", "version": "v1.0"})
def test_type_medium_dict_representation():
_type = FooType("test-foo", "v1.0")
_type.to_medium_dict()["fooType"].should.equal(_type.to_short_dict())
_type.to_medium_dict()["status"].should.equal("REGISTERED")
_type.to_medium_dict().should.contain("creationDate")
_type.to_medium_dict().should_not.contain("deprecationDate")
_type.to_medium_dict().should_not.contain("description")
_type.description = "foo bar"
_type.to_medium_dict()["description"].should.equal("foo bar")
_type.status = "DEPRECATED"
_type.to_medium_dict().should.contain("deprecationDate")
def test_type_full_dict_representation():
_type = FooType("test-foo", "v1.0")
_type.to_full_dict()["typeInfo"].should.equal(_type.to_medium_dict())
_type.to_full_dict()["configuration"].should.equal({})
_type.task_list = "foo"
_type.to_full_dict()["configuration"]["defaultTaskList"].should.equal({"name":"foo"})
_type.just_an_example_timeout = "60"
_type.to_full_dict()["configuration"]["justAnExampleTimeout"].should.equal("60")
_type.non_whitelisted_property = "34"
keys = _type.to_full_dict()["configuration"].keys()
sorted(keys).should.equal(["defaultTaskList", "justAnExampleTimeout"])
def test_type_string_representation():
_type = FooType("test-foo", "v1.0")
str(_type).should.equal("FooType(name: test-foo, version: v1.0, status: REGISTERED)")

View File

@ -0,0 +1,29 @@
from sure import expect
from freezegun import freeze_time
from moto.swf.models import HistoryEvent
@freeze_time("2015-01-01 12:00:00")
def test_history_event_creation():
he = HistoryEvent(123, "DecisionTaskStarted", scheduled_event_id=2)
he.event_id.should.equal(123)
he.event_type.should.equal("DecisionTaskStarted")
he.event_timestamp.should.equal(1420113600.0)
@freeze_time("2015-01-01 12:00:00")
def test_history_event_to_dict_representation():
he = HistoryEvent(123, "DecisionTaskStarted", scheduled_event_id=2)
he.to_dict().should.equal({
"eventId": 123,
"eventType": "DecisionTaskStarted",
"eventTimestamp": 1420113600.0,
"decisionTaskStartedEventAttributes": {
"scheduledEventId": 2
}
})
def test_history_event_breaks_on_initialization_if_not_implemented():
HistoryEvent.when.called_with(
123, "UnknownHistoryEvent"
).should.throw(NotImplementedError)

View File

@ -0,0 +1,18 @@
from freezegun import freeze_time
from sure import expect
from moto.swf.models import Timeout, WorkflowExecution
from ..utils import make_workflow_execution
def test_timeout_creation():
wfe = make_workflow_execution()
# epoch 1420113600 == "2015-01-01 13:00:00"
timeout = Timeout(wfe, 1420117200, "START_TO_CLOSE")
with freeze_time("2015-01-01 12:00:00"):
timeout.reached.should.be.falsy
with freeze_time("2015-01-01 13:00:00"):
timeout.reached.should.be.truthy

View File

@ -0,0 +1,443 @@
from sure import expect
from freezegun import freeze_time
from moto.swf.models import (
ActivityType,
Timeout,
WorkflowType,
WorkflowExecution,
)
from moto.swf.exceptions import (
SWFDefaultUndefinedFault,
)
from ..utils import (
auto_start_decision_tasks,
get_basic_domain,
get_basic_workflow_type,
make_workflow_execution,
)
VALID_ACTIVITY_TASK_ATTRIBUTES = {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity", "version": "v1.1" },
"taskList": { "name": "task-list-name" },
"scheduleToStartTimeout": "600",
"scheduleToCloseTimeout": "600",
"startToCloseTimeout": "600",
"heartbeatTimeout": "300",
}
def test_workflow_execution_creation():
domain = get_basic_domain()
wft = get_basic_workflow_type()
wfe = WorkflowExecution(domain, wft, "ab1234", child_policy="TERMINATE")
wfe.domain.should.equal(domain)
wfe.workflow_type.should.equal(wft)
wfe.child_policy.should.equal("TERMINATE")
def test_workflow_execution_creation_child_policy_logic():
domain = get_basic_domain()
WorkflowExecution(
domain,
WorkflowType(
"test-workflow", "v1.0",
task_list="queue", default_child_policy="ABANDON",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
),
"ab1234"
).child_policy.should.equal("ABANDON")
WorkflowExecution(
domain,
WorkflowType(
"test-workflow", "v1.0", task_list="queue",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
),
"ab1234",
child_policy="REQUEST_CANCEL"
).child_policy.should.equal("REQUEST_CANCEL")
WorkflowExecution.when.called_with(
domain,
WorkflowType("test-workflow", "v1.0"), "ab1234"
).should.throw(SWFDefaultUndefinedFault)
def test_workflow_execution_string_representation():
wfe = make_workflow_execution(child_policy="TERMINATE")
str(wfe).should.match(r"^WorkflowExecution\(run_id: .*\)")
def test_workflow_execution_generates_a_random_run_id():
domain = get_basic_domain()
wft = get_basic_workflow_type()
wfe1 = WorkflowExecution(domain, wft, "ab1234", child_policy="TERMINATE")
wfe2 = WorkflowExecution(domain, wft, "ab1235", child_policy="TERMINATE")
wfe1.run_id.should_not.equal(wfe2.run_id)
def test_workflow_execution_short_dict_representation():
domain = get_basic_domain()
wf_type = WorkflowType(
"test-workflow", "v1.0",
task_list="queue", default_child_policy="ABANDON",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
)
wfe = WorkflowExecution(domain, wf_type, "ab1234")
sd = wfe.to_short_dict()
sd["workflowId"].should.equal("ab1234")
sd.should.contain("runId")
def test_workflow_execution_medium_dict_representation():
domain = get_basic_domain()
wf_type = WorkflowType(
"test-workflow", "v1.0",
task_list="queue", default_child_policy="ABANDON",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
)
wfe = WorkflowExecution(domain, wf_type, "ab1234")
md = wfe.to_medium_dict()
md["execution"].should.equal(wfe.to_short_dict())
md["workflowType"].should.equal(wf_type.to_short_dict())
md["startTimestamp"].should.be.a('float')
md["executionStatus"].should.equal("OPEN")
md["cancelRequested"].should.be.falsy
md.should_not.contain("tagList")
wfe.tag_list = ["foo", "bar", "baz"]
md = wfe.to_medium_dict()
md["tagList"].should.equal(["foo", "bar", "baz"])
def test_workflow_execution_full_dict_representation():
domain = get_basic_domain()
wf_type = WorkflowType(
"test-workflow", "v1.0",
task_list="queue", default_child_policy="ABANDON",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
)
wfe = WorkflowExecution(domain, wf_type, "ab1234")
fd = wfe.to_full_dict()
fd["executionInfo"].should.equal(wfe.to_medium_dict())
fd["openCounts"]["openTimers"].should.equal(0)
fd["openCounts"]["openDecisionTasks"].should.equal(0)
fd["openCounts"]["openActivityTasks"].should.equal(0)
fd["executionConfiguration"].should.equal({
"childPolicy": "ABANDON",
"executionStartToCloseTimeout": "300",
"taskList": {"name": "queue"},
"taskStartToCloseTimeout": "300",
})
def test_workflow_execution_schedule_decision_task():
wfe = make_workflow_execution()
wfe.open_counts["openDecisionTasks"].should.equal(0)
wfe.schedule_decision_task()
wfe.open_counts["openDecisionTasks"].should.equal(1)
def test_workflow_execution_start_decision_task():
wfe = make_workflow_execution()
wfe.schedule_decision_task()
dt = wfe.decision_tasks[0]
wfe.start_decision_task(dt.task_token, identity="srv01")
dt = wfe.decision_tasks[0]
dt.state.should.equal("STARTED")
wfe.events()[-1].event_type.should.equal("DecisionTaskStarted")
wfe.events()[-1].event_attributes["identity"].should.equal("srv01")
def test_workflow_execution_history_events_ids():
wfe = make_workflow_execution()
wfe._add_event("WorkflowExecutionStarted")
wfe._add_event("DecisionTaskScheduled")
wfe._add_event("DecisionTaskStarted")
ids = [evt.event_id for evt in wfe.events()]
ids.should.equal([1, 2, 3])
@freeze_time("2015-01-01 12:00:00")
def test_workflow_execution_start():
wfe = make_workflow_execution()
wfe.events().should.equal([])
wfe.start()
wfe.start_timestamp.should.equal(1420113600.0)
wfe.events().should.have.length_of(2)
wfe.events()[0].event_type.should.equal("WorkflowExecutionStarted")
wfe.events()[1].event_type.should.equal("DecisionTaskScheduled")
@freeze_time("2015-01-02 12:00:00")
def test_workflow_execution_complete():
wfe = make_workflow_execution()
wfe.complete(123, result="foo")
wfe.execution_status.should.equal("CLOSED")
wfe.close_status.should.equal("COMPLETED")
wfe.close_timestamp.should.equal(1420200000.0)
wfe.events()[-1].event_type.should.equal("WorkflowExecutionCompleted")
wfe.events()[-1].event_attributes["decisionTaskCompletedEventId"].should.equal(123)
wfe.events()[-1].event_attributes["result"].should.equal("foo")
@freeze_time("2015-01-02 12:00:00")
def test_workflow_execution_fail():
wfe = make_workflow_execution()
wfe.fail(123, details="some details", reason="my rules")
wfe.execution_status.should.equal("CLOSED")
wfe.close_status.should.equal("FAILED")
wfe.close_timestamp.should.equal(1420200000.0)
wfe.events()[-1].event_type.should.equal("WorkflowExecutionFailed")
wfe.events()[-1].event_attributes["decisionTaskCompletedEventId"].should.equal(123)
wfe.events()[-1].event_attributes["details"].should.equal("some details")
wfe.events()[-1].event_attributes["reason"].should.equal("my rules")
@freeze_time("2015-01-01 12:00:00")
def test_workflow_execution_schedule_activity_task():
wfe = make_workflow_execution()
wfe.latest_activity_task_timestamp.should.be.none
wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES)
wfe.latest_activity_task_timestamp.should.equal(1420113600.0)
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
last_event.event_attributes["decisionTaskCompletedEventId"].should.equal(123)
last_event.event_attributes["taskList"]["name"].should.equal("task-list-name")
wfe.activity_tasks.should.have.length_of(1)
task = wfe.activity_tasks[0]
task.activity_id.should.equal("my-activity-001")
task.activity_type.name.should.equal("test-activity")
wfe.domain.activity_task_lists["task-list-name"].should.contain(task)
def test_workflow_execution_schedule_activity_task_without_task_list_should_take_default():
wfe = make_workflow_execution()
wfe.domain.add_type(
ActivityType("test-activity", "v1.2", task_list="foobar")
)
wfe.schedule_activity_task(123, {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity", "version": "v1.2" },
"scheduleToStartTimeout": "600",
"scheduleToCloseTimeout": "600",
"startToCloseTimeout": "600",
"heartbeatTimeout": "300",
})
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
last_event.event_attributes["taskList"]["name"].should.equal("foobar")
task = wfe.activity_tasks[0]
wfe.domain.activity_task_lists["foobar"].should.contain(task)
def test_workflow_execution_schedule_activity_task_should_fail_if_wrong_attributes():
wfe = make_workflow_execution()
at = ActivityType("test-activity", "v1.1")
at.status = "DEPRECATED"
wfe.domain.add_type(at)
wfe.domain.add_type(ActivityType("test-activity", "v1.2"))
hsh = {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity-does-not-exists", "version": "v1.1" },
}
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.event_attributes["cause"].should.equal("ACTIVITY_TYPE_DOES_NOT_EXIST")
hsh["activityType"]["name"] = "test-activity"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.event_attributes["cause"].should.equal("ACTIVITY_TYPE_DEPRECATED")
hsh["activityType"]["version"] = "v1.2"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.event_attributes["cause"].should.equal("DEFAULT_TASK_LIST_UNDEFINED")
hsh["taskList"] = { "name": "foobar" }
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.event_attributes["cause"].should.equal("DEFAULT_SCHEDULE_TO_START_TIMEOUT_UNDEFINED")
hsh["scheduleToStartTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.event_attributes["cause"].should.equal("DEFAULT_SCHEDULE_TO_CLOSE_TIMEOUT_UNDEFINED")
hsh["scheduleToCloseTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.event_attributes["cause"].should.equal("DEFAULT_START_TO_CLOSE_TIMEOUT_UNDEFINED")
hsh["startToCloseTimeout"] = "600"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.event_attributes["cause"].should.equal("DEFAULT_HEARTBEAT_TIMEOUT_UNDEFINED")
wfe.open_counts["openActivityTasks"].should.equal(0)
wfe.activity_tasks.should.have.length_of(0)
wfe.domain.activity_task_lists.should.have.length_of(0)
hsh["heartbeatTimeout"] = "300"
wfe.schedule_activity_task(123, hsh)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
task = wfe.activity_tasks[0]
wfe.domain.activity_task_lists["foobar"].should.contain(task)
wfe.open_counts["openDecisionTasks"].should.equal(0)
wfe.open_counts["openActivityTasks"].should.equal(1)
def test_workflow_execution_schedule_activity_task_failure_triggers_new_decision():
wfe = make_workflow_execution()
wfe.start()
task_token = wfe.decision_tasks[-1].task_token
wfe.start_decision_task(task_token)
wfe.complete_decision_task(task_token,
execution_context="free-form execution context",
decisions=[
{
"decisionType": "ScheduleActivityTask",
"scheduleActivityTaskDecisionAttributes": {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity-does-not-exist", "version": "v1.2" },
}
},
{
"decisionType": "ScheduleActivityTask",
"scheduleActivityTaskDecisionAttributes": {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity-does-not-exist", "version": "v1.2" },
}
},
])
wfe.latest_execution_context.should.equal("free-form execution context")
wfe.open_counts["openActivityTasks"].should.equal(0)
wfe.open_counts["openDecisionTasks"].should.equal(1)
last_events = wfe.events()[-3:]
last_events[0].event_type.should.equal("ScheduleActivityTaskFailed")
last_events[1].event_type.should.equal("ScheduleActivityTaskFailed")
last_events[2].event_type.should.equal("DecisionTaskScheduled")
def test_workflow_execution_schedule_activity_task_with_same_activity_id():
wfe = make_workflow_execution()
wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES)
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ActivityTaskScheduled")
wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES)
wfe.open_counts["openActivityTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("ScheduleActivityTaskFailed")
last_event.event_attributes["cause"].should.equal("ACTIVITY_ID_ALREADY_IN_USE")
def test_workflow_execution_start_activity_task():
wfe = make_workflow_execution()
wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES)
task_token = wfe.activity_tasks[-1].task_token
wfe.start_activity_task(task_token, identity="worker01")
task = wfe.activity_tasks[-1]
task.state.should.equal("STARTED")
wfe.events()[-1].event_type.should.equal("ActivityTaskStarted")
wfe.events()[-1].event_attributes["identity"].should.equal("worker01")
def test_complete_activity_task():
wfe = make_workflow_execution()
wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES)
task_token = wfe.activity_tasks[-1].task_token
wfe.open_counts["openActivityTasks"].should.equal(1)
wfe.open_counts["openDecisionTasks"].should.equal(0)
wfe.start_activity_task(task_token, identity="worker01")
wfe.complete_activity_task(task_token, result="a superb result")
task = wfe.activity_tasks[-1]
task.state.should.equal("COMPLETED")
wfe.events()[-2].event_type.should.equal("ActivityTaskCompleted")
wfe.events()[-1].event_type.should.equal("DecisionTaskScheduled")
wfe.open_counts["openActivityTasks"].should.equal(0)
wfe.open_counts["openDecisionTasks"].should.equal(1)
def test_terminate():
wfe = make_workflow_execution()
wfe.schedule_decision_task()
wfe.terminate()
wfe.execution_status.should.equal("CLOSED")
wfe.close_status.should.equal("TERMINATED")
wfe.close_cause.should.equal("OPERATOR_INITIATED")
wfe.open_counts["openDecisionTasks"].should.equal(1)
last_event = wfe.events()[-1]
last_event.event_type.should.equal("WorkflowExecutionTerminated")
# take default child_policy if not provided (as here)
last_event.event_attributes["childPolicy"].should.equal("ABANDON")
def test_first_timeout():
wfe = make_workflow_execution()
wfe.first_timeout().should.be.none
with freeze_time("2015-01-01 12:00:00"):
wfe.start()
wfe.first_timeout().should.be.none
with freeze_time("2015-01-01 14:01"):
# 2 hours timeout reached
wfe.first_timeout().should.be.a(Timeout)
# See moto/swf/models/workflow_execution.py "_process_timeouts()" for more details
def test_timeouts_are_processed_in_order_and_reevaluated():
# Let's make a Workflow Execution with the following properties:
# - execution start to close timeout of 8 mins
# - (decision) task start to close timeout of 5 mins
#
# Now start the workflow execution, and look at the history 15 mins later:
# - a first decision task is fired just after workflow execution start
# - the first decision task should have timed out after 5 mins
# - that fires a new decision task (which we hack to start automatically)
# - then the workflow timeouts after 8 mins (shows gradual reevaluation)
# - but the last scheduled decision task should *not* timeout (workflow closed)
with freeze_time("2015-01-01 12:00:00"):
wfe = make_workflow_execution(
execution_start_to_close_timeout=8*60,
task_start_to_close_timeout=5*60,
)
# decision will automatically start
wfe = auto_start_decision_tasks(wfe)
wfe.start()
event_idx = len(wfe.events())
with freeze_time("2015-01-01 12:08:00"):
wfe._process_timeouts()
event_types = [e.event_type for e in wfe.events()[event_idx:]]
event_types.should.equal([
"DecisionTaskTimedOut",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"WorkflowExecutionTimedOut",
])

View File

View File

@ -0,0 +1,210 @@
import boto
from freezegun import freeze_time
from sure import expect
from moto import mock_swf
from moto.swf import swf_backend
from moto.swf.exceptions import (
SWFValidationException,
SWFUnknownResourceFault,
)
from ..utils import setup_workflow, SCHEDULE_ACTIVITY_TASK_DECISION
# PollForActivityTask endpoint
@mock_swf
def test_poll_for_activity_task_when_one():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
resp = conn.poll_for_activity_task("test-domain", "activity-task-list", identity="surprise")
resp["activityId"].should.equal("my-activity-001")
resp["taskToken"].should_not.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
resp["events"][-1]["eventType"].should.equal("ActivityTaskStarted")
resp["events"][-1]["activityTaskStartedEventAttributes"].should.equal(
{ "identity": "surprise", "scheduledEventId": 5 }
)
@mock_swf
def test_poll_for_activity_task_when_none():
conn = setup_workflow()
resp = conn.poll_for_activity_task("test-domain", "activity-task-list")
resp.should.equal({"startedEventId": 0})
@mock_swf
def test_poll_for_activity_task_on_non_existent_queue():
conn = setup_workflow()
resp = conn.poll_for_activity_task("test-domain", "non-existent-queue")
resp.should.equal({"startedEventId": 0})
# CountPendingActivityTasks endpoint
@mock_swf
def test_count_pending_activity_tasks():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
resp = conn.count_pending_activity_tasks("test-domain", "activity-task-list")
resp.should.equal({"count": 1, "truncated": False})
@mock_swf
def test_count_pending_decision_tasks_on_non_existent_task_list():
conn = setup_workflow()
resp = conn.count_pending_activity_tasks("test-domain", "non-existent")
resp.should.equal({"count": 0, "truncated": False})
# RespondActivityTaskCompleted endpoint
@mock_swf
def test_respond_activity_task_completed():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
resp = conn.respond_activity_task_completed(activity_token, result="result of the task")
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
resp["events"][-2]["eventType"].should.equal("ActivityTaskCompleted")
resp["events"][-2]["activityTaskCompletedEventAttributes"].should.equal(
{ "result": "result of the task", "scheduledEventId": 5, "startedEventId": 6 }
)
@mock_swf
def test_respond_activity_task_completed_with_wrong_token():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
conn.poll_for_activity_task("test-domain", "activity-task-list")
conn.respond_activity_task_completed.when.called_with(
"not-a-correct-token"
).should.throw(SWFValidationException, "Invalid token")
@mock_swf
def test_respond_activity_task_completed_on_closed_workflow_execution():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
# bad: we're closing workflow execution manually, but endpoints are not coded for now..
wfe = swf_backend.domains[0].workflow_executions[-1]
wfe.execution_status = "CLOSED"
# /bad
conn.respond_activity_task_completed.when.called_with(
activity_token
).should.throw(SWFUnknownResourceFault, "WorkflowExecution=")
@mock_swf
def test_respond_activity_task_completed_with_task_already_completed():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
conn.respond_activity_task_completed(activity_token)
conn.respond_activity_task_completed.when.called_with(
activity_token
).should.throw(SWFUnknownResourceFault, "Unknown activity, scheduledEventId = 5")
# RespondActivityTaskFailed endpoint
@mock_swf
def test_respond_activity_task_failed():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
resp = conn.respond_activity_task_failed(activity_token,
reason="short reason",
details="long details")
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
resp["events"][-2]["eventType"].should.equal("ActivityTaskFailed")
resp["events"][-2]["activityTaskFailedEventAttributes"].should.equal(
{ "reason": "short reason", "details": "long details",
"scheduledEventId": 5, "startedEventId": 6 }
)
@mock_swf
def test_respond_activity_task_completed_with_wrong_token():
# NB: we just test ONE failure case for RespondActivityTaskFailed
# because the safeguards are shared with RespondActivityTaskCompleted, so
# no need to retest everything end-to-end.
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
conn.poll_for_activity_task("test-domain", "activity-task-list")
conn.respond_activity_task_failed.when.called_with(
"not-a-correct-token"
).should.throw(SWFValidationException, "Invalid token")
# RecordActivityTaskHeartbeat endpoint
@mock_swf
def test_record_activity_task_heartbeat():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
resp = conn.record_activity_task_heartbeat(activity_token)
resp.should.equal({"cancelRequested": False})
@mock_swf
def test_record_activity_task_heartbeat_with_wrong_token():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
conn.record_activity_task_heartbeat.when.called_with(
"bad-token", details="some progress details"
).should.throw(SWFValidationException)
@mock_swf
def test_record_activity_task_heartbeat_sets_details_in_case_of_timeout():
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
with freeze_time("2015-01-01 12:00:00"):
activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
conn.record_activity_task_heartbeat(activity_token, details="some progress details")
with freeze_time("2015-01-01 12:05:30"):
# => Activity Task Heartbeat timeout reached!!
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
resp["events"][-2]["eventType"].should.equal("ActivityTaskTimedOut")
attrs = resp["events"][-2]["activityTaskTimedOutEventAttributes"]
attrs["details"].should.equal("some progress details")

View File

@ -0,0 +1,126 @@
import boto
from sure import expect
from moto import mock_swf
from moto.swf.exceptions import (
SWFUnknownResourceFault,
SWFTypeAlreadyExistsFault,
SWFTypeDeprecatedFault,
SWFSerializationException,
)
# RegisterActivityType endpoint
@mock_swf
def test_register_activity_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_activity_type("test-domain", "test-activity", "v1.0")
types = conn.list_activity_types("test-domain", "REGISTERED")
actype = types["typeInfos"][0]
actype["activityType"]["name"].should.equal("test-activity")
actype["activityType"]["version"].should.equal("v1.0")
@mock_swf
def test_register_already_existing_activity_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_activity_type("test-domain", "test-activity", "v1.0")
conn.register_activity_type.when.called_with(
"test-domain", "test-activity", "v1.0"
).should.throw(SWFTypeAlreadyExistsFault)
@mock_swf
def test_register_with_wrong_parameter_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_activity_type.when.called_with(
"test-domain", "test-activity", 12
).should.throw(SWFSerializationException)
# ListActivityTypes endpoint
@mock_swf
def test_list_activity_types():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_activity_type("test-domain", "b-test-activity", "v1.0")
conn.register_activity_type("test-domain", "a-test-activity", "v1.0")
conn.register_activity_type("test-domain", "c-test-activity", "v1.0")
all_activity_types = conn.list_activity_types("test-domain", "REGISTERED")
names = [activity_type["activityType"]["name"] for activity_type in all_activity_types["typeInfos"]]
names.should.equal(["a-test-activity", "b-test-activity", "c-test-activity"])
@mock_swf
def test_list_activity_types_reverse_order():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_activity_type("test-domain", "b-test-activity", "v1.0")
conn.register_activity_type("test-domain", "a-test-activity", "v1.0")
conn.register_activity_type("test-domain", "c-test-activity", "v1.0")
all_activity_types = conn.list_activity_types("test-domain", "REGISTERED",
reverse_order=True)
names = [activity_type["activityType"]["name"] for activity_type in all_activity_types["typeInfos"]]
names.should.equal(["c-test-activity", "b-test-activity", "a-test-activity"])
# DeprecateActivityType endpoint
@mock_swf
def test_deprecate_activity_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_activity_type("test-domain", "test-activity", "v1.0")
conn.deprecate_activity_type("test-domain", "test-activity", "v1.0")
actypes = conn.list_activity_types("test-domain", "DEPRECATED")
actype = actypes["typeInfos"][0]
actype["activityType"]["name"].should.equal("test-activity")
actype["activityType"]["version"].should.equal("v1.0")
@mock_swf
def test_deprecate_already_deprecated_activity_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_activity_type("test-domain", "test-activity", "v1.0")
conn.deprecate_activity_type("test-domain", "test-activity", "v1.0")
conn.deprecate_activity_type.when.called_with(
"test-domain", "test-activity", "v1.0"
).should.throw(SWFTypeDeprecatedFault)
@mock_swf
def test_deprecate_non_existent_activity_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.deprecate_activity_type.when.called_with(
"test-domain", "non-existent", "v1.0"
).should.throw(SWFUnknownResourceFault)
# DescribeActivityType endpoint
@mock_swf
def test_describe_activity_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_activity_type("test-domain", "test-activity", "v1.0",
task_list="foo", default_task_heartbeat_timeout="32")
actype = conn.describe_activity_type("test-domain", "test-activity", "v1.0")
actype["configuration"]["defaultTaskList"]["name"].should.equal("foo")
infos = actype["typeInfo"]
infos["activityType"]["name"].should.equal("test-activity")
infos["activityType"]["version"].should.equal("v1.0")
infos["status"].should.equal("REGISTERED")
@mock_swf
def test_describe_non_existent_activity_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.describe_activity_type.when.called_with(
"test-domain", "non-existent", "v1.0"
).should.throw(SWFUnknownResourceFault)

View File

@ -0,0 +1,316 @@
import boto
from freezegun import freeze_time
from sure import expect
from moto import mock_swf
from moto.swf import swf_backend
from moto.swf.exceptions import (
SWFUnknownResourceFault,
SWFValidationException,
SWFDecisionValidationException,
)
from ..utils import setup_workflow
# PollForDecisionTask endpoint
@mock_swf
def test_poll_for_decision_task_when_one():
conn = setup_workflow()
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(["WorkflowExecutionStarted", "DecisionTaskScheduled"])
resp = conn.poll_for_decision_task("test-domain", "queue", identity="srv01")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted"])
resp["events"][-1]["decisionTaskStartedEventAttributes"]["identity"].should.equal("srv01")
@mock_swf
def test_poll_for_decision_task_when_none():
conn = setup_workflow()
conn.poll_for_decision_task("test-domain", "queue")
resp = conn.poll_for_decision_task("test-domain", "queue")
# this is the DecisionTask representation you get from the real SWF
# after waiting 60s when there's no decision to be taken
resp.should.equal({"previousStartedEventId": 0, "startedEventId": 0})
@mock_swf
def test_poll_for_decision_task_on_non_existent_queue():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "non-existent-queue")
resp.should.equal({"previousStartedEventId": 0, "startedEventId": 0})
@mock_swf
def test_poll_for_decision_task_with_reverse_order():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue", reverse_order=True)
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(["DecisionTaskStarted", "DecisionTaskScheduled", "WorkflowExecutionStarted"])
# CountPendingDecisionTasks endpoint
@mock_swf
def test_count_pending_decision_tasks():
conn = setup_workflow()
conn.poll_for_decision_task("test-domain", "queue")
resp = conn.count_pending_decision_tasks("test-domain", "queue")
resp.should.equal({"count": 1, "truncated": False})
@mock_swf
def test_count_pending_decision_tasks_on_non_existent_task_list():
conn = setup_workflow()
resp = conn.count_pending_decision_tasks("test-domain", "non-existent")
resp.should.equal({"count": 0, "truncated": False})
@mock_swf
def test_count_pending_decision_tasks_after_decision_completes():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
conn.respond_decision_task_completed(resp["taskToken"])
resp = conn.count_pending_decision_tasks("test-domain", "queue")
resp.should.equal({"count": 0, "truncated": False})
# RespondDecisionTaskCompleted endpoint
@mock_swf
def test_respond_decision_task_completed_with_no_decision():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
resp = conn.respond_decision_task_completed(
task_token,
execution_context="free-form context",
)
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal([
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
])
evt = resp["events"][-1]
evt["decisionTaskCompletedEventAttributes"].should.equal({
"executionContext": "free-form context",
"scheduledEventId": 2,
"startedEventId": 3,
})
resp = conn.describe_workflow_execution("test-domain", conn.run_id, "uid-abcd1234")
resp["latestExecutionContext"].should.equal("free-form context")
@mock_swf
def test_respond_decision_task_completed_with_wrong_token():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
conn.respond_decision_task_completed.when.called_with(
"not-a-correct-token"
).should.throw(SWFValidationException)
@mock_swf
def test_respond_decision_task_completed_on_close_workflow_execution():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
# bad: we're closing workflow execution manually, but endpoints are not coded for now..
wfe = swf_backend.domains[0].workflow_executions[-1]
wfe.execution_status = "CLOSED"
# /bad
conn.respond_decision_task_completed.when.called_with(
task_token
).should.throw(SWFUnknownResourceFault)
@mock_swf
def test_respond_decision_task_completed_with_task_already_completed():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
conn.respond_decision_task_completed(task_token)
conn.respond_decision_task_completed.when.called_with(
task_token
).should.throw(SWFUnknownResourceFault)
@mock_swf
def test_respond_decision_task_completed_with_complete_workflow_execution():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [{
"decisionType": "CompleteWorkflowExecution",
"completeWorkflowExecutionDecisionAttributes": {"result": "foo bar"}
}]
resp = conn.respond_decision_task_completed(task_token, decisions=decisions)
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal([
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"WorkflowExecutionCompleted",
])
resp["events"][-1]["workflowExecutionCompletedEventAttributes"]["result"].should.equal("foo bar")
@mock_swf
def test_respond_decision_task_completed_with_close_decision_not_last():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [
{ "decisionType": "CompleteWorkflowExecution" },
{ "decisionType": "WeDontCare" },
]
conn.respond_decision_task_completed.when.called_with(
task_token, decisions=decisions
).should.throw(SWFValidationException, r"Close must be last decision in list")
@mock_swf
def test_respond_decision_task_completed_with_invalid_decision_type():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [
{ "decisionType": "BadDecisionType" },
{ "decisionType": "CompleteWorkflowExecution" },
]
conn.respond_decision_task_completed.when.called_with(
task_token, decisions=decisions
).should.throw(
SWFDecisionValidationException,
r"Value 'BadDecisionType' at 'decisions.1.member.decisionType'"
)
@mock_swf
def test_respond_decision_task_completed_with_missing_attributes():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [
{
"decisionType": "should trigger even with incorrect decision type",
"startTimerDecisionAttributes": {}
},
]
conn.respond_decision_task_completed.when.called_with(
task_token, decisions=decisions
).should.throw(
SWFDecisionValidationException,
r"Value null at 'decisions.1.member.startTimerDecisionAttributes.timerId' " \
r"failed to satisfy constraint: Member must not be null"
)
@mock_swf
def test_respond_decision_task_completed_with_missing_attributes_totally():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [
{ "decisionType": "StartTimer" },
]
conn.respond_decision_task_completed.when.called_with(
task_token, decisions=decisions
).should.throw(
SWFDecisionValidationException,
r"Value null at 'decisions.1.member.startTimerDecisionAttributes.timerId' " \
r"failed to satisfy constraint: Member must not be null"
)
@mock_swf
def test_respond_decision_task_completed_with_fail_workflow_execution():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [{
"decisionType": "FailWorkflowExecution",
"failWorkflowExecutionDecisionAttributes": {"reason": "my rules", "details": "foo"}
}]
resp = conn.respond_decision_task_completed(task_token, decisions=decisions)
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal([
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"WorkflowExecutionFailed",
])
attrs = resp["events"][-1]["workflowExecutionFailedEventAttributes"]
attrs["reason"].should.equal("my rules")
attrs["details"].should.equal("foo")
@mock_swf
@freeze_time("2015-01-01 12:00:00")
def test_respond_decision_task_completed_with_schedule_activity_task():
conn = setup_workflow()
resp = conn.poll_for_decision_task("test-domain", "queue")
task_token = resp["taskToken"]
decisions = [{
"decisionType": "ScheduleActivityTask",
"scheduleActivityTaskDecisionAttributes": {
"activityId": "my-activity-001",
"activityType": {
"name": "test-activity",
"version": "v1.1"
},
"heartbeatTimeout": "60",
"input": "123",
"taskList": {
"name": "my-task-list"
},
}
}]
resp = conn.respond_decision_task_completed(task_token, decisions=decisions)
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal([
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"ActivityTaskScheduled",
])
resp["events"][-1]["activityTaskScheduledEventAttributes"].should.equal({
"decisionTaskCompletedEventId": 4,
"activityId": "my-activity-001",
"activityType": {
"name": "test-activity",
"version": "v1.1",
},
"heartbeatTimeout": "60",
"input": "123",
"taskList": {
"name": "my-task-list"
},
})
resp = conn.describe_workflow_execution("test-domain", conn.run_id, "uid-abcd1234")
resp["latestActivityTaskTimestamp"].should.equal(1420113600.0)

View File

@ -0,0 +1,117 @@
import boto
from sure import expect
from moto import mock_swf
from moto.swf.exceptions import (
SWFUnknownResourceFault,
SWFDomainAlreadyExistsFault,
SWFDomainDeprecatedFault,
SWFSerializationException,
)
# RegisterDomain endpoint
@mock_swf
def test_register_domain():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60", description="A test domain")
all_domains = conn.list_domains("REGISTERED")
domain = all_domains["domainInfos"][0]
domain["name"].should.equal("test-domain")
domain["status"].should.equal("REGISTERED")
domain["description"].should.equal("A test domain")
@mock_swf
def test_register_already_existing_domain():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60", description="A test domain")
conn.register_domain.when.called_with(
"test-domain", "60", description="A test domain"
).should.throw(SWFDomainAlreadyExistsFault)
@mock_swf
def test_register_with_wrong_parameter_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain.when.called_with(
"test-domain", 60, description="A test domain"
).should.throw(SWFSerializationException)
# ListDomains endpoint
@mock_swf
def test_list_domains_order():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("b-test-domain", "60")
conn.register_domain("a-test-domain", "60")
conn.register_domain("c-test-domain", "60")
all_domains = conn.list_domains("REGISTERED")
names = [domain["name"] for domain in all_domains["domainInfos"]]
names.should.equal(["a-test-domain", "b-test-domain", "c-test-domain"])
@mock_swf
def test_list_domains_reverse_order():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("b-test-domain", "60")
conn.register_domain("a-test-domain", "60")
conn.register_domain("c-test-domain", "60")
all_domains = conn.list_domains("REGISTERED", reverse_order=True)
names = [domain["name"] for domain in all_domains["domainInfos"]]
names.should.equal(["c-test-domain", "b-test-domain", "a-test-domain"])
# DeprecateDomain endpoint
@mock_swf
def test_deprecate_domain():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60", description="A test domain")
conn.deprecate_domain("test-domain")
all_domains = conn.list_domains("DEPRECATED")
domain = all_domains["domainInfos"][0]
domain["name"].should.equal("test-domain")
@mock_swf
def test_deprecate_already_deprecated_domain():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60", description="A test domain")
conn.deprecate_domain("test-domain")
conn.deprecate_domain.when.called_with(
"test-domain"
).should.throw(SWFDomainDeprecatedFault)
@mock_swf
def test_deprecate_non_existent_domain():
conn = boto.connect_swf("the_key", "the_secret")
conn.deprecate_domain.when.called_with(
"non-existent"
).should.throw(SWFUnknownResourceFault)
# DescribeDomain endpoint
@mock_swf
def test_describe_domain():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60", description="A test domain")
domain = conn.describe_domain("test-domain")
domain["configuration"]["workflowExecutionRetentionPeriodInDays"].should.equal("60")
domain["domainInfo"]["description"].should.equal("A test domain")
domain["domainInfo"]["name"].should.equal("test-domain")
domain["domainInfo"]["status"].should.equal("REGISTERED")
@mock_swf
def test_describe_non_existent_domain():
conn = boto.connect_swf("the_key", "the_secret")
conn.describe_domain.when.called_with(
"non-existent"
).should.throw(SWFUnknownResourceFault)

View File

@ -0,0 +1,100 @@
import boto
from freezegun import freeze_time
from sure import expect
from moto import mock_swf
from ..utils import setup_workflow, SCHEDULE_ACTIVITY_TASK_DECISION
# Activity Task Heartbeat timeout
# Default value in workflow helpers: 5 mins
@mock_swf
def test_activity_task_heartbeat_timeout():
with freeze_time("2015-01-01 12:00:00"):
conn = setup_workflow()
decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
conn.respond_decision_task_completed(decision_token, decisions=[
SCHEDULE_ACTIVITY_TASK_DECISION
])
conn.poll_for_activity_task("test-domain", "activity-task-list", identity="surprise")
with freeze_time("2015-01-01 12:04:30"):
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
resp["events"][-1]["eventType"].should.equal("ActivityTaskStarted")
with freeze_time("2015-01-01 12:05:30"):
# => Activity Task Heartbeat timeout reached!!
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
resp["events"][-2]["eventType"].should.equal("ActivityTaskTimedOut")
attrs = resp["events"][-2]["activityTaskTimedOutEventAttributes"]
attrs["timeoutType"].should.equal("HEARTBEAT")
# checks that event has been emitted at 12:05:00, not 12:05:30
resp["events"][-2]["eventTimestamp"].should.equal(1420113900)
resp["events"][-1]["eventType"].should.equal("DecisionTaskScheduled")
# Decision Task Start to Close timeout
# Default value in workflow helpers: 5 mins
@mock_swf
def test_decision_task_start_to_close_timeout():
pass
with freeze_time("2015-01-01 12:00:00"):
conn = setup_workflow()
conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
with freeze_time("2015-01-01 12:04:30"):
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
event_types = [evt["eventType"] for evt in resp["events"]]
event_types.should.equal(
["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted"]
)
with freeze_time("2015-01-01 12:05:30"):
# => Decision Task Start to Close timeout reached!!
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
event_types = [evt["eventType"] for evt in resp["events"]]
event_types.should.equal(
["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted",
"DecisionTaskTimedOut", "DecisionTaskScheduled"]
)
attrs = resp["events"][-2]["decisionTaskTimedOutEventAttributes"]
attrs.should.equal({
"scheduledEventId": 2, "startedEventId": 3, "timeoutType": "START_TO_CLOSE"
})
# checks that event has been emitted at 12:05:00, not 12:05:30
resp["events"][-2]["eventTimestamp"].should.equal(1420113900)
# Workflow Execution Start to Close timeout
# Default value in workflow helpers: 2 hours
@mock_swf
def test_workflow_execution_start_to_close_timeout():
pass
with freeze_time("2015-01-01 12:00:00"):
conn = setup_workflow()
with freeze_time("2015-01-01 13:59:30"):
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
event_types = [evt["eventType"] for evt in resp["events"]]
event_types.should.equal(
["WorkflowExecutionStarted", "DecisionTaskScheduled"]
)
with freeze_time("2015-01-01 14:00:30"):
# => Workflow Execution Start to Close timeout reached!!
resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234")
event_types = [evt["eventType"] for evt in resp["events"]]
event_types.should.equal(
["WorkflowExecutionStarted", "DecisionTaskScheduled", "WorkflowExecutionTimedOut"]
)
attrs = resp["events"][-1]["workflowExecutionTimedOutEventAttributes"]
attrs.should.equal({
"childPolicy": "ABANDON", "timeoutType": "START_TO_CLOSE"
})
# checks that event has been emitted at 14:00:00, not 14:00:30
resp["events"][-1]["eventTimestamp"].should.equal(1420120800)

View File

@ -0,0 +1,163 @@
import boto
from sure import expect
from moto import mock_swf
from moto.swf.exceptions import (
SWFWorkflowExecutionAlreadyStartedFault,
SWFTypeDeprecatedFault,
SWFUnknownResourceFault,
)
# Utils
@mock_swf
def setup_swf_environment():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60", description="A test domain")
conn.register_workflow_type(
"test-domain", "test-workflow", "v1.0",
task_list="queue", default_child_policy="TERMINATE",
default_execution_start_to_close_timeout="300",
default_task_start_to_close_timeout="300",
)
conn.register_activity_type("test-domain", "test-activity", "v1.1")
return conn
# StartWorkflowExecution endpoint
@mock_swf
def test_start_workflow_execution():
conn = setup_swf_environment()
wf = conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0")
wf.should.contain("runId")
@mock_swf
def test_start_already_started_workflow_execution():
conn = setup_swf_environment()
conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0")
conn.start_workflow_execution.when.called_with(
"test-domain", "uid-abcd1234", "test-workflow", "v1.0"
).should.throw(SWFWorkflowExecutionAlreadyStartedFault)
@mock_swf
def test_start_workflow_execution_on_deprecated_type():
conn = setup_swf_environment()
conn.deprecate_workflow_type("test-domain", "test-workflow", "v1.0")
conn.start_workflow_execution.when.called_with(
"test-domain", "uid-abcd1234", "test-workflow", "v1.0"
).should.throw(SWFTypeDeprecatedFault)
# DescribeWorkflowExecution endpoint
@mock_swf
def test_describe_workflow_execution():
conn = setup_swf_environment()
hsh = conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0")
run_id = hsh["runId"]
wfe = conn.describe_workflow_execution("test-domain", run_id, "uid-abcd1234")
wfe["executionInfo"]["execution"]["workflowId"].should.equal("uid-abcd1234")
wfe["executionInfo"]["executionStatus"].should.equal("OPEN")
@mock_swf
def test_describe_non_existent_workflow_execution():
conn = setup_swf_environment()
conn.describe_workflow_execution.when.called_with(
"test-domain", "wrong-run-id", "wrong-workflow-id"
).should.throw(SWFUnknownResourceFault)
# GetWorkflowExecutionHistory endpoint
@mock_swf
def test_get_workflow_execution_history():
conn = setup_swf_environment()
hsh = conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0")
run_id = hsh["runId"]
resp = conn.get_workflow_execution_history("test-domain", run_id, "uid-abcd1234")
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(["WorkflowExecutionStarted", "DecisionTaskScheduled"])
@mock_swf
def test_get_workflow_execution_history_with_reverse_order():
conn = setup_swf_environment()
hsh = conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0")
run_id = hsh["runId"]
resp = conn.get_workflow_execution_history("test-domain", run_id, "uid-abcd1234",
reverse_order=True)
types = [evt["eventType"] for evt in resp["events"]]
types.should.equal(["DecisionTaskScheduled", "WorkflowExecutionStarted"])
@mock_swf
def test_get_workflow_execution_history_on_non_existent_workflow_execution():
conn = setup_swf_environment()
conn.get_workflow_execution_history.when.called_with(
"test-domain", "wrong-run-id", "wrong-workflow-id"
).should.throw(SWFUnknownResourceFault)
# TerminateWorkflowExecution endpoint
@mock_swf
def test_terminate_workflow_execution():
conn = setup_swf_environment()
run_id = conn.start_workflow_execution(
"test-domain", "uid-abcd1234", "test-workflow", "v1.0"
)["runId"]
resp = conn.terminate_workflow_execution("test-domain", "uid-abcd1234",
details="some details",
reason="a more complete reason",
run_id=run_id)
resp.should.be.none
resp = conn.get_workflow_execution_history("test-domain", run_id, "uid-abcd1234")
evt = resp["events"][-1]
evt["eventType"].should.equal("WorkflowExecutionTerminated")
attrs = evt["workflowExecutionTerminatedEventAttributes"]
attrs["details"].should.equal("some details")
attrs["reason"].should.equal("a more complete reason")
attrs["cause"].should.equal("OPERATOR_INITIATED")
@mock_swf
def test_terminate_workflow_execution_with_wrong_workflow_or_run_id():
conn = setup_swf_environment()
run_id = conn.start_workflow_execution(
"test-domain", "uid-abcd1234", "test-workflow", "v1.0"
)["runId"]
# terminate workflow execution
resp = conn.terminate_workflow_execution("test-domain", "uid-abcd1234")
# already closed, with run_id
conn.terminate_workflow_execution.when.called_with(
"test-domain", "uid-abcd1234", run_id=run_id
).should.throw(
SWFUnknownResourceFault, "WorkflowExecution=[workflowId=uid-abcd1234, runId="
)
# already closed, without run_id
conn.terminate_workflow_execution.when.called_with(
"test-domain", "uid-abcd1234"
).should.throw(
SWFUnknownResourceFault, "Unknown execution, workflowId = uid-abcd1234"
)
# wrong workflow id
conn.terminate_workflow_execution.when.called_with(
"test-domain", "uid-non-existent"
).should.throw(
SWFUnknownResourceFault, "Unknown execution, workflowId = uid-non-existent"
)
# wrong run_id
conn.terminate_workflow_execution.when.called_with(
"test-domain", "uid-abcd1234", run_id="foo"
).should.throw(
SWFUnknownResourceFault, "WorkflowExecution=[workflowId=uid-abcd1234, runId="
)

View File

@ -0,0 +1,130 @@
import boto
from sure import expect
from moto import mock_swf
from moto.swf.exceptions import (
SWFUnknownResourceFault,
SWFTypeAlreadyExistsFault,
SWFTypeDeprecatedFault,
SWFSerializationException,
)
# RegisterWorkflowType endpoint
@mock_swf
def test_register_workflow_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_workflow_type("test-domain", "test-workflow", "v1.0")
types = conn.list_workflow_types("test-domain", "REGISTERED")
actype = types["typeInfos"][0]
actype["workflowType"]["name"].should.equal("test-workflow")
actype["workflowType"]["version"].should.equal("v1.0")
@mock_swf
def test_register_already_existing_workflow_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_workflow_type("test-domain", "test-workflow", "v1.0")
conn.register_workflow_type.when.called_with(
"test-domain", "test-workflow", "v1.0"
).should.throw(SWFTypeAlreadyExistsFault)
@mock_swf
def test_register_with_wrong_parameter_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_workflow_type.when.called_with(
"test-domain", "test-workflow", 12
).should.throw(SWFSerializationException)
# ListWorkflowTypes endpoint
@mock_swf
def test_list_workflow_types():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_workflow_type("test-domain", "b-test-workflow", "v1.0")
conn.register_workflow_type("test-domain", "a-test-workflow", "v1.0")
conn.register_workflow_type("test-domain", "c-test-workflow", "v1.0")
all_workflow_types = conn.list_workflow_types("test-domain", "REGISTERED")
names = [activity_type["workflowType"]["name"] for activity_type in all_workflow_types["typeInfos"]]
names.should.equal(["a-test-workflow", "b-test-workflow", "c-test-workflow"])
@mock_swf
def test_list_workflow_types_reverse_order():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_workflow_type("test-domain", "b-test-workflow", "v1.0")
conn.register_workflow_type("test-domain", "a-test-workflow", "v1.0")
conn.register_workflow_type("test-domain", "c-test-workflow", "v1.0")
all_workflow_types = conn.list_workflow_types("test-domain", "REGISTERED",
reverse_order=True)
names = [activity_type["workflowType"]["name"] for activity_type in all_workflow_types["typeInfos"]]
names.should.equal(["c-test-workflow", "b-test-workflow", "a-test-workflow"])
# DeprecateWorkflowType endpoint
@mock_swf
def test_deprecate_workflow_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_workflow_type("test-domain", "test-workflow", "v1.0")
conn.deprecate_workflow_type("test-domain", "test-workflow", "v1.0")
actypes = conn.list_workflow_types("test-domain", "DEPRECATED")
actype = actypes["typeInfos"][0]
actype["workflowType"]["name"].should.equal("test-workflow")
actype["workflowType"]["version"].should.equal("v1.0")
@mock_swf
def test_deprecate_already_deprecated_workflow_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_workflow_type("test-domain", "test-workflow", "v1.0")
conn.deprecate_workflow_type("test-domain", "test-workflow", "v1.0")
conn.deprecate_workflow_type.when.called_with(
"test-domain", "test-workflow", "v1.0"
).should.throw(SWFTypeDeprecatedFault)
@mock_swf
def test_deprecate_non_existent_workflow_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.deprecate_workflow_type.when.called_with(
"test-domain", "non-existent", "v1.0"
).should.throw(SWFUnknownResourceFault)
# DescribeWorkflowType endpoint
@mock_swf
def test_describe_workflow_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.register_workflow_type("test-domain", "test-workflow", "v1.0",
task_list="foo", default_child_policy="TERMINATE")
actype = conn.describe_workflow_type("test-domain", "test-workflow", "v1.0")
actype["configuration"]["defaultTaskList"]["name"].should.equal("foo")
actype["configuration"]["defaultChildPolicy"].should.equal("TERMINATE")
actype["configuration"].keys().should_not.contain("defaultTaskStartToCloseTimeout")
infos = actype["typeInfo"]
infos["workflowType"]["name"].should.equal("test-workflow")
infos["workflowType"]["version"].should.equal("v1.0")
infos["status"].should.equal("REGISTERED")
@mock_swf
def test_describe_non_existent_workflow_type():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60")
conn.describe_workflow_type.when.called_with(
"test-domain", "non-existent", "v1.0"
).should.throw(SWFUnknownResourceFault)

View File

@ -0,0 +1,152 @@
from __future__ import unicode_literals
from moto.swf.exceptions import (
SWFClientError,
SWFUnknownResourceFault,
SWFDomainAlreadyExistsFault,
SWFDomainDeprecatedFault,
SWFSerializationException,
SWFTypeAlreadyExistsFault,
SWFTypeDeprecatedFault,
SWFWorkflowExecutionAlreadyStartedFault,
SWFDefaultUndefinedFault,
SWFValidationException,
SWFDecisionValidationException,
)
from moto.swf.models import (
WorkflowType,
)
def test_swf_client_error():
ex = SWFClientError("error message", "ASpecificType")
ex.status.should.equal(400)
ex.error_code.should.equal("ASpecificType")
ex.body.should.equal({
"__type": "ASpecificType",
"message": "error message"
})
def test_swf_unknown_resource_fault():
ex = SWFUnknownResourceFault("type", "detail")
ex.status.should.equal(400)
ex.error_code.should.equal("UnknownResourceFault")
ex.body.should.equal({
"__type": "com.amazonaws.swf.base.model#UnknownResourceFault",
"message": "Unknown type: detail"
})
def test_swf_unknown_resource_fault_with_only_one_parameter():
ex = SWFUnknownResourceFault("foo bar baz")
ex.status.should.equal(400)
ex.error_code.should.equal("UnknownResourceFault")
ex.body.should.equal({
"__type": "com.amazonaws.swf.base.model#UnknownResourceFault",
"message": "Unknown foo bar baz"
})
def test_swf_domain_already_exists_fault():
ex = SWFDomainAlreadyExistsFault("domain-name")
ex.status.should.equal(400)
ex.error_code.should.equal("DomainAlreadyExistsFault")
ex.body.should.equal({
"__type": "com.amazonaws.swf.base.model#DomainAlreadyExistsFault",
"message": "domain-name"
})
def test_swf_domain_deprecated_fault():
ex = SWFDomainDeprecatedFault("domain-name")
ex.status.should.equal(400)
ex.error_code.should.equal("DomainDeprecatedFault")
ex.body.should.equal({
"__type": "com.amazonaws.swf.base.model#DomainDeprecatedFault",
"message": "domain-name"
})
def test_swf_serialization_exception():
ex = SWFSerializationException("value")
ex.status.should.equal(400)
ex.error_code.should.equal("SerializationException")
ex.body["__type"].should.equal("com.amazonaws.swf.base.model#SerializationException")
ex.body["Message"].should.match(r"class java.lang.Foo can not be converted to an String")
def test_swf_type_already_exists_fault():
wft = WorkflowType("wf-name", "wf-version")
ex = SWFTypeAlreadyExistsFault(wft)
ex.status.should.equal(400)
ex.error_code.should.equal("TypeAlreadyExistsFault")
ex.body.should.equal({
"__type": "com.amazonaws.swf.base.model#TypeAlreadyExistsFault",
"message": "WorkflowType=[name=wf-name, version=wf-version]"
})
def test_swf_type_deprecated_fault():
wft = WorkflowType("wf-name", "wf-version")
ex = SWFTypeDeprecatedFault(wft)
ex.status.should.equal(400)
ex.error_code.should.equal("TypeDeprecatedFault")
ex.body.should.equal({
"__type": "com.amazonaws.swf.base.model#TypeDeprecatedFault",
"message": "WorkflowType=[name=wf-name, version=wf-version]"
})
def test_swf_workflow_execution_already_started_fault():
ex = SWFWorkflowExecutionAlreadyStartedFault()
ex.status.should.equal(400)
ex.error_code.should.equal("WorkflowExecutionAlreadyStartedFault")
ex.body.should.equal({
"__type": "com.amazonaws.swf.base.model#WorkflowExecutionAlreadyStartedFault",
})
def test_swf_default_undefined_fault():
ex = SWFDefaultUndefinedFault("execution_start_to_close_timeout")
ex.status.should.equal(400)
ex.error_code.should.equal("DefaultUndefinedFault")
ex.body.should.equal({
"__type": "com.amazonaws.swf.base.model#DefaultUndefinedFault",
"message": "executionStartToCloseTimeout",
})
def test_swf_validation_exception():
ex = SWFValidationException("Invalid token")
ex.status.should.equal(400)
ex.error_code.should.equal("ValidationException")
ex.body.should.equal({
"__type": "com.amazon.coral.validate#ValidationException",
"message": "Invalid token",
})
def test_swf_decision_validation_error():
ex = SWFDecisionValidationException([
{ "type": "null_value",
"where": "decisions.1.member.startTimerDecisionAttributes.startToFireTimeout" },
{ "type": "bad_decision_type",
"value": "FooBar",
"where": "decisions.1.member.decisionType",
"possible_values": "Foo, Bar, Baz"},
])
ex.status.should.equal(400)
ex.error_code.should.equal("ValidationException")
ex.body["__type"].should.equal("com.amazon.coral.validate#ValidationException")
msg = ex.body["message"]
msg.should.match(r"^2 validation errors detected:")
msg.should.match(
r"Value null at 'decisions.1.member.startTimerDecisionAttributes.startToFireTimeout' "\
r"failed to satisfy constraint: Member must not be null;"
)
msg.should.match(
r"Value 'FooBar' at 'decisions.1.member.decisionType' failed to satisfy constraint: " \
r"Member must satisfy enum value set: \[Foo, Bar, Baz\]"
)

View File

@ -0,0 +1,21 @@
from freezegun import freeze_time
from sure import expect
from moto.swf.utils import (
decapitalize,
now_timestamp,
)
def test_decapitalize():
cases = {
"fooBar": "fooBar",
"FooBar": "fooBar",
"FOO BAR": "fOO BAR",
}
for before, after in cases.items():
decapitalize(before).should.equal(after)
@freeze_time("2015-01-01 12:00:00")
def test_now_timestamp():
now_timestamp().should.equal(1420113600.0)

97
tests/test_swf/utils.py Normal file
View File

@ -0,0 +1,97 @@
import boto
from moto import mock_swf
from moto.swf.models import (
ActivityType,
Domain,
WorkflowType,
WorkflowExecution,
)
# Some useful constants
# Here are some activity timeouts we use in moto/swf tests ; they're extracted
# from semi-real world example, the goal is mostly to have predictible and
# intuitive behaviour in moto/swf own tests...
ACTIVITY_TASK_TIMEOUTS = {
"heartbeatTimeout": "300", # 5 mins
"scheduleToStartTimeout": "1800", # 30 mins
"startToCloseTimeout": "1800", # 30 mins
"scheduleToCloseTimeout": "2700", # 45 mins
}
# Some useful decisions
SCHEDULE_ACTIVITY_TASK_DECISION = {
"decisionType": "ScheduleActivityTask",
"scheduleActivityTaskDecisionAttributes": {
"activityId": "my-activity-001",
"activityType": { "name": "test-activity", "version": "v1.1" },
"taskList": { "name": "activity-task-list" },
}
}
for key, value in ACTIVITY_TASK_TIMEOUTS.items():
SCHEDULE_ACTIVITY_TASK_DECISION["scheduleActivityTaskDecisionAttributes"][key] = value
# A test Domain
def get_basic_domain():
return Domain("test-domain", "90")
# A test WorkflowType
def _generic_workflow_type_attributes():
return [
"test-workflow", "v1.0"
], {
"task_list": "queue",
"default_child_policy": "ABANDON",
"default_execution_start_to_close_timeout": "7200",
"default_task_start_to_close_timeout": "300",
}
def get_basic_workflow_type():
args, kwargs = _generic_workflow_type_attributes()
return WorkflowType(*args, **kwargs)
def mock_basic_workflow_type(domain_name, conn):
args, kwargs = _generic_workflow_type_attributes()
conn.register_workflow_type(domain_name, *args, **kwargs)
return conn
# A test WorkflowExecution
def make_workflow_execution(**kwargs):
domain = get_basic_domain()
domain.add_type(ActivityType("test-activity", "v1.1"))
wft = get_basic_workflow_type()
return WorkflowExecution(domain, wft, "ab1234", **kwargs)
# Makes decision tasks start automatically on a given workflow
def auto_start_decision_tasks(wfe):
wfe.schedule_decision_task = wfe.schedule_and_start_decision_task
return wfe
# Setup a complete example workflow and return the connection object
@mock_swf
def setup_workflow():
conn = boto.connect_swf("the_key", "the_secret")
conn.register_domain("test-domain", "60", description="A test domain")
conn = mock_basic_workflow_type("test-domain", conn)
conn.register_activity_type(
"test-domain", "test-activity", "v1.1",
default_task_heartbeat_timeout="600",
default_task_schedule_to_close_timeout="600",
default_task_schedule_to_start_timeout="600",
default_task_start_to_close_timeout="600",
)
wfe = conn.start_workflow_execution("test-domain", "uid-abcd1234", "test-workflow", "v1.0")
conn.run_id = wfe["runId"]
return conn
# A helper for processing the first timeout on a given object
def process_first_timeout(obj):
_timeout = obj.first_timeout()
if _timeout:
obj.timeout(_timeout)