From 93120927f7e44f138e2f685baf0fff6225a601c1 Mon Sep 17 00:00:00 2001 From: Ian Fillion-de Kiewit Date: Thu, 4 Feb 2016 17:14:33 -0500 Subject: [PATCH] Add list_open_workflow_executions endpoint --- moto/swf/models/__init__.py | 15 +++++ moto/swf/models/workflow_execution.py | 21 +++++++ moto/swf/responses.py | 55 ++++++++++++++++++- .../models/test_workflow_execution.py | 21 +++++++ .../responses/test_workflow_executions.py | 47 ++++++++++++++++ 5 files changed, 158 insertions(+), 1 deletion(-) diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index 9f6e74eb9..729c1d135 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -61,6 +61,21 @@ class SWFBackend(BaseBackend): domains = reversed(domains) return domains + def list_open_workflow_executions(self, domain_name, start_time_filter, + maximum_page_size, reverse_order, + **kwargs): + self._process_timeouts() + domain = self._get_domain(domain_name) + if domain.status == "DEPRECATED": + raise SWFDomainDeprecatedFault(domain_name) + open_wfes = [ + wfe for wfe in domain.workflow_executions + if wfe.execution_status == 'OPEN' + ] + if reverse_order: + open_wfes = reversed(open_wfes) + return open_wfes[0:maximum_page_size] + def register_domain(self, name, workflow_execution_retention_period_in_days, description=None): if self._get_domain(name, ignore_empty=True): diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index b241debce..661250705 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -146,6 +146,27 @@ class WorkflowExecution(object): hsh["latestActivityTaskTimestamp"] = self.latest_activity_task_timestamp return hsh + def to_list_dict(self): + hsh = { + 'execution': { + 'workflowId': self.workflow_id, + 'runId': self.run_id, + }, + 'workflowType': self.workflow_type.to_short_dict(), + 'startTimestamp': self.start_timestamp, + 'executionStatus': self.execution_status, + 'cancelRequested': self.cancel_requested, + } + if self.tag_list: + hsh['tagList'] = self.tag_list + if self.parent: + hsh['parent'] = self.parent + if self.close_status: + hsh['closeStatus'] = self.close_status + if self.close_timestamp: + hsh['closeTimestamp'] = self.close_timestamp + return hsh + def _process_timeouts(self): """ SWF timeouts can happen on different objects (workflow executions, diff --git a/moto/swf/responses.py b/moto/swf/responses.py index 15942fa17..e2fb28636 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -3,7 +3,7 @@ import six from moto.core.responses import BaseResponse -from .exceptions import SWFSerializationException +from .exceptions import SWFSerializationException, SWFValidationException from .models import swf_backends @@ -18,6 +18,10 @@ class SWFResponse(BaseResponse): def _params(self): return json.loads(self.body.decode("utf-8")) + def _check_int(self, parameter): + if not isinstance(parameter, int): + raise SWFSerializationException(parameter) + def _check_none_or_string(self, parameter): if parameter is not None: self._check_string(parameter) @@ -37,6 +41,18 @@ class SWFResponse(BaseResponse): if not isinstance(i, six.string_types): raise SWFSerializationException(parameter) + def _check_exclusivity(self, **kwargs): + if kwargs.values().count(None) >= len(kwargs) - 1: + return + keys = kwargs.keys() + if len(keys) == 2: + message = 'Cannot specify both a {0} and a {1}'.format(keys[0], + keys[1]) + else: + message = 'Cannot specify more than one exclusive filters in the' \ + ' same query: {0}'.format(keys) + raise SWFValidationException(message) + def _list_types(self, kind): domain_name = self._params["domain"] status = self._params["registrationStatus"] @@ -81,6 +97,43 @@ class SWFResponse(BaseResponse): "domainInfos": [domain.to_short_dict() for domain in domains] }) + def list_open_workflow_executions(self): + domain = self._params['domain'] + start_time_filter = self._params['startTimeFilter'] + execution_filter = self._params.get('executionFilter', None) + workflow_id = execution_filter['workflowId'] if execution_filter else None + maximum_page_size = self._params.get('maximumPageSize', 1000) + reverse_order = self._params.get('reverseOrder', None) + tag_filter = self._params.get('tagFilter', None) + type_filter = self._params.get('typeFilter', None) + + self._check_string(domain) + self._check_none_or_string(workflow_id) + self._check_exclusivity(executionFilter=execution_filter, + typeFilter=type_filter, + tagFilter=tag_filter) + if tag_filter: + self._check_string(tag_filter['tag']) + if type_filter: + self._check_string(type_filter['name']) + self._check_string(type_filter['version']) + self._check_int(maximum_page_size) + + workflow_executions = self.swf_backend.list_open_workflow_executions( + domain_name=domain, + start_time_filter=start_time_filter, + execution_filter=execution_filter, + tag_filter=tag_filter, + type_filter=type_filter, + maximum_page_size=maximum_page_size, + reverse_order=reverse_order, + workflow_id=workflow_id + ) + + return json.dumps({ + 'executionInfos': [wfe.to_list_dict() for wfe in workflow_executions] + }) + def register_domain(self): name = self._params["name"] retention = self._params["workflowExecutionRetentionPeriodInDays"] diff --git a/tests/test_swf/models/test_workflow_execution.py b/tests/test_swf/models/test_workflow_execution.py index 8ef86de69..f6a69f8d7 100644 --- a/tests/test_swf/models/test_workflow_execution.py +++ b/tests/test_swf/models/test_workflow_execution.py @@ -1,4 +1,5 @@ from freezegun import freeze_time +import sure # noqa from moto.swf.models import ( ActivityType, @@ -141,6 +142,26 @@ def test_workflow_execution_full_dict_representation(): }) +def test_workflow_execution_list_dict_representation(): + domain = get_basic_domain() + wf_type = WorkflowType( + 'test-workflow', 'v1.0', + task_list='queue', default_child_policy='ABANDON', + default_execution_start_to_close_timeout='300', + default_task_start_to_close_timeout='300', + ) + wfe = WorkflowExecution(domain, wf_type, 'ab1234') + + ld = wfe.to_list_dict() + ld['workflowType']['version'].should.equal('v1.0') + ld['workflowType']['name'].should.equal('test-workflow') + ld['executionStatus'].should.equal('OPEN') + ld['execution']['workflowId'].should.equal('ab1234') + ld['execution'].should.contain('runId') + ld['cancelRequested'].should.be.false + ld.should.contain('startTimestamp') + + def test_workflow_execution_schedule_decision_task(): wfe = make_workflow_execution() wfe.open_counts["openDecisionTasks"].should.equal(0) diff --git a/tests/test_swf/responses/test_workflow_executions.py b/tests/test_swf/responses/test_workflow_executions.py index 07bd0f9cf..97958f100 100644 --- a/tests/test_swf/responses/test_workflow_executions.py +++ b/tests/test_swf/responses/test_workflow_executions.py @@ -1,10 +1,13 @@ import boto from boto.swf.exceptions import SWFResponseError +from datetime import datetime, timedelta +import sure # noqa # Ensure 'assert_raises' context manager support for Python 2.6 import tests.backport_assert_raises # noqa from moto import mock_swf +from moto.core.utils import iso_8601_datetime_with_milliseconds # Utils @@ -105,6 +108,50 @@ def test_get_workflow_execution_history_on_non_existent_workflow_execution(): ).should.throw(SWFResponseError) +# ListOpenWorkflowExecutions endpoint +@mock_swf +def test_list_open_workflow_executions(): + conn = setup_swf_environment() + conn.start_workflow_execution( + 'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0' + ) + + yesterday = datetime.now() - timedelta(days=1) + oldest_date = iso_8601_datetime_with_milliseconds(yesterday) + response = conn.list_open_workflow_executions('test-domain', + oldest_date, + workflow_id='test-workflow') + execution_infos = response['executionInfos'] + len(execution_infos).should.equal(1) + open_workflow = execution_infos[0] + open_workflow['workflowType'].should.equal({'version': 'v1.0', + 'name': 'test-workflow'}) + open_workflow.should.contain('startTimestamp') + open_workflow['execution']['workflowId'].should.equal('uid-abcd1234') + open_workflow['execution'].should.contain('runId') + open_workflow['cancelRequested'].should.be(False) + open_workflow['executionStatus'].should.equal('OPEN') + + +@mock_swf +def test_list_open_workflow_executions_does_not_show_closed(): + conn = setup_swf_environment() + run_id = conn.start_workflow_execution( + 'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0' + )['runId'] + conn.terminate_workflow_execution('test-domain', 'uid-abcd1234', + details='some details', + reason='a more complete reason', + run_id=run_id) + + yesterday = datetime.now() - timedelta(days=1) + oldest_date = iso_8601_datetime_with_milliseconds(yesterday) + response = conn.list_open_workflow_executions('test-domain', + oldest_date, + workflow_id='test-workflow') + response['executionInfos'].should.be.empty + + # TerminateWorkflowExecution endpoint @mock_swf def test_terminate_workflow_execution():