import json
import os
from contextlib import ExitStack
from copy import deepcopy
from functools import wraps
from time import sleep
from uuid import uuid4

import boto3
import pytest
import requests
from botocore.exceptions import ClientError

from moto import mock_aws
from tests.test_awslambda.utilities import (
    get_test_zip_file1,
)

from ...markers import requires_docker
from . import base_url, sfn_allow_lambda_invoke, sfn_role_policy

# Template definition shared by the tests below. The "TODO" resources are
# placeholders; each test works on a deep copy and fills in a Lambda ARN.
lambda_state_machine = {
    "Comment": "A simple AWS Step Functions state machine that calls two Lambda Functions",
    "StartAt": "Open Case",
    "States": {
        "Open Case": {"Type": "Task", "Resource": "TODO", "Next": "Send Email"},
        "Send Email": {"Type": "Task", "Resource": "TODO", "End": True},
    },
}


def aws_verified(func):
    """
    Function that is verified to work against AWS.
    Can be run against AWS at any time by setting:
      MOTO_TEST_ALLOW_AWS_REQUEST=true

    If this environment variable is not set, the function runs in a `mock_aws` context.

    This decorator will:
      - Create an IAM-role that can be used by AWSLambda functions
      - Create a Lambda function that uses this role
      - Run the test, calling it as `func(fn_name, fn_arn, sleep_time)`
      - Delete the function and the role, even if the test fails

    When running in the `mock_aws` context, the `execute_state_machine`
    setting is switched on via the `/moto-api/config` endpoint for the
    duration of the test, and switched off again afterwards.
    """

    @wraps(func)
    def wrapper():
        role_name = "moto_test_role_" + str(uuid4())[0:6]

        allow_aws_request = (
            os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true"
        )

        if allow_aws_request:
            return create_role_and_test(role_name, sleep_time=10)

        with mock_aws():
            config_url = f"http://{base_url}/moto-api/config"
            requests.post(
                config_url,
                json={"stepfunctions": {"execute_state_machine": True}},
            )
            try:
                return create_role_and_test(role_name, sleep_time=0)
            finally:
                # Always restore the setting, so a failing test
                # does not leave it switched on.
                requests.post(
                    config_url,
                    json={"stepfunctions": {"execute_state_machine": False}},
                )

    def create_role_and_test(role_name, sleep_time):
        """Create the role and function, run `func`, and clean up afterwards."""
        policy_doc = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "LambdaAssumeRole",
                    "Effect": "Allow",
                    "Principal": {"Service": "lambda.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
        iam = boto3.client("iam", region_name="us-east-1")
        fn_name = f"fn_for_{role_name}"

        # Cleanup callbacks are registered only once the corresponding resource
        # exists, and run in reverse order: first the function, then the role.
        with ExitStack() as cleanup:
            iam_role_arn = iam.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(policy_doc),
                Path="/",
            )["Role"]["Arn"]
            cleanup.callback(iam.delete_role, RoleName=role_name)

            fn_arn = create_function(iam_role_arn, role_name, fn_name)
            _lambda = boto3.client("lambda", "us-east-1")
            cleanup.callback(_lambda.delete_function, FunctionName=fn_name)

            return func(fn_name, fn_arn, sleep_time)

    def create_function(role_arn: str, role_name: str, fn_name: str):
        """
        Create the test Lambda function and return its ARN.

        Retries up to 15 times, sleeping one second after each `ClientError`.
        Raises an `Exception` (chained to the last `ClientError`) if the
        function could not be created.
        """
        iam = boto3.client("iam", region_name="us-east-1")

        # The waiter is normally used to wait until a resource is ready
        wait_for_role = iam.get_waiter("role_exists")
        wait_for_role.wait(RoleName=role_name)

        # HOWEVER:
        # Just because the role exists, doesn't mean it can be used by Lambda
        # The only reliable way to check if our role is ready:
        #   - try to create a function and see if it succeeds
        #
        # The alternative is to wait between 5 and 10 seconds, which is not a great solution either
        _lambda = boto3.client("lambda", "us-east-1")
        last_error = None
        for _ in range(15):
            try:
                fn = _lambda.create_function(
                    FunctionName=fn_name,
                    Runtime="python3.11",
                    Role=role_arn,
                    Handler="lambda_function.lambda_handler",
                    Code={"ZipFile": get_test_zip_file1()},
                )
            except ClientError as exc:
                last_error = exc
                sleep(1)
            else:
                return fn["FunctionArn"]
        raise Exception(
            f"Couldn't create test Lambda FN using IAM role {role_name}, probably because it wasn't ready in time"
        ) from last_error

    return wrapper


