Add list_closed_workflow_executions and make validation/filtering on list_open_workflow_executions better

This commit is contained in:
Ian Fillion-de Kiewit 2016-02-05 15:35:46 -05:00
parent 93120927f7
commit d31105889e
4 changed files with 127 additions and 13 deletions

View File

@ -61,9 +61,8 @@ class SWFBackend(BaseBackend):
domains = reversed(domains)
return domains
def list_open_workflow_executions(self, domain_name, start_time_filter,
maximum_page_size, reverse_order,
**kwargs):
def list_open_workflow_executions(self, domain_name, maximum_page_size,
tag_filter, reverse_order, **kwargs):
self._process_timeouts()
domain = self._get_domain(domain_name)
if domain.status == "DEPRECATED":
@ -72,10 +71,38 @@ class SWFBackend(BaseBackend):
wfe for wfe in domain.workflow_executions
if wfe.execution_status == 'OPEN'
]
if tag_filter:
for open_wfe in open_wfes:
if tag_filter['tag'] not in open_wfe.tag_list:
open_wfes.remove(open_wfe)
if reverse_order:
open_wfes = reversed(open_wfes)
return open_wfes[0:maximum_page_size]
def list_closed_workflow_executions(self, domain_name, close_time_filter,
tag_filter, close_status_filter, maximum_page_size, reverse_order,
**kwargs):
self._process_timeouts()
domain = self._get_domain(domain_name)
if domain.status == "DEPRECATED":
raise SWFDomainDeprecatedFault(domain_name)
closed_wfes = [
wfe for wfe in domain.workflow_executions
if wfe.execution_status == 'CLOSED'
]
if tag_filter:
for closed_wfe in closed_wfes:
if tag_filter['tag'] not in closed_wfe.tag_list:
closed_wfes.remove(closed_wfe)
if close_status_filter:
for closed_wfe in closed_wfes:
if close_status_filter != closed_wfe.close_status:
closed_wfes.remove(closed_wfe)
if reverse_order:
closed_wfes = reversed(closed_wfes)
return closed_wfes[0:maximum_page_size]
def register_domain(self, name, workflow_execution_retention_period_in_days,
description=None):
if self._get_domain(name, ignore_empty=True):
@ -123,13 +150,13 @@ class SWFBackend(BaseBackend):
def start_workflow_execution(self, domain_name, workflow_id,
workflow_name, workflow_version,
tag_list=None, **kwargs):
tag_list=None, input=None, **kwargs):
domain = self._get_domain(domain_name)
wf_type = domain.get_type("workflow", workflow_name, workflow_version)
if wf_type.status == "DEPRECATED":
raise SWFTypeDeprecatedFault(wf_type)
wfe = WorkflowExecution(domain, wf_type, workflow_id,
tag_list=tag_list, **kwargs)
tag_list=tag_list, input=input, **kwargs)
domain.add_workflow_execution(wfe)
wfe.start()

View File

@ -57,7 +57,7 @@ class WorkflowExecution(object):
self.latest_execution_context = None
self.parent = None
self.start_timestamp = None
self.tag_list = [] # TODO
self.tag_list = kwargs.get("tag_list", None) or []
self.timeout_type = None
self.workflow_type = workflow_type
# args processing
@ -254,6 +254,7 @@ class WorkflowExecution(object):
task_list=self.task_list,
task_start_to_close_timeout=self.task_start_to_close_timeout,
workflow_type=self.workflow_type,
input=self.input
)
self.schedule_decision_task()

View File

