diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 4ccc4e2dc..0a93716b1 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -8115,7 +8115,7 @@ - [X] describe_state_machine - [ ] describe_state_machine_for_execution - [ ] get_activity_task -- [ ] get_execution_history +- [X] get_execution_history - [ ] list_activities - [X] list_executions - [X] list_state_machines diff --git a/Makefile b/Makefile index 391a8efa0..b155b6f8e 100644 --- a/Makefile +++ b/Makefile @@ -20,12 +20,14 @@ lint: flake8 moto black --check moto/ tests/ +format: + black moto/ tests/ + test-only: rm -f .coverage rm -rf cover @pytest -sv --cov=moto --cov-report html ./tests/ $(TEST_EXCLUDE) - test: lint test-only test_server: diff --git a/moto/settings.py b/moto/settings.py index 707c61397..d3259e0ec 100644 --- a/moto/settings.py +++ b/moto/settings.py @@ -4,3 +4,12 @@ TEST_SERVER_MODE = os.environ.get("TEST_SERVER_MODE", "0").lower() == "true" INITIAL_NO_AUTH_ACTION_COUNT = float( os.environ.get("INITIAL_NO_AUTH_ACTION_COUNT", float("inf")) ) + + +def get_sf_execution_history_type(): + """ + Determines which execution history events `get_execution_history` returns + :returns: str representing the type of Step Function Execution Type events should be + returned. Default value is SUCCESS, currently supports (SUCCESS || FAILURE) + """ + return os.environ.get("SF_EXECUTION_HISTORY_TYPE", "SUCCESS") diff --git a/moto/stepfunctions/models.py b/moto/stepfunctions/models.py index 125e5d807..bc82670cd 100644 --- a/moto/stepfunctions/models.py +++ b/moto/stepfunctions/models.py @@ -1,6 +1,7 @@ import json import re from datetime import datetime +from dateutil.tz import tzlocal from boto3 import Session @@ -17,6 +18,7 @@ from .exceptions import ( StateMachineDoesNotExist, ) from .utils import paginate, api_to_cfn_tags, cfn_to_api_tags +from moto import settings class StateMachine(CloudFormationModel): @@ -27,10 +29,51 @@ class StateMachine(CloudFormationModel): self.name = name self.definition = definition self.roleArn = roleArn + self.executions = [] self.tags = [] if tags: self.add_tags(tags) + def start_execution(self, region_name, account_id, execution_name, execution_input): + self._ensure_execution_name_doesnt_exist(execution_name) + self._validate_execution_input(execution_input) + execution = Execution( + region_name=region_name, + account_id=account_id, + state_machine_name=self.name, + execution_name=execution_name, + state_machine_arn=self.arn, + execution_input=execution_input, + ) + self.executions.append(execution) + return execution + + def stop_execution(self, execution_arn): + execution = next( + (x for x in self.executions if x.execution_arn == execution_arn), None + ) + if not execution: + raise ExecutionDoesNotExist( + "Execution Does Not Exist: '" + execution_arn + "'" + ) + execution.stop() + return execution + + def _ensure_execution_name_doesnt_exist(self, name): + for execution in self.executions: + if execution.name == name: + raise ExecutionAlreadyExists( + "Execution Already Exists: '" + execution.execution_arn + "'" + ) + + def _validate_execution_input(self, execution_input): + try: + json.loads(execution_input) + except Exception as ex: + raise InvalidExecutionInput( + "Invalid State Machine Execution Input: '" + str(ex) + "'" + ) + def update(self, **kwargs): for key, value in kwargs.items(): if value is not None: @@ -176,6 +219,104 @@ class Execution: self.status = "RUNNING" self.stop_date = None + def get_execution_history(self, roleArn): + sf_execution_history_type = settings.get_sf_execution_history_type() + if sf_execution_history_type == "SUCCESS": + return [ + { + "timestamp": iso_8601_datetime_with_milliseconds( + datetime(2020, 1, 1, 0, 0, 0, tzinfo=tzlocal()) + ), + "type": "ExecutionStarted", + "id": 1, + "previousEventId": 0, + "executionStartedEventDetails": { + "input": "{}", + "inputDetails": {"truncated": False}, + "roleArn": roleArn, + }, + }, + { + "timestamp": iso_8601_datetime_with_milliseconds( + datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal()) + ), + "type": "PassStateEntered", + "id": 2, + "previousEventId": 0, + "stateEnteredEventDetails": { + "name": "A State", + "input": "{}", + "inputDetails": {"truncated": False}, + }, + }, + { + "timestamp": iso_8601_datetime_with_milliseconds( + datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal()) + ), + "type": "PassStateExited", + "id": 3, + "previousEventId": 2, + "stateExitedEventDetails": { + "name": "A State", + "output": "An output", + "outputDetails": {"truncated": False}, + }, + }, + { + "timestamp": iso_8601_datetime_with_milliseconds( + datetime(2020, 1, 1, 0, 0, 20, tzinfo=tzlocal()) + ), + "type": "ExecutionSucceeded", + "id": 4, + "previousEventId": 3, + "executionSucceededEventDetails": { + "output": "An output", + "outputDetails": {"truncated": False}, + }, + }, + ] + elif sf_execution_history_type == "FAILURE": + return [ + { + "timestamp": iso_8601_datetime_with_milliseconds( + datetime(2020, 1, 1, 0, 0, 0, tzinfo=tzlocal()) + ), + "type": "ExecutionStarted", + "id": 1, + "previousEventId": 0, + "executionStartedEventDetails": { + "input": "{}", + "inputDetails": {"truncated": False}, + "roleArn": roleArn, + }, + }, + { + "timestamp": iso_8601_datetime_with_milliseconds( + datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal()) + ), + "type": "FailStateEntered", + "id": 2, + "previousEventId": 0, + "stateEnteredEventDetails": { + "name": "A State", + "input": "{}", + "inputDetails": {"truncated": False}, + }, + }, + { + "timestamp": iso_8601_datetime_with_milliseconds( + datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal()) + ), + "type": "ExecutionFailed", + "id": 3, + "previousEventId": 2, + "executionFailedEventDetails": { + "error": "AnError", + "cause": "An error occurred!", + }, + }, + ] + def stop(self): self.status = "ABORTED" self.stop_date = iso_8601_datetime_with_milliseconds(datetime.now()) @@ -346,38 +487,23 @@ class StepFunctionBackend(BaseBackend): return sm def start_execution(self, state_machine_arn, name=None, execution_input=None): - state_machine_name = self.describe_state_machine(state_machine_arn).name - self._ensure_execution_name_doesnt_exist(name) - self._validate_execution_input(execution_input) - execution = Execution( + state_machine = self.describe_state_machine(state_machine_arn) + execution = state_machine.start_execution( region_name=self.region_name, account_id=self._get_account_id(), - state_machine_name=state_machine_name, execution_name=name or str(uuid4()), - state_machine_arn=state_machine_arn, execution_input=execution_input, ) - self.executions.append(execution) return execution def stop_execution(self, execution_arn): - execution = next( - (x for x in self.executions if x.execution_arn == execution_arn), None - ) - if not execution: - raise ExecutionDoesNotExist( - "Execution Does Not Exist: '" + execution_arn + "'" - ) - execution.stop() - return execution + self._validate_execution_arn(execution_arn) + state_machine = self._get_state_machine_for_execution(execution_arn) + return state_machine.stop_execution(execution_arn) @paginate def list_executions(self, state_machine_arn, status_filter=None): - executions = [ - execution - for execution in self.executions - if execution.state_machine_arn == state_machine_arn - ] + executions = self.describe_state_machine(state_machine_arn).executions if status_filter: executions = list(filter(lambda e: e.status == status_filter, executions)) @@ -385,13 +511,32 @@ class StepFunctionBackend(BaseBackend): executions = sorted(executions, key=lambda x: x.start_date, reverse=True) return executions - def describe_execution(self, arn): - self._validate_execution_arn(arn) - exctn = next((x for x in self.executions if x.execution_arn == arn), None) + def describe_execution(self, execution_arn): + self._validate_execution_arn(execution_arn) + state_machine = self._get_state_machine_for_execution(execution_arn) + exctn = next( + (x for x in state_machine.executions if x.execution_arn == execution_arn), + None, + ) if not exctn: - raise ExecutionDoesNotExist("Execution Does Not Exist: '" + arn + "'") + raise ExecutionDoesNotExist( + "Execution Does Not Exist: '" + execution_arn + "'" + ) return exctn + def get_execution_history(self, execution_arn): + self._validate_execution_arn(execution_arn) + state_machine = self._get_state_machine_for_execution(execution_arn) + execution = next( + (x for x in state_machine.executions if x.execution_arn == execution_arn), + None, + ) + if not execution: + raise ExecutionDoesNotExist( + "Execution Does Not Exist: '" + execution_arn + "'" + ) + return execution.get_execution_history(state_machine.roleArn) + def tag_resource(self, resource_arn, tags): try: state_machine = self.describe_state_machine(resource_arn) @@ -444,20 +589,18 @@ class StepFunctionBackend(BaseBackend): if not arn or not match: raise InvalidArn(invalid_msg) - def _ensure_execution_name_doesnt_exist(self, name): - for execution in self.executions: - if execution.name == name: - raise ExecutionAlreadyExists( - "Execution Already Exists: '" + execution.execution_arn + "'" - ) - - def _validate_execution_input(self, execution_input): - try: - json.loads(execution_input) - except Exception as ex: - raise InvalidExecutionInput( - "Invalid State Machine Execution Input: '" + str(ex) + "'" + def _get_state_machine_for_execution(self, execution_arn): + state_machine_name = execution_arn.split(":")[6] + state_machine_arn = next( + (x.arn for x in self.state_machines if x.name == state_machine_name), None + ) + if not state_machine_arn: + # Assume that if the state machine arn is not present, then neither will the + # execution + raise ExecutionDoesNotExist( + "Execution Does Not Exist: '" + execution_arn + "'" ) + return self.describe_state_machine(state_machine_arn) def _get_account_id(self): return ACCOUNT_ID diff --git a/moto/stepfunctions/responses.py b/moto/stepfunctions/responses.py index 7eae8091b..c34b8c433 100644 --- a/moto/stepfunctions/responses.py +++ b/moto/stepfunctions/responses.py @@ -208,6 +208,21 @@ class StepFunctionResponse(BaseResponse): @amzn_request_id def stop_execution(self): arn = self._get_param("executionArn") - execution = self.stepfunction_backend.stop_execution(arn) - response = {"stopDate": execution.stop_date} - return 200, {}, json.dumps(response) + try: + execution = self.stepfunction_backend.stop_execution(arn) + response = {"stopDate": execution.stop_date} + return 200, {}, json.dumps(response) + except AWSError as err: + return err.response() + + @amzn_request_id + def get_execution_history(self): + execution_arn = self._get_param("executionArn") + try: + execution_history = self.stepfunction_backend.get_execution_history( + execution_arn + ) + response = {"events": execution_history} + return 200, {}, json.dumps(response) + except AWSError as err: + return err.response() diff --git a/tests/test_stepfunctions/test_stepfunctions.py b/tests/test_stepfunctions/test_stepfunctions.py index 13a6809f5..16fd9cb5d 100644 --- a/tests/test_stepfunctions/test_stepfunctions.py +++ b/tests/test_stepfunctions/test_stepfunctions.py @@ -2,15 +2,23 @@ from __future__ import unicode_literals import boto3 import json +import os import sure # noqa - +import sys from datetime import datetime +from dateutil.tz import tzlocal from botocore.exceptions import ClientError import pytest from moto import mock_cloudformation, mock_sts, mock_stepfunctions from moto.core import ACCOUNT_ID +if sys.version_info[0] < 3: + import mock + from unittest import SkipTest +else: + from unittest import SkipTest, mock + region = "us-east-1" simple_definition = ( '{"Comment": "An example of the Amazon States Language using a choice state.",' @@ -799,10 +807,30 @@ def test_state_machine_stop_execution(): @mock_stepfunctions @mock_sts -def test_state_machine_describe_execution_after_stoppage(): - account_id +def test_state_machine_stop_raises_error_when_unknown_execution(): + client = boto3.client("stepfunctions", region_name=region) + client.create_state_machine( + name="test-state-machine", + definition=str(simple_definition), + roleArn=_get_default_role(), + ) + with pytest.raises(ClientError) as ex: + unknown_execution = ( + "arn:aws:states:" + + region + + ":" + + _get_account_id() + + ":execution:test-state-machine:unknown" + ) + client.stop_execution(executionArn=unknown_execution) + ex.value.response["Error"]["Code"].should.equal("ExecutionDoesNotExist") + ex.value.response["Error"]["Message"].should.contain("Execution Does Not Exist:") + + +@mock_stepfunctions +@mock_sts +def test_state_machine_describe_execution_after_stoppage(): client = boto3.client("stepfunctions", region_name=region) - # sm = client.create_state_machine( name="name", definition=str(simple_definition), roleArn=_get_default_role() ) @@ -815,6 +843,146 @@ def test_state_machine_describe_execution_after_stoppage(): description["stopDate"].should.be.a(datetime) +@mock_stepfunctions +@mock_sts +def test_state_machine_get_execution_history_throws_error_with_unknown_execution(): + client = boto3.client("stepfunctions", region_name=region) + client.create_state_machine( + name="test-state-machine", + definition=str(simple_definition), + roleArn=_get_default_role(), + ) + with pytest.raises(ClientError) as ex: + unknown_execution = ( + "arn:aws:states:" + + region + + ":" + + _get_account_id() + + ":execution:test-state-machine:unknown" + ) + client.get_execution_history(executionArn=unknown_execution) + ex.value.response["Error"]["Code"].should.equal("ExecutionDoesNotExist") + ex.value.response["Error"]["Message"].should.contain("Execution Does Not Exist:") + + +@mock_stepfunctions +@mock_sts +def test_state_machine_get_execution_history_contains_expected_success_events_when_started(): + expected_events = [ + { + "timestamp": datetime(2020, 1, 1, 0, 0, 0, tzinfo=tzlocal()), + "type": "ExecutionStarted", + "id": 1, + "previousEventId": 0, + "executionStartedEventDetails": { + "input": "{}", + "inputDetails": {"truncated": False}, + "roleArn": _get_default_role(), + }, + }, + { + "timestamp": datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal()), + "type": "PassStateEntered", + "id": 2, + "previousEventId": 0, + "stateEnteredEventDetails": { + "name": "A State", + "input": "{}", + "inputDetails": {"truncated": False}, + }, + }, + { + "timestamp": datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal()), + "type": "PassStateExited", + "id": 3, + "previousEventId": 2, + "stateExitedEventDetails": { + "name": "A State", + "output": "An output", + "outputDetails": {"truncated": False}, + }, + }, + { + "timestamp": datetime(2020, 1, 1, 0, 0, 20, tzinfo=tzlocal()), + "type": "ExecutionSucceeded", + "id": 4, + "previousEventId": 3, + "executionSucceededEventDetails": { + "output": "An output", + "outputDetails": {"truncated": False}, + }, + }, + ] + + client = boto3.client("stepfunctions", region_name=region) + sm = client.create_state_machine( + name="test-state-machine", + definition=simple_definition, + roleArn=_get_default_role(), + ) + execution = client.start_execution(stateMachineArn=sm["stateMachineArn"]) + execution_history = client.get_execution_history( + executionArn=execution["executionArn"] + ) + execution_history["events"].should.have.length_of(4) + execution_history["events"].should.equal(expected_events) + + +@mock_stepfunctions +@mock_sts +@mock.patch.dict(os.environ, {"SF_EXECUTION_HISTORY_TYPE": "FAILURE"}) +def test_state_machine_get_execution_history_contains_expected_failure_events_when_started(): + if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true": + raise SkipTest("Cant pass environment variable in server mode") + expected_events = [ + { + "timestamp": datetime(2020, 1, 1, 0, 0, 0, tzinfo=tzlocal()), + "type": "ExecutionStarted", + "id": 1, + "previousEventId": 0, + "executionStartedEventDetails": { + "input": "{}", + "inputDetails": {"truncated": False}, + "roleArn": _get_default_role(), + }, + }, + { + "timestamp": datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal()), + "type": "FailStateEntered", + "id": 2, + "previousEventId": 0, + "stateEnteredEventDetails": { + "name": "A State", + "input": "{}", + "inputDetails": {"truncated": False}, + }, + }, + { + "timestamp": datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal()), + "type": "ExecutionFailed", + "id": 3, + "previousEventId": 2, + "executionFailedEventDetails": { + "error": "AnError", + "cause": "An error occurred!", + }, + }, + ] + + client = boto3.client("stepfunctions", region_name=region) + sm = client.create_state_machine( + name="test-state-machine", + definition=simple_definition, + roleArn=_get_default_role(), + ) + execution = client.start_execution(stateMachineArn=sm["stateMachineArn"]) + execution_history = client.get_execution_history( + executionArn=execution["executionArn"] + ) + execution_history["events"].should.have.length_of(3) + execution_history["events"].should.equal(expected_events) + + @mock_stepfunctions @mock_cloudformation def test_state_machine_cloudformation():