@aws_verified
@pytest.mark.aws_verified
@pytest.mark.network
@requires_docker
def test_state_machine_calling_lambda_fn(fn_name=None, fn_arn=None, sleep_time=0):
    """
    Run a state machine whose two tasks both invoke the Lambda at `fn_arn`.

    Polls the execution (up to 10 times, one second apart) until it reports
    SUCCEEDED, then verifies the input, the output and the event history.
    """
    # Deep copy: the nested "States" dicts are modified below, and the
    # module-level template must stay untouched for other tests.
    definition = deepcopy(lambda_state_machine)
    definition["States"]["Open Case"]["Resource"] = fn_arn
    definition["States"]["Send Email"]["Resource"] = fn_arn

    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"

    # Callbacks run in reverse registration order on exit (pass or fail):
    # state machine first, then the role policy, then the role.
    with ExitStack() as cleanup:
        sfn_role = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
            Path="/",
        )["Role"]["Arn"]
        cleanup.callback(iam.delete_role, RoleName=role_name)

        iam.put_role_policy(
            PolicyDocument=json.dumps(sfn_allow_lambda_invoke),
            PolicyName="allowLambdaInvoke",
            RoleName=role_name,
        )
        cleanup.callback(
            iam.delete_role_policy, RoleName=role_name, PolicyName="allowLambdaInvoke"
        )
        sleep(sleep_time)

        client = boto3.client("stepfunctions", region_name="us-east-1")
        name = "sfn_" + str(uuid4())[0:6]
        exec_input = {"my": "input"}

        response = client.create_state_machine(
            name=name, definition=json.dumps(definition), roleArn=sfn_role
        )
        state_machine_arn = response["stateMachineArn"]
        cleanup.callback(
            client.delete_state_machine, stateMachineArn=state_machine_arn
        )

        execution = client.start_execution(
            name="exec1",
            stateMachineArn=state_machine_arn,
            input=json.dumps(exec_input),
        )
        execution_arn = execution["executionArn"]

        for _ in range(10):
            execution = client.describe_execution(executionArn=execution_arn)
            if execution["status"] == "SUCCEEDED":
                break
            sleep(1)
        else:
            pytest.fail("Should have succeeded already")

        assert "stopDate" in execution
        assert json.loads(execution["input"]) == exec_input
        assert json.loads(execution["output"]) == exec_input

        history = client.get_execution_history(executionArn=execution_arn)
        assert len(history["events"]) == 12
        assert [e["type"] for e in history["events"]] == [
            "ExecutionStarted",
            "TaskStateEntered",
            "LambdaFunctionScheduled",
            "LambdaFunctionStarted",
            "LambdaFunctionSucceeded",
            "TaskStateExited",
            "TaskStateEntered",
            "LambdaFunctionScheduled",
            "LambdaFunctionStarted",
            "LambdaFunctionSucceeded",
            "TaskStateExited",
            "ExecutionSucceeded",
        ]


@aws_verified
@pytest.mark.aws_verified
@pytest.mark.network
@requires_docker
def test_state_machine_calling_failing_lambda_fn(
    fn_name=None, fn_arn=None, sleep_time=0
):
    """
    Run the state machine with an input that carries `"error": True`.

    Polls the execution (up to 10 times, one second apart) until it reports
    FAILED, then verifies the failure cause and the event history.
    """
    # Deep copy: the nested "States" dicts are modified below, and the
    # module-level template must stay untouched for other tests.
    definition = deepcopy(lambda_state_machine)
    definition["States"]["Open Case"]["Resource"] = fn_arn
    definition["States"]["Send Email"]["Resource"] = fn_arn

    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"

    # Callbacks run in reverse registration order on exit (pass or fail):
    # state machine first, then the role policy, then the role.
    with ExitStack() as cleanup:
        sfn_role = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
            Path="/",
        )["Role"]["Arn"]
        cleanup.callback(iam.delete_role, RoleName=role_name)

        iam.put_role_policy(
            PolicyDocument=json.dumps(sfn_allow_lambda_invoke),
            PolicyName="allowLambdaInvoke",
            RoleName=role_name,
        )
        cleanup.callback(
            iam.delete_role_policy, RoleName=role_name, PolicyName="allowLambdaInvoke"
        )
        sleep(sleep_time)

        client = boto3.client("stepfunctions", region_name="us-east-1")
        name = "sfn_" + str(uuid4())[0:6]
        exec_input = {"my": "input", "error": True}

        response = client.create_state_machine(
            name=name, definition=json.dumps(definition), roleArn=sfn_role
        )
        state_machine_arn = response["stateMachineArn"]
        cleanup.callback(
            client.delete_state_machine, stateMachineArn=state_machine_arn
        )

        execution = client.start_execution(
            name="exec1",
            stateMachineArn=state_machine_arn,
            input=json.dumps(exec_input),
        )
        execution_arn = execution["executionArn"]

        for _ in range(10):
            execution = client.describe_execution(executionArn=execution_arn)
            if execution["status"] == "FAILED":
                break
            sleep(1)
        else:
            pytest.fail("Should have failed already")

        assert json.loads(execution["cause"])["errorMessage"] == "I failed!"

        history = client.get_execution_history(executionArn=execution_arn)
        assert len(history["events"]) == 6
        assert [e["type"] for e in history["events"]] == [
            "ExecutionStarted",
            "TaskStateEntered",
            "LambdaFunctionScheduled",
            "LambdaFunctionStarted",
            "LambdaFunctionFailed",
            "ExecutionFailed",
        ]