@ -97,6 +97,62 @@ class SWFResponse(BaseResponse):
"domainInfos": [domain.to_short_dict() for domain in domains]
})
def list_closed_workflow_executions(self):
domain = self._params['domain']
start_time_filter = self._params.get('startTimeFilter', None)
close_time_filter = self._params.get('closeTimeFilter', None)
execution_filter = self._params.get('executionFilter', None)
workflow_id = execution_filter['workflowId'] if execution_filter else None
maximum_page_size = self._params.get('maximumPageSize', 1000)
reverse_order = self._params.get('reverseOrder', None)
tag_filter = self._params.get('tagFilter', None)
type_filter = self._params.get('typeFilter', None)
close_status_filter = self._params.get('closeStatusFilter', None)
self._check_string(domain)
self._check_none_or_string(workflow_id)
self._check_exclusivity(executionFilter=execution_filter,
typeFilter=type_filter,
tagFilter=tag_filter,
closeStatusFilter=close_status_filter)
self._check_exclusivity(startTimeFilter=start_time_filter,
closeTimeFilter=close_time_filter)
if start_time_filter is None and close_time_filter is None:
raise SWFValidationException('Must specify time filter')
if start_time_filter:
self._check_int(start_time_filter['oldestDate'])
if 'latestDate' in start_time_filter:
self._check_int(start_time_filter['latestDate'])
if close_time_filter:
self._check_int(close_time_filter['oldestDate'])
if 'latestDate' in close_time_filter:
self._check_int(close_time_filter['latestDate'])
if tag_filter:
self._check_string(tag_filter['tag'])
if type_filter:
self._check_string(type_filter['name'])
self._check_string(type_filter['version'])
if close_status_filter:
self._check_string(close_status_filter['status'])
self._check_int(maximum_page_size)
workflow_executions = self.swf_backend.list_closed_workflow_executions(
domain_name=domain,
start_time_filter=start_time_filter,
close_time_filter=close_time_filter,
execution_filter=execution_filter,
tag_filter=tag_filter,
type_filter=type_filter,
maximum_page_size=maximum_page_size,
reverse_order=reverse_order,
workflow_id=workflow_id,
close_status_filter=close_status_filter
)
return json.dumps({
'executionInfos': [wfe.to_list_dict() for wfe in workflow_executions]
})
def list_open_workflow_executions(self):
domain = self._params['domain']
start_time_filter = self._params['startTimeFilter']
@ -112,6 +168,9 @@ class SWFResponse(BaseResponse):
self._check_exclusivity(executionFilter=execution_filter,
typeFilter=type_filter,
tagFilter=tag_filter)
self._check_int(start_time_filter['oldestDate'])
if 'latestDate' in start_time_filter:
self._check_int(start_time_filter['latestDate'])
if tag_filter:
self._check_string(tag_filter['tag'])
if type_filter:
@ -339,6 +398,8 @@ class SWFResponse(BaseResponse):
return json.dumps({"count": count, "truncated": False})
def respond_decision_task_completed(self):
import ipdb; ipdb.set_trace()
task_token = self._params["taskToken"]
execution_context = self._params.get("executionContext")
decisions = self._params.get("decisions")

View File

@ -112,9 +112,18 @@ def test_get_workflow_execution_history_on_non_existent_workflow_execution():
@mock_swf
def test_list_open_workflow_executions():
conn = setup_swf_environment()
# One open workflow execution
conn.start_workflow_execution(
'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0'
)
# One closed workflow execution to make sure it isn't displayed
run_id = conn.start_workflow_execution(
'test-domain', 'uid-abcd12345', 'test-workflow', 'v1.0'
)['runId']
conn.terminate_workflow_execution('test-domain', 'uid-abcd12345',
details='some details',
reason='a more complete reason',
run_id=run_id)
yesterday = datetime.now() - timedelta(days=1)
oldest_date = iso_8601_datetime_with_milliseconds(yesterday)
@ -133,23 +142,39 @@ def test_list_open_workflow_executions():
open_workflow['executionStatus'].should.equal('OPEN')
# ListClosedWorkflowExecutions endpoint
@mock_swf
def test_list_open_workflow_executions_does_not_show_closed():
def test_list_closed_workflow_executions():
conn = setup_swf_environment()
run_id = conn.start_workflow_execution(
# Leave one workflow execution open to make sure it isn't displayed
conn.start_workflow_execution(
'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0'
)
# One closed workflow execution
run_id = conn.start_workflow_execution(
'test-domain', 'uid-abcd12345', 'test-workflow', 'v1.0'
)['runId']
conn.terminate_workflow_execution('test-domain', 'uid-abcd1234',
conn.terminate_workflow_execution('test-domain', 'uid-abcd12345',
details='some details',
reason='a more complete reason',
run_id=run_id)
yesterday = datetime.now() - timedelta(days=1)
oldest_date = iso_8601_datetime_with_milliseconds(yesterday)
response = conn.list_open_workflow_executions('test-domain',
oldest_date,
workflow_id='test-workflow')
response['executionInfos'].should.be.empty
response = conn.list_closed_workflow_executions(
'test-domain',
start_oldest_date=oldest_date,
workflow_id='test-workflow')
execution_infos = response['executionInfos']
len(execution_infos).should.equal(1)
open_workflow = execution_infos[0]
open_workflow['workflowType'].should.equal({'version': 'v1.0',
'name': 'test-workflow'})
open_workflow.should.contain('startTimestamp')
open_workflow['execution']['workflowId'].should.equal('uid-abcd12345')
open_workflow['execution'].should.contain('runId')
open_workflow['cancelRequested'].should.be(False)
open_workflow['executionStatus'].should.equal('CLOSED')
# TerminateWorkflowExecution endpoint