From d31105889e6760fd698835fce673ff42b4bd22bc Mon Sep 17 00:00:00 2001 From: Ian Fillion-de Kiewit Date: Fri, 5 Feb 2016 15:35:46 -0500 Subject: [PATCH] Add list_closed_workflow_executions and make validation/filtering on list_open_workflow_executions better --- moto/swf/models/__init__.py | 37 +++++++++-- moto/swf/models/workflow_execution.py | 3 +- moto/swf/responses.py | 61 +++++++++++++++++++ .../responses/test_workflow_executions.py | 39 +++++++++--- 4 files changed, 127 insertions(+), 13 deletions(-) diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index 729c1d135..61fe5f52a 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -61,9 +61,8 @@ class SWFBackend(BaseBackend): domains = reversed(domains) return domains - def list_open_workflow_executions(self, domain_name, start_time_filter, - maximum_page_size, reverse_order, - **kwargs): + def list_open_workflow_executions(self, domain_name, maximum_page_size, + tag_filter, reverse_order, **kwargs): self._process_timeouts() domain = self._get_domain(domain_name) if domain.status == "DEPRECATED": @@ -72,10 +71,38 @@ class SWFBackend(BaseBackend): wfe for wfe in domain.workflow_executions if wfe.execution_status == 'OPEN' ] + + if tag_filter: + for open_wfe in open_wfes: + if tag_filter['tag'] not in open_wfe.tag_list: + open_wfes.remove(open_wfe) if reverse_order: open_wfes = reversed(open_wfes) return open_wfes[0:maximum_page_size] + def list_closed_workflow_executions(self, domain_name, close_time_filter, + tag_filter, close_status_filter, maximum_page_size, reverse_order, + **kwargs): + self._process_timeouts() + domain = self._get_domain(domain_name) + if domain.status == "DEPRECATED": + raise SWFDomainDeprecatedFault(domain_name) + closed_wfes = [ + wfe for wfe in domain.workflow_executions + if wfe.execution_status == 'CLOSED' + ] + if tag_filter: + for closed_wfe in closed_wfes: + if tag_filter['tag'] not in closed_wfe.tag_list: + closed_wfes.remove(closed_wfe) + if close_status_filter: + for closed_wfe in closed_wfes: + if close_status_filter != closed_wfe.close_status: + closed_wfes.remove(closed_wfe) + if reverse_order: + closed_wfes = reversed(closed_wfes) + return closed_wfes[0:maximum_page_size] + def register_domain(self, name, workflow_execution_retention_period_in_days, description=None): if self._get_domain(name, ignore_empty=True): @@ -123,13 +150,13 @@ class SWFBackend(BaseBackend): def start_workflow_execution(self, domain_name, workflow_id, workflow_name, workflow_version, - tag_list=None, **kwargs): + tag_list=None, input=None, **kwargs): domain = self._get_domain(domain_name) wf_type = domain.get_type("workflow", workflow_name, workflow_version) if wf_type.status == "DEPRECATED": raise SWFTypeDeprecatedFault(wf_type) wfe = WorkflowExecution(domain, wf_type, workflow_id, - tag_list=tag_list, **kwargs) + tag_list=tag_list, input=input, **kwargs) domain.add_workflow_execution(wfe) wfe.start() diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 661250705..a30c2e18d 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -57,7 +57,7 @@ class WorkflowExecution(object): self.latest_execution_context = None self.parent = None self.start_timestamp = None - self.tag_list = [] # TODO + self.tag_list = kwargs.get("tag_list", None) or [] self.timeout_type = None self.workflow_type = workflow_type # args processing @@ -254,6 +254,7 @@ class WorkflowExecution(object): task_list=self.task_list, task_start_to_close_timeout=self.task_start_to_close_timeout, workflow_type=self.workflow_type, + input=self.input ) self.schedule_decision_task() diff --git a/moto/swf/responses.py b/moto/swf/responses.py index e2fb28636..a4975677f 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -97,6 +97,62 @@ class SWFResponse(BaseResponse): "domainInfos": [domain.to_short_dict() for domain in domains] }) + def list_closed_workflow_executions(self): + domain = self._params['domain'] + start_time_filter = self._params.get('startTimeFilter', None) + close_time_filter = self._params.get('closeTimeFilter', None) + execution_filter = self._params.get('executionFilter', None) + workflow_id = execution_filter['workflowId'] if execution_filter else None + maximum_page_size = self._params.get('maximumPageSize', 1000) + reverse_order = self._params.get('reverseOrder', None) + tag_filter = self._params.get('tagFilter', None) + type_filter = self._params.get('typeFilter', None) + close_status_filter = self._params.get('closeStatusFilter', None) + + self._check_string(domain) + self._check_none_or_string(workflow_id) + self._check_exclusivity(executionFilter=execution_filter, + typeFilter=type_filter, + tagFilter=tag_filter, + closeStatusFilter=close_status_filter) + self._check_exclusivity(startTimeFilter=start_time_filter, + closeTimeFilter=close_time_filter) + if start_time_filter is None and close_time_filter is None: + raise SWFValidationException('Must specify time filter') + if start_time_filter: + self._check_int(start_time_filter['oldestDate']) + if 'latestDate' in start_time_filter: + self._check_int(start_time_filter['latestDate']) + if close_time_filter: + self._check_int(close_time_filter['oldestDate']) + if 'latestDate' in close_time_filter: + self._check_int(close_time_filter['latestDate']) + if tag_filter: + self._check_string(tag_filter['tag']) + if type_filter: + self._check_string(type_filter['name']) + self._check_string(type_filter['version']) + if close_status_filter: + self._check_string(close_status_filter['status']) + self._check_int(maximum_page_size) + + workflow_executions = self.swf_backend.list_closed_workflow_executions( + domain_name=domain, + start_time_filter=start_time_filter, + close_time_filter=close_time_filter, + execution_filter=execution_filter, + tag_filter=tag_filter, + type_filter=type_filter, + maximum_page_size=maximum_page_size, + reverse_order=reverse_order, + workflow_id=workflow_id, + close_status_filter=close_status_filter + ) + + return json.dumps({ + 'executionInfos': [wfe.to_list_dict() for wfe in workflow_executions] + }) + def list_open_workflow_executions(self): domain = self._params['domain'] start_time_filter = self._params['startTimeFilter'] @@ -112,6 +168,9 @@ class SWFResponse(BaseResponse): self._check_exclusivity(executionFilter=execution_filter, typeFilter=type_filter, tagFilter=tag_filter) + self._check_int(start_time_filter['oldestDate']) + if 'latestDate' in start_time_filter: + self._check_int(start_time_filter['latestDate']) if tag_filter: self._check_string(tag_filter['tag']) if type_filter: @@ -339,6 +398,8 @@ class SWFResponse(BaseResponse): return json.dumps({"count": count, "truncated": False}) def respond_decision_task_completed(self): + + import ipdb; ipdb.set_trace() task_token = self._params["taskToken"] execution_context = self._params.get("executionContext") decisions = self._params.get("decisions") diff --git a/tests/test_swf/responses/test_workflow_executions.py b/tests/test_swf/responses/test_workflow_executions.py index 97958f100..d353eeb98 100644 --- a/tests/test_swf/responses/test_workflow_executions.py +++ b/tests/test_swf/responses/test_workflow_executions.py @@ -112,9 +112,18 @@ def test_get_workflow_execution_history_on_non_existent_workflow_execution(): @mock_swf def test_list_open_workflow_executions(): conn = setup_swf_environment() + # One open workflow execution conn.start_workflow_execution( 'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0' ) + # One closed workflow execution to make sure it isn't displayed + run_id = conn.start_workflow_execution( + 'test-domain', 'uid-abcd12345', 'test-workflow', 'v1.0' + )['runId'] + conn.terminate_workflow_execution('test-domain', 'uid-abcd12345', + details='some details', + reason='a more complete reason', + run_id=run_id) yesterday = datetime.now() - timedelta(days=1) oldest_date = iso_8601_datetime_with_milliseconds(yesterday) @@ -133,23 +142,39 @@ def test_list_open_workflow_executions(): open_workflow['executionStatus'].should.equal('OPEN') +# ListClosedWorkflowExecutions endpoint @mock_swf -def test_list_open_workflow_executions_does_not_show_closed(): +def test_list_closed_workflow_executions(): conn = setup_swf_environment() - run_id = conn.start_workflow_execution( + # Leave one workflow execution open to make sure it isn't displayed + conn.start_workflow_execution( 'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0' + ) + # One closed workflow execution + run_id = conn.start_workflow_execution( + 'test-domain', 'uid-abcd12345', 'test-workflow', 'v1.0' )['runId'] - conn.terminate_workflow_execution('test-domain', 'uid-abcd1234', + conn.terminate_workflow_execution('test-domain', 'uid-abcd12345', details='some details', reason='a more complete reason', run_id=run_id) yesterday = datetime.now() - timedelta(days=1) oldest_date = iso_8601_datetime_with_milliseconds(yesterday) - response = conn.list_open_workflow_executions('test-domain', - oldest_date, - workflow_id='test-workflow') - response['executionInfos'].should.be.empty + response = conn.list_closed_workflow_executions( + 'test-domain', + start_oldest_date=oldest_date, + workflow_id='test-workflow') + execution_infos = response['executionInfos'] + len(execution_infos).should.equal(1) + open_workflow = execution_infos[0] + open_workflow['workflowType'].should.equal({'version': 'v1.0', + 'name': 'test-workflow'}) + open_workflow.should.contain('startTimestamp') + open_workflow['execution']['workflowId'].should.equal('uid-abcd12345') + open_workflow['execution'].should.contain('runId') + open_workflow['cancelRequested'].should.be(False) + open_workflow['executionStatus'].should.equal('CLOSED') # TerminateWorkflowExecution endpoint