Add list_open_workflow_executions endpoint

This commit is contained in:
Ian Fillion-de Kiewit 2016-02-04 17:14:33 -05:00
parent 129b4faff8
commit 93120927f7
5 changed files with 158 additions and 1 deletions

View File

@ -61,6 +61,21 @@ class SWFBackend(BaseBackend):
domains = reversed(domains)
return domains
def list_open_workflow_executions(self, domain_name, start_time_filter,
maximum_page_size, reverse_order,
**kwargs):
self._process_timeouts()
domain = self._get_domain(domain_name)
if domain.status == "DEPRECATED":
raise SWFDomainDeprecatedFault(domain_name)
open_wfes = [
wfe for wfe in domain.workflow_executions
if wfe.execution_status == 'OPEN'
]
if reverse_order:
open_wfes = reversed(open_wfes)
return open_wfes[0:maximum_page_size]
def register_domain(self, name, workflow_execution_retention_period_in_days,
description=None):
if self._get_domain(name, ignore_empty=True):

View File

@ -146,6 +146,27 @@ class WorkflowExecution(object):
hsh["latestActivityTaskTimestamp"] = self.latest_activity_task_timestamp
return hsh
def to_list_dict(self):
hsh = {
'execution': {
'workflowId': self.workflow_id,
'runId': self.run_id,
},
'workflowType': self.workflow_type.to_short_dict(),
'startTimestamp': self.start_timestamp,
'executionStatus': self.execution_status,
'cancelRequested': self.cancel_requested,
}
if self.tag_list:
hsh['tagList'] = self.tag_list
if self.parent:
hsh['parent'] = self.parent
if self.close_status:
hsh['closeStatus'] = self.close_status
if self.close_timestamp:
hsh['closeTimestamp'] = self.close_timestamp
return hsh
def _process_timeouts(self):
"""
SWF timeouts can happen on different objects (workflow executions,

View File

@ -3,7 +3,7 @@ import six
from moto.core.responses import BaseResponse
from .exceptions import SWFSerializationException
from .exceptions import SWFSerializationException, SWFValidationException
from .models import swf_backends
@ -18,6 +18,10 @@ class SWFResponse(BaseResponse):
def _params(self):
return json.loads(self.body.decode("utf-8"))
def _check_int(self, parameter):
if not isinstance(parameter, int):
raise SWFSerializationException(parameter)
def _check_none_or_string(self, parameter):
if parameter is not None:
self._check_string(parameter)
@ -37,6 +41,18 @@ class SWFResponse(BaseResponse):
if not isinstance(i, six.string_types):
raise SWFSerializationException(parameter)
def _check_exclusivity(self, **kwargs):
if kwargs.values().count(None) >= len(kwargs) - 1:
return
keys = kwargs.keys()
if len(keys) == 2:
message = 'Cannot specify both a {0} and a {1}'.format(keys[0],
keys[1])
else:
message = 'Cannot specify more than one exclusive filters in the' \
' same query: {0}'.format(keys)
raise SWFValidationException(message)
def _list_types(self, kind):
domain_name = self._params["domain"]
status = self._params["registrationStatus"]
@ -81,6 +97,43 @@ class SWFResponse(BaseResponse):
"domainInfos": [domain.to_short_dict() for domain in domains]
})
def list_open_workflow_executions(self):
domain = self._params['domain']
start_time_filter = self._params['startTimeFilter']
execution_filter = self._params.get('executionFilter', None)
workflow_id = execution_filter['workflowId'] if execution_filter else None
maximum_page_size = self._params.get('maximumPageSize', 1000)
reverse_order = self._params.get('reverseOrder', None)
tag_filter = self._params.get('tagFilter', None)
type_filter = self._params.get('typeFilter', None)
self._check_string(domain)
self._check_none_or_string(workflow_id)
self._check_exclusivity(executionFilter=execution_filter,
typeFilter=type_filter,
tagFilter=tag_filter)
if tag_filter:
self._check_string(tag_filter['tag'])
if type_filter:
self._check_string(type_filter['name'])
self._check_string(type_filter['version'])
self._check_int(maximum_page_size)
workflow_executions = self.swf_backend.list_open_workflow_executions(
domain_name=domain,
start_time_filter=start_time_filter,
execution_filter=execution_filter,
tag_filter=tag_filter,
type_filter=type_filter,
maximum_page_size=maximum_page_size,
reverse_order=reverse_order,
workflow_id=workflow_id
)
return json.dumps({
'executionInfos': [wfe.to_list_dict() for wfe in workflow_executions]
})
def register_domain(self):
name = self._params["name"]
retention = self._params["workflowExecutionRetentionPeriodInDays"]

View File

@ -1,4 +1,5 @@
from freezegun import freeze_time
import sure # noqa
from moto.swf.models import (
ActivityType,
@ -141,6 +142,26 @@ def test_workflow_execution_full_dict_representation():
})
def test_workflow_execution_list_dict_representation():
domain = get_basic_domain()
wf_type = WorkflowType(
'test-workflow', 'v1.0',
task_list='queue', default_child_policy='ABANDON',
default_execution_start_to_close_timeout='300',
default_task_start_to_close_timeout='300',
)
wfe = WorkflowExecution(domain, wf_type, 'ab1234')
ld = wfe.to_list_dict()
ld['workflowType']['version'].should.equal('v1.0')
ld['workflowType']['name'].should.equal('test-workflow')
ld['executionStatus'].should.equal('OPEN')
ld['execution']['workflowId'].should.equal('ab1234')
ld['execution'].should.contain('runId')
ld['cancelRequested'].should.be.false
ld.should.contain('startTimestamp')
def test_workflow_execution_schedule_decision_task():
wfe = make_workflow_execution()
wfe.open_counts["openDecisionTasks"].should.equal(0)

View File

@ -1,10 +1,13 @@
import boto
from boto.swf.exceptions import SWFResponseError
from datetime import datetime, timedelta
import sure # noqa
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises # noqa
from moto import mock_swf
from moto.core.utils import iso_8601_datetime_with_milliseconds
# Utils
@ -105,6 +108,50 @@ def test_get_workflow_execution_history_on_non_existent_workflow_execution():
).should.throw(SWFResponseError)
# ListOpenWorkflowExecutions endpoint
@mock_swf
def test_list_open_workflow_executions():
conn = setup_swf_environment()
conn.start_workflow_execution(
'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0'
)
yesterday = datetime.now() - timedelta(days=1)
oldest_date = iso_8601_datetime_with_milliseconds(yesterday)
response = conn.list_open_workflow_executions('test-domain',
oldest_date,
workflow_id='test-workflow')
execution_infos = response['executionInfos']
len(execution_infos).should.equal(1)
open_workflow = execution_infos[0]
open_workflow['workflowType'].should.equal({'version': 'v1.0',
'name': 'test-workflow'})
open_workflow.should.contain('startTimestamp')
open_workflow['execution']['workflowId'].should.equal('uid-abcd1234')
open_workflow['execution'].should.contain('runId')
open_workflow['cancelRequested'].should.be(False)
open_workflow['executionStatus'].should.equal('OPEN')
@mock_swf
def test_list_open_workflow_executions_does_not_show_closed():
conn = setup_swf_environment()
run_id = conn.start_workflow_execution(
'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0'
)['runId']
conn.terminate_workflow_execution('test-domain', 'uid-abcd1234',
details='some details',
reason='a more complete reason',
run_id=run_id)
yesterday = datetime.now() - timedelta(days=1)
oldest_date = iso_8601_datetime_with_milliseconds(yesterday)
response = conn.list_open_workflow_executions('test-domain',
oldest_date,
workflow_id='test-workflow')
response['executionInfos'].should.be.empty
# TerminateWorkflowExecution endpoint
@mock_swf
def test_terminate_workflow_execution():