import json
import os
from functools import wraps
from time import sleep
from uuid import uuid4

import boto3
import pytest
import requests
from botocore.exceptions import ClientError

from moto import mock_aws
from tests.test_awslambda.utilities import (
    get_test_zip_file1,
)

from . import base_url, sfn_allow_lambda_invoke, sfn_role_policy

lambda_state_machine = {
    "Comment": "A simple AWS Step Functions state machine that calls two Lambda Functions",
    "StartAt": "Open Case",
    "States": {
        "Open Case": {"Type": "Task", "Resource": "TODO", "Next": "Send Email"},
        "Send Email": {"Type": "Task", "Resource": "TODO", "End": True},
    },
}


def aws_verified(func):
    """
    Function that is verified to work against AWS.
    Can be run against AWS at any time by setting:
      MOTO_TEST_ALLOW_AWS_REQUEST=true

    If this environment variable is not set, the function runs in a `mock_aws` context.

    This decorator will:
      - Create an IAM-role that can be used by AWSLambda functions table
      - Run the test
      - Delete the role
    """

    @wraps(func)
    def pagination_wrapper():
        role_name = "moto_test_role_" + str(uuid4())[0:6]

        allow_aws_request = (
            os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true"
        )

        if allow_aws_request:
            return create_role_and_test(role_name, sleep_time=10)
        else:
            with mock_aws():
                requests.post(
                    f"http://{base_url}/moto-api/config",
                    json={"stepfunctions": {"execute_state_machine": True}},
                )
                resp = create_role_and_test(role_name, sleep_time=0)
                requests.post(
                    f"http://{base_url}/moto-api/config",
                    json={"stepfunctions": {"execute_state_machine": False}},
                )
                return resp

    def create_role_and_test(role_name, sleep_time):
        policy_doc = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "LambdaAssumeRole",
                    "Effect": "Allow",
                    "Principal": {"Service": "lambda.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
        iam = boto3.client("iam", region_name="us-east-1")
        iam_role_arn = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(policy_doc),
            Path="/",
        )["Role"]["Arn"]

        fn_name = f"fn_for_{role_name}"
        try:
            fn_arn = create_function(iam_role_arn, role_name, fn_name)

            resp = func(fn_name, fn_arn, sleep_time)
        finally:
            _lambda = boto3.client("lambda", "us-east-1")
            _lambda.delete_function(FunctionName=fn_name)

            iam.delete_role(RoleName=role_name)

        return resp

    def create_function(role_arn: str, role_name: str, fn_name: str):
        iam = boto3.client("iam", region_name="us-east-1")

        # The waiter is normally used to wait until a resource is ready
        wait_for_role = iam.get_waiter("role_exists")
        wait_for_role.wait(RoleName=role_name)

        # HOWEVER:
        # Just because the role exists, doesn't mean it can be used by Lambda
        # The only reliable way to check if our role is ready:
        #   - try to create a function and see if it succeeds
        #
        # The alternative is to wait between 5 and 10 seconds, which is not a great solution either
        _lambda = boto3.client("lambda", "us-east-1")
        for _ in range(15):
            try:
                fn = _lambda.create_function(
                    FunctionName=fn_name,
                    Runtime="python3.11",
                    Role=role_arn,
                    Handler="lambda_function.lambda_handler",
                    Code={"ZipFile": get_test_zip_file1()},
                )
                return fn["FunctionArn"]

            except ClientError:
                sleep(1)
        raise Exception(
            f"Couldn't create test Lambda FN using IAM role {role_name}, probably because it wasn't ready in time"
        )

    return pagination_wrapper


@aws_verified
@pytest.mark.aws_verified
def test_state_machine_calling_lambda_fn(fn_name=None, fn_arn=None, sleep_time=0):
    definition = lambda_state_machine.copy()
    definition["States"]["Open Case"]["Resource"] = fn_arn
    definition["States"]["Send Email"]["Resource"] = fn_arn

    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"
    sfn_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
        Path="/",
    )["Role"]["Arn"]
    iam.put_role_policy(
        PolicyDocument=json.dumps(sfn_allow_lambda_invoke),
        PolicyName="allowLambdaInvoke",
        RoleName=role_name,
    )
    sleep(sleep_time)

    client = boto3.client("stepfunctions", region_name="us-east-1")
    name = "sfn_" + str(uuid4())[0:6]
    exec_input = {"my": "input"}
    #
    response = client.create_state_machine(
        name=name, definition=json.dumps(definition), roleArn=sfn_role
    )
    state_machine_arn = response["stateMachineArn"]

    execution = client.start_execution(
        name="exec1", stateMachineArn=state_machine_arn, input=json.dumps(exec_input)
    )
    execution_arn = execution["executionArn"]

    for _ in range(10):
        execution = client.describe_execution(executionArn=execution_arn)
        if execution["status"] == "SUCCEEDED":
            assert "stopDate" in execution
            assert json.loads(execution["input"]) == exec_input
            assert json.loads(execution["output"]) == exec_input

            history = client.get_execution_history(executionArn=execution_arn)
            assert len(history["events"]) == 12
            assert [e["type"] for e in history["events"]] == [
                "ExecutionStarted",
                "TaskStateEntered",
                "LambdaFunctionScheduled",
                "LambdaFunctionStarted",
                "LambdaFunctionSucceeded",
                "TaskStateExited",
                "TaskStateEntered",
                "LambdaFunctionScheduled",
                "LambdaFunctionStarted",
                "LambdaFunctionSucceeded",
                "TaskStateExited",
                "ExecutionSucceeded",
            ]

            client.delete_state_machine(stateMachineArn=state_machine_arn)
            iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
            iam.delete_role(RoleName=role_name)
            break
        sleep(1)
    else:
        client.delete_state_machine(stateMachineArn=state_machine_arn)
        iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
        iam.delete_role(RoleName=role_name)
        assert False, "Should have failed already"


@aws_verified
@pytest.mark.aws_verified
def test_state_machine_calling_failing_lambda_fn(
    fn_name=None, fn_arn=None, sleep_time=0
):
    definition = lambda_state_machine.copy()
    definition["States"]["Open Case"]["Resource"] = fn_arn
    definition["States"]["Send Email"]["Resource"] = fn_arn

    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"
    sfn_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
        Path="/",
    )["Role"]["Arn"]
    iam.put_role_policy(
        PolicyDocument=json.dumps(sfn_allow_lambda_invoke),
        PolicyName="allowLambdaInvoke",
        RoleName=role_name,
    )
    sleep(sleep_time)

    client = boto3.client("stepfunctions", region_name="us-east-1")
    name = "sfn_" + str(uuid4())[0:6]
    exec_input = {"my": "input", "error": True}
    #
    response = client.create_state_machine(
        name=name, definition=json.dumps(definition), roleArn=sfn_role
    )
    state_machine_arn = response["stateMachineArn"]

    execution = client.start_execution(
        name="exec1", stateMachineArn=state_machine_arn, input=json.dumps(exec_input)
    )
    execution_arn = execution["executionArn"]

    for _ in range(10):
        execution = client.describe_execution(executionArn=execution_arn)
        if execution["status"] == "FAILED":
            assert json.loads(execution["cause"])["errorMessage"] == "I failed!"

            history = client.get_execution_history(executionArn=execution_arn)
            assert len(history["events"]) == 6
            assert [e["type"] for e in history["events"]] == [
                "ExecutionStarted",
                "TaskStateEntered",
                "LambdaFunctionScheduled",
                "LambdaFunctionStarted",
                "LambdaFunctionFailed",
                "ExecutionFailed",
            ]

            client.delete_state_machine(stateMachineArn=state_machine_arn)
            iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
            iam.delete_role(RoleName=role_name)
            break
        sleep(1)
    else:
        client.delete_state_machine(stateMachineArn=state_machine_arn)
        iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
        iam.delete_role(RoleName=role_name)
        assert False, "Should have failed already"