from __future__ import unicode_literals

import base64
import uuid
import botocore.client
import boto3
import hashlib
import io
import json
import time
import zipfile
import sure  # noqa

from freezegun import freeze_time
from moto import (
    mock_dynamodb2,
    mock_lambda,
    mock_iam,
    mock_s3,
    mock_ec2,
    mock_sns,
    mock_logs,
    settings,
    mock_sqs,
)
from moto.sts.models import ACCOUNT_ID
from nose.tools import assert_raises
from botocore.exceptions import ClientError

_lambda_region = "us-west-2"
boto3.setup_default_session(region_name=_lambda_region)


def _process_lambda(func_str):
    zip_output = io.BytesIO()
    zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED)
    zip_file.writestr("lambda_function.py", func_str)
    zip_file.close()
    zip_output.seek(0)
    return zip_output.read()


def get_test_zip_file1():
    pfunc = """
def lambda_handler(event, context):
    return event
"""
    return _process_lambda(pfunc)


def get_test_zip_file2():
    func_str = """
import boto3

def lambda_handler(event, context):
    ec2 = boto3.resource('ec2', region_name='us-west-2', endpoint_url='http://{base_url}')

    volume_id = event.get('volume_id')
    vol = ec2.Volume(volume_id)

    print('get volume details for %s\\nVolume - %s  state=%s, size=%s' % (volume_id, volume_id, vol.state, vol.size))
    return event
""".format(
        base_url="motoserver:5000"
        if settings.TEST_SERVER_MODE
        else "ec2.us-west-2.amazonaws.com"
    )
    return _process_lambda(func_str)


def get_test_zip_file3():
    pfunc = """
def lambda_handler(event, context):
    print("get_test_zip_file3 success")
    return event
"""
    return _process_lambda(pfunc)


def get_test_zip_file4():
    pfunc = """
def lambda_handler(event, context):    
    raise Exception('I failed!')
"""
    return _process_lambda(pfunc)


@mock_lambda
def test_list_functions():
    conn = boto3.client("lambda", "us-west-2")
    result = conn.list_functions()
    result["Functions"].should.have.length_of(0)


@mock_lambda
def test_invoke_requestresponse_function():
    conn = boto3.client("lambda", "us-west-2")
    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file1()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    in_data = {"msg": "So long and thanks for all the fish"}
    success_result = conn.invoke(
        FunctionName="testFunction",
        InvocationType="RequestResponse",
        Payload=json.dumps(in_data),
    )

    success_result["StatusCode"].should.equal(202)
    result_obj = json.loads(
        base64.b64decode(success_result["LogResult"]).decode("utf-8")
    )

    result_obj.should.equal(in_data)

    payload = success_result["Payload"].read().decode("utf-8")
    json.loads(payload).should.equal(in_data)


@mock_lambda
def test_invoke_event_function():
    conn = boto3.client("lambda", "us-west-2")
    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file1()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    conn.invoke.when.called_with(
        FunctionName="notAFunction", InvocationType="Event", Payload="{}"
    ).should.throw(botocore.client.ClientError)

    in_data = {"msg": "So long and thanks for all the fish"}
    success_result = conn.invoke(
        FunctionName="testFunction", InvocationType="Event", Payload=json.dumps(in_data)
    )
    success_result["StatusCode"].should.equal(202)
    json.loads(success_result["Payload"].read().decode("utf-8")).should.equal({})


if settings.TEST_SERVER_MODE:

    @mock_ec2
    @mock_lambda
    def test_invoke_function_get_ec2_volume():
        conn = boto3.resource("ec2", "us-west-2")
        vol = conn.create_volume(Size=99, AvailabilityZone="us-west-2")
        vol = conn.Volume(vol.id)

        conn = boto3.client("lambda", "us-west-2")
        conn.create_function(
            FunctionName="testFunction",
            Runtime="python3.7",
            Role=get_role_name(),
            Handler="lambda_function.lambda_handler",
            Code={"ZipFile": get_test_zip_file2()},
            Description="test lambda function",
            Timeout=3,
            MemorySize=128,
            Publish=True,
        )

        in_data = {"volume_id": vol.id}
        result = conn.invoke(
            FunctionName="testFunction",
            InvocationType="RequestResponse",
            Payload=json.dumps(in_data),
        )
        result["StatusCode"].should.equal(202)
        msg = "get volume details for %s\nVolume - %s  state=%s, size=%s\n%s" % (
            vol.id,
            vol.id,
            vol.state,
            vol.size,
            json.dumps(in_data).replace(
                " ", ""
            ),  # Makes the tests pass as the result is missing the whitespace
        )

        log_result = base64.b64decode(result["LogResult"]).decode("utf-8")

        # The Docker lambda invocation will return an additional '\n', so need to replace it:
        log_result = log_result.replace("\n\n", "\n")
        log_result.should.equal(msg)

        payload = result["Payload"].read().decode("utf-8")

        # The Docker lambda invocation will return an additional '\n', so need to replace it:
        payload = payload.replace("\n\n", "\n")
        payload.should.equal(msg)


@mock_logs
@mock_sns
@mock_ec2
@mock_lambda
def test_invoke_function_from_sns():
    logs_conn = boto3.client("logs", region_name="us-west-2")
    sns_conn = boto3.client("sns", region_name="us-west-2")
    sns_conn.create_topic(Name="some-topic")
    topics_json = sns_conn.list_topics()
    topics = topics_json["Topics"]
    topic_arn = topics[0]["TopicArn"]

    conn = boto3.client("lambda", "us-west-2")
    result = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    sns_conn.subscribe(
        TopicArn=topic_arn, Protocol="lambda", Endpoint=result["FunctionArn"]
    )

    result = sns_conn.publish(TopicArn=topic_arn, Message=json.dumps({}))

    start = time.time()
    while (time.time() - start) < 30:
        result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
        log_streams = result.get("logStreams")
        if not log_streams:
            time.sleep(1)
            continue

        assert len(log_streams) == 1
        result = logs_conn.get_log_events(
            logGroupName="/aws/lambda/testFunction",
            logStreamName=log_streams[0]["logStreamName"],
        )
        for event in result.get("events"):
            if event["message"] == "get_test_zip_file3 success":
                return

        time.sleep(1)

    assert False, "Test Failed"


@mock_lambda
def test_create_based_on_s3_with_missing_bucket():
    conn = boto3.client("lambda", "us-west-2")

    conn.create_function.when.called_with(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "this-bucket-does-not-exist", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        VpcConfig={"SecurityGroupIds": ["sg-123abc"], "SubnetIds": ["subnet-123abc"]},
    ).should.throw(botocore.client.ClientError)


@mock_lambda
@mock_s3
@freeze_time("2015-01-01 00:00:00")
def test_create_function_from_aws_bucket():
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")
    zip_content = get_test_zip_file2()

    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    result = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        VpcConfig={"SecurityGroupIds": ["sg-123abc"], "SubnetIds": ["subnet-123abc"]},
    )
    # this is hard to match against, so remove it
    result["ResponseMetadata"].pop("HTTPHeaders", None)
    # Botocore inserts retry attempts not seen in Python27
    result["ResponseMetadata"].pop("RetryAttempts", None)
    result.pop("LastModified")
    result.should.equal(
        {
            "FunctionName": "testFunction",
            "FunctionArn": "arn:aws:lambda:{}:123456789012:function:testFunction".format(
                _lambda_region
            ),
            "Runtime": "python2.7",
            "Role": result["Role"],
            "Handler": "lambda_function.lambda_handler",
            "CodeSha256": hashlib.sha256(zip_content).hexdigest(),
            "CodeSize": len(zip_content),
            "Description": "test lambda function",
            "Timeout": 3,
            "MemorySize": 128,
            "Version": "1",
            "VpcConfig": {
                "SecurityGroupIds": ["sg-123abc"],
                "SubnetIds": ["subnet-123abc"],
                "VpcId": "vpc-123abc",
            },
            "ResponseMetadata": {"HTTPStatusCode": 201},
        }
    )


@mock_lambda
@freeze_time("2015-01-01 00:00:00")
def test_create_function_from_zipfile():
    conn = boto3.client("lambda", "us-west-2")
    zip_content = get_test_zip_file1()
    result = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": zip_content},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # this is hard to match against, so remove it
    result["ResponseMetadata"].pop("HTTPHeaders", None)
    # Botocore inserts retry attempts not seen in Python27
    result["ResponseMetadata"].pop("RetryAttempts", None)
    result.pop("LastModified")

    result.should.equal(
        {
            "FunctionName": "testFunction",
            "FunctionArn": "arn:aws:lambda:{}:123456789012:function:testFunction".format(
                _lambda_region
            ),
            "Runtime": "python2.7",
            "Role": result["Role"],
            "Handler": "lambda_function.lambda_handler",
            "CodeSize": len(zip_content),
            "Description": "test lambda function",
            "Timeout": 3,
            "MemorySize": 128,
            "CodeSha256": hashlib.sha256(zip_content).hexdigest(),
            "Version": "1",
            "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
            "ResponseMetadata": {"HTTPStatusCode": 201},
        }
    )


@mock_lambda
@mock_s3
@freeze_time("2015-01-01 00:00:00")
def test_get_function():
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file1()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        Environment={"Variables": {"test_variable": "test_value"}},
    )

    result = conn.get_function(FunctionName="testFunction")
    # this is hard to match against, so remove it
    result["ResponseMetadata"].pop("HTTPHeaders", None)
    # Botocore inserts retry attempts not seen in Python27
    result["ResponseMetadata"].pop("RetryAttempts", None)
    result["Configuration"].pop("LastModified")

    result["Code"]["Location"].should.equal(
        "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com/test.zip".format(_lambda_region)
    )
    result["Code"]["RepositoryType"].should.equal("S3")

    result["Configuration"]["CodeSha256"].should.equal(
        hashlib.sha256(zip_content).hexdigest()
    )
    result["Configuration"]["CodeSize"].should.equal(len(zip_content))
    result["Configuration"]["Description"].should.equal("test lambda function")
    result["Configuration"].should.contain("FunctionArn")
    result["Configuration"]["FunctionName"].should.equal("testFunction")
    result["Configuration"]["Handler"].should.equal("lambda_function.lambda_handler")
    result["Configuration"]["MemorySize"].should.equal(128)
    result["Configuration"]["Role"].should.equal(get_role_name())
    result["Configuration"]["Runtime"].should.equal("python2.7")
    result["Configuration"]["Timeout"].should.equal(3)
    result["Configuration"]["Version"].should.equal("$LATEST")
    result["Configuration"].should.contain("VpcConfig")
    result["Configuration"].should.contain("Environment")
    result["Configuration"]["Environment"].should.contain("Variables")
    result["Configuration"]["Environment"]["Variables"].should.equal(
        {"test_variable": "test_value"}
    )

    # Test get function with
    result = conn.get_function(FunctionName="testFunction", Qualifier="$LATEST")
    result["Configuration"]["Version"].should.equal("$LATEST")
    result["Configuration"]["FunctionArn"].should.equal(
        "arn:aws:lambda:us-west-2:123456789012:function:testFunction:$LATEST"
    )

    # Test get function when can't find function name
    with assert_raises(ClientError):
        conn.get_function(FunctionName="junk", Qualifier="$LATEST")


@mock_lambda
@mock_s3
def test_get_function_by_arn():
    bucket_name = "test-bucket"
    s3_conn = boto3.client("s3", "us-east-1")
    s3_conn.create_bucket(Bucket=bucket_name)

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-east-1")

    fnc = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    result = conn.get_function(FunctionName=fnc["FunctionArn"])
    result["Configuration"]["FunctionName"].should.equal("testFunction")


@mock_lambda
@mock_s3
def test_delete_function():
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    success_result = conn.delete_function(FunctionName="testFunction")
    # this is hard to match against, so remove it
    success_result["ResponseMetadata"].pop("HTTPHeaders", None)
    # Botocore inserts retry attempts not seen in Python27
    success_result["ResponseMetadata"].pop("RetryAttempts", None)

    success_result.should.equal({"ResponseMetadata": {"HTTPStatusCode": 204}})

    function_list = conn.list_functions()
    function_list["Functions"].should.have.length_of(0)


@mock_lambda
@mock_s3
def test_delete_function_by_arn():
    bucket_name = "test-bucket"
    s3_conn = boto3.client("s3", "us-east-1")
    s3_conn.create_bucket(Bucket=bucket_name)

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-east-1")

    fnc = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    conn.delete_function(FunctionName=fnc["FunctionArn"])
    function_list = conn.list_functions()
    function_list["Functions"].should.have.length_of(0)


@mock_lambda
def test_delete_unknown_function():
    conn = boto3.client("lambda", "us-west-2")
    conn.delete_function.when.called_with(
        FunctionName="testFunctionThatDoesntExist"
    ).should.throw(botocore.client.ClientError)


@mock_lambda
@mock_s3
def test_publish():
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=False,
    )

    function_list = conn.list_functions()
    function_list["Functions"].should.have.length_of(1)
    latest_arn = function_list["Functions"][0]["FunctionArn"]

    res = conn.publish_version(FunctionName="testFunction")
    assert res["ResponseMetadata"]["HTTPStatusCode"] == 201

    function_list = conn.list_functions()
    function_list["Functions"].should.have.length_of(2)

    # #SetComprehension ;-)
    published_arn = list(
        {f["FunctionArn"] for f in function_list["Functions"]} - {latest_arn}
    )[0]
    published_arn.should.contain("testFunction:1")

    conn.delete_function(FunctionName="testFunction", Qualifier="1")

    function_list = conn.list_functions()
    function_list["Functions"].should.have.length_of(1)
    function_list["Functions"][0]["FunctionArn"].should.contain("testFunction")


@mock_lambda
@mock_s3
@freeze_time("2015-01-01 00:00:00")
def test_list_create_list_get_delete_list():
    """
    test `list -> create -> list -> get -> delete -> list` integration

    """
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    conn.list_functions()["Functions"].should.have.length_of(0)

    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    expected_function_result = {
        "Code": {
            "Location": "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com/test.zip".format(
                _lambda_region
            ),
            "RepositoryType": "S3",
        },
        "Configuration": {
            "CodeSha256": hashlib.sha256(zip_content).hexdigest(),
            "CodeSize": len(zip_content),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:{}:123456789012:function:testFunction".format(
                _lambda_region
            ),
            "FunctionName": "testFunction",
            "Handler": "lambda_function.lambda_handler",
            "MemorySize": 128,
            "Role": get_role_name(),
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": "$LATEST",
            "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
        },
        "ResponseMetadata": {"HTTPStatusCode": 200},
    }
    func = conn.list_functions()["Functions"][0]
    func.pop("LastModified")
    func.should.equal(expected_function_result["Configuration"])

    func = conn.get_function(FunctionName="testFunction")
    # this is hard to match against, so remove it
    func["ResponseMetadata"].pop("HTTPHeaders", None)
    # Botocore inserts retry attempts not seen in Python27
    func["ResponseMetadata"].pop("RetryAttempts", None)
    func["Configuration"].pop("LastModified")

    func.should.equal(expected_function_result)
    conn.delete_function(FunctionName="testFunction")

    conn.list_functions()["Functions"].should.have.length_of(0)


@mock_lambda
def test_invoke_lambda_error():
    lambda_fx = """
def lambda_handler(event, context):
    raise Exception('failsauce')
    """
    zip_output = io.BytesIO()
    zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED)
    zip_file.writestr("lambda_function.py", lambda_fx)
    zip_file.close()
    zip_output.seek(0)

    client = boto3.client("lambda", region_name="us-east-1")
    client.create_function(
        FunctionName="test-lambda-fx",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        Code={"ZipFile": zip_output.read()},
    )

    result = client.invoke(
        FunctionName="test-lambda-fx", InvocationType="RequestResponse", LogType="Tail"
    )

    assert "FunctionError" in result
    assert result["FunctionError"] == "Handled"


@mock_lambda
@mock_s3
def test_tags():
    """
    test list_tags -> tag_resource -> list_tags -> tag_resource -> list_tags -> untag_resource -> list_tags integration
    """
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    function = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    # List tags when there are none
    conn.list_tags(Resource=function["FunctionArn"])["Tags"].should.equal(dict())

    # List tags when there is one
    conn.tag_resource(Resource=function["FunctionArn"], Tags=dict(spam="eggs"))[
        "ResponseMetadata"
    ]["HTTPStatusCode"].should.equal(200)
    conn.list_tags(Resource=function["FunctionArn"])["Tags"].should.equal(
        dict(spam="eggs")
    )

    # List tags when another has been added
    conn.tag_resource(Resource=function["FunctionArn"], Tags=dict(foo="bar"))[
        "ResponseMetadata"
    ]["HTTPStatusCode"].should.equal(200)
    conn.list_tags(Resource=function["FunctionArn"])["Tags"].should.equal(
        dict(spam="eggs", foo="bar")
    )

    # Untag resource
    conn.untag_resource(Resource=function["FunctionArn"], TagKeys=["spam", "trolls"])[
        "ResponseMetadata"
    ]["HTTPStatusCode"].should.equal(204)
    conn.list_tags(Resource=function["FunctionArn"])["Tags"].should.equal(
        dict(foo="bar")
    )

    # Untag a tag that does not exist (no error and no change)
    conn.untag_resource(Resource=function["FunctionArn"], TagKeys=["spam"])[
        "ResponseMetadata"
    ]["HTTPStatusCode"].should.equal(204)


@mock_lambda
def test_tags_not_found():
    """
    Test list_tags and tag_resource when the lambda with the given arn does not exist
    """
    conn = boto3.client("lambda", "us-west-2")
    conn.list_tags.when.called_with(
        Resource="arn:aws:lambda:123456789012:function:not-found"
    ).should.throw(botocore.client.ClientError)

    conn.tag_resource.when.called_with(
        Resource="arn:aws:lambda:123456789012:function:not-found",
        Tags=dict(spam="eggs"),
    ).should.throw(botocore.client.ClientError)

    conn.untag_resource.when.called_with(
        Resource="arn:aws:lambda:123456789012:function:not-found", TagKeys=["spam"]
    ).should.throw(botocore.client.ClientError)


@mock_lambda
def test_invoke_async_function():
    conn = boto3.client("lambda", "us-west-2")
    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file1()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    success_result = conn.invoke_async(
        FunctionName="testFunction", InvokeArgs=json.dumps({"test": "event"})
    )

    success_result["Status"].should.equal(202)


@mock_lambda
@freeze_time("2015-01-01 00:00:00")
def test_get_function_created_with_zipfile():
    conn = boto3.client("lambda", "us-west-2")
    zip_content = get_test_zip_file1()
    result = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.handler",
        Code={"ZipFile": zip_content},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.get_function(FunctionName="testFunction")
    response["Configuration"].pop("LastModified")

    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    assert len(response["Code"]) == 2
    assert response["Code"]["RepositoryType"] == "S3"
    assert response["Code"]["Location"].startswith(
        "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com".format(_lambda_region)
    )
    response["Configuration"].should.equal(
        {
            "CodeSha256": hashlib.sha256(zip_content).hexdigest(),
            "CodeSize": len(zip_content),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:{}:123456789012:function:testFunction".format(
                _lambda_region
            ),
            "FunctionName": "testFunction",
            "Handler": "lambda_function.handler",
            "MemorySize": 128,
            "Role": get_role_name(),
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": "$LATEST",
            "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
        }
    )


@mock_lambda
def test_add_function_permission():
    conn = boto3.client("lambda", "us-west-2")
    zip_content = get_test_zip_file1()
    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=(get_role_name()),
        Handler="lambda_function.handler",
        Code={"ZipFile": zip_content},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.add_permission(
        FunctionName="testFunction",
        StatementId="1",
        Action="lambda:InvokeFunction",
        Principal="432143214321",
        SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
        SourceAccount="123412341234",
        EventSourceToken="blah",
        Qualifier="2",
    )
    assert "Statement" in response
    res = json.loads(response["Statement"])
    assert res["Action"] == "lambda:InvokeFunction"


@mock_lambda
def test_get_function_policy():
    conn = boto3.client("lambda", "us-west-2")
    zip_content = get_test_zip_file1()
    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.handler",
        Code={"ZipFile": zip_content},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.add_permission(
        FunctionName="testFunction",
        StatementId="1",
        Action="lambda:InvokeFunction",
        Principal="432143214321",
        SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
        SourceAccount="123412341234",
        EventSourceToken="blah",
        Qualifier="2",
    )

    response = conn.get_policy(FunctionName="testFunction")

    assert "Policy" in response
    res = json.loads(response["Policy"])
    assert res["Statement"][0]["Action"] == "lambda:InvokeFunction"


@mock_lambda
@mock_s3
def test_list_versions_by_function():
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    res = conn.publish_version(FunctionName="testFunction")
    assert res["ResponseMetadata"]["HTTPStatusCode"] == 201
    versions = conn.list_versions_by_function(FunctionName="testFunction")
    assert len(versions["Versions"]) == 3
    assert (
        versions["Versions"][0]["FunctionArn"]
        == "arn:aws:lambda:us-west-2:123456789012:function:testFunction:$LATEST"
    )
    assert (
        versions["Versions"][1]["FunctionArn"]
        == "arn:aws:lambda:us-west-2:123456789012:function:testFunction:1"
    )
    assert (
        versions["Versions"][2]["FunctionArn"]
        == "arn:aws:lambda:us-west-2:123456789012:function:testFunction:2"
    )

    conn.create_function(
        FunctionName="testFunction_2",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=False,
    )
    versions = conn.list_versions_by_function(FunctionName="testFunction_2")
    assert len(versions["Versions"]) == 1
    assert (
        versions["Versions"][0]["FunctionArn"]
        == "arn:aws:lambda:us-west-2:123456789012:function:testFunction_2:$LATEST"
    )


@mock_lambda
@mock_s3
def test_create_function_with_already_exists():
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    assert response["FunctionName"] == "testFunction"


@mock_lambda
@mock_s3
def test_list_versions_by_function_for_nonexistent_function():
    conn = boto3.client("lambda", "us-west-2")
    versions = conn.list_versions_by_function(FunctionName="testFunction")

    assert len(versions["Versions"]) == 0


@mock_logs
@mock_lambda
@mock_sqs
def test_create_event_source_mapping():
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(QueueName="test-sqs-queue1")

    conn = boto3.client("lambda", region_name="us-east-1")
    func = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.create_event_source_mapping(
        EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"]
    )

    assert response["EventSourceArn"] == queue.attributes["QueueArn"]
    assert response["FunctionArn"] == func["FunctionArn"]
    assert response["State"] == "Enabled"


@mock_logs
@mock_lambda
@mock_sqs
def test_invoke_function_from_sqs():
    logs_conn = boto3.client("logs", region_name="us-east-1")
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(QueueName="test-sqs-queue1")

    conn = boto3.client("lambda", region_name="us-east-1")
    func = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.create_event_source_mapping(
        EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"]
    )

    assert response["EventSourceArn"] == queue.attributes["QueueArn"]
    assert response["State"] == "Enabled"

    sqs_client = boto3.client("sqs", region_name="us-east-1")
    sqs_client.send_message(QueueUrl=queue.url, MessageBody="test")
    start = time.time()
    while (time.time() - start) < 30:
        result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
        log_streams = result.get("logStreams")
        if not log_streams:
            time.sleep(1)
            continue

        assert len(log_streams) == 1
        result = logs_conn.get_log_events(
            logGroupName="/aws/lambda/testFunction",
            logStreamName=log_streams[0]["logStreamName"],
        )
        for event in result.get("events"):
            if event["message"] == "get_test_zip_file3 success":
                return
        time.sleep(1)

    assert False, "Test Failed"


@mock_logs
@mock_lambda
@mock_dynamodb2
def test_invoke_function_from_dynamodb():
    logs_conn = boto3.client("logs", region_name="us-east-1")
    dynamodb = boto3.client("dynamodb", region_name="us-east-1")
    table_name = "table_with_stream"
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
        StreamSpecification={
            "StreamEnabled": True,
            "StreamViewType": "NEW_AND_OLD_IMAGES",
        },
    )

    conn = boto3.client("lambda", region_name="us-east-1")
    func = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function executed after a DynamoDB table is updated",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.create_event_source_mapping(
        EventSourceArn=table["TableDescription"]["LatestStreamArn"],
        FunctionName=func["FunctionArn"],
    )

    assert response["EventSourceArn"] == table["TableDescription"]["LatestStreamArn"]
    assert response["State"] == "Enabled"

    dynamodb.put_item(TableName=table_name, Item={"id": {"S": "item 1"}})
    start = time.time()
    while (time.time() - start) < 30:
        result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
        log_streams = result.get("logStreams")
        if not log_streams:
            time.sleep(1)
            continue

        assert len(log_streams) == 1
        result = logs_conn.get_log_events(
            logGroupName="/aws/lambda/testFunction",
            logStreamName=log_streams[0]["logStreamName"],
        )
        for event in result.get("events"):
            if event["message"] == "get_test_zip_file3 success":
                return
        time.sleep(1)

    assert False, "Test Failed"


@mock_logs
@mock_lambda
@mock_sqs
def test_invoke_function_from_sqs_exception():
    logs_conn = boto3.client("logs", region_name="us-east-1")
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(QueueName="test-sqs-queue1")

    conn = boto3.client("lambda", region_name="us-east-1")
    func = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file4()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.create_event_source_mapping(
        EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"]
    )

    assert response["EventSourceArn"] == queue.attributes["QueueArn"]
    assert response["State"] == "Enabled"

    entries = []
    for i in range(3):
        body = {"uuid": str(uuid.uuid4()), "test": "test_{}".format(i)}
        entry = {"Id": str(i), "MessageBody": json.dumps(body)}
        entries.append(entry)

    queue.send_messages(Entries=entries)

    start = time.time()
    while (time.time() - start) < 30:
        result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
        log_streams = result.get("logStreams")
        if not log_streams:
            time.sleep(1)
            continue
        assert len(log_streams) >= 1

        result = logs_conn.get_log_events(
            logGroupName="/aws/lambda/testFunction",
            logStreamName=log_streams[0]["logStreamName"],
        )
        for event in result.get("events"):
            if "I failed!" in event["message"]:
                messages = queue.receive_messages(MaxNumberOfMessages=10)
                # Verify messages are still visible and unprocessed
                assert len(messages) == 3
                return
        time.sleep(1)

    assert False, "Test Failed"


@mock_logs
@mock_lambda
@mock_sqs
def test_list_event_source_mappings():
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(QueueName="test-sqs-queue1")

    conn = boto3.client("lambda", region_name="us-east-1")
    func = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    response = conn.create_event_source_mapping(
        EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"]
    )
    mappings = conn.list_event_source_mappings(EventSourceArn="123")
    assert len(mappings["EventSourceMappings"]) == 0

    mappings = conn.list_event_source_mappings(
        EventSourceArn=queue.attributes["QueueArn"]
    )
    assert len(mappings["EventSourceMappings"]) == 1
    assert mappings["EventSourceMappings"][0]["UUID"] == response["UUID"]
    assert mappings["EventSourceMappings"][0]["FunctionArn"] == func["FunctionArn"]


@mock_lambda
@mock_sqs
def test_get_event_source_mapping():
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(QueueName="test-sqs-queue1")

    conn = boto3.client("lambda", region_name="us-east-1")
    func = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    response = conn.create_event_source_mapping(
        EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"]
    )
    mapping = conn.get_event_source_mapping(UUID=response["UUID"])
    assert mapping["UUID"] == response["UUID"]
    assert mapping["FunctionArn"] == func["FunctionArn"]

    conn.get_event_source_mapping.when.called_with(UUID="1").should.throw(
        botocore.client.ClientError
    )


@mock_lambda
@mock_sqs
def test_update_event_source_mapping():
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(QueueName="test-sqs-queue1")

    conn = boto3.client("lambda", region_name="us-east-1")
    func1 = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    func2 = conn.create_function(
        FunctionName="testFunction2",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    response = conn.create_event_source_mapping(
        EventSourceArn=queue.attributes["QueueArn"], FunctionName=func1["FunctionArn"]
    )
    assert response["FunctionArn"] == func1["FunctionArn"]
    assert response["BatchSize"] == 10
    assert response["State"] == "Enabled"

    mapping = conn.update_event_source_mapping(
        UUID=response["UUID"], Enabled=False, BatchSize=15, FunctionName="testFunction2"
    )
    assert mapping["UUID"] == response["UUID"]
    assert mapping["FunctionArn"] == func2["FunctionArn"]
    assert mapping["State"] == "Disabled"


@mock_lambda
@mock_sqs
def test_delete_event_source_mapping():
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(QueueName="test-sqs-queue1")

    conn = boto3.client("lambda", region_name="us-east-1")
    func1 = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file3()},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    response = conn.create_event_source_mapping(
        EventSourceArn=queue.attributes["QueueArn"], FunctionName=func1["FunctionArn"]
    )
    assert response["FunctionArn"] == func1["FunctionArn"]
    assert response["BatchSize"] == 10
    assert response["State"] == "Enabled"

    response = conn.delete_event_source_mapping(UUID=response["UUID"])

    assert response["State"] == "Deleting"
    conn.get_event_source_mapping.when.called_with(UUID=response["UUID"]).should.throw(
        botocore.client.ClientError
    )


@mock_lambda
@mock_s3
def test_update_configuration():
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
    conn = boto3.client("lambda", "us-west-2")

    fxn = conn.create_function(
        FunctionName="testFunction",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        Environment={"Variables": {"test_old_environment": "test_old_value"}},
    )

    assert fxn["Description"] == "test lambda function"
    assert fxn["Handler"] == "lambda_function.lambda_handler"
    assert fxn["MemorySize"] == 128
    assert fxn["Runtime"] == "python2.7"
    assert fxn["Timeout"] == 3

    updated_config = conn.update_function_configuration(
        FunctionName="testFunction",
        Description="updated test lambda function",
        Handler="lambda_function.new_lambda_handler",
        Runtime="python3.6",
        Timeout=7,
        Environment={"Variables": {"test_environment": "test_value"}},
    )

    assert updated_config["ResponseMetadata"]["HTTPStatusCode"] == 200
    assert updated_config["Description"] == "updated test lambda function"
    assert updated_config["Handler"] == "lambda_function.new_lambda_handler"
    assert updated_config["MemorySize"] == 128
    assert updated_config["Runtime"] == "python3.6"
    assert updated_config["Timeout"] == 7
    assert updated_config["Environment"]["Variables"] == {
        "test_environment": "test_value"
    }


@mock_lambda
def test_update_function_zip():
    conn = boto3.client("lambda", "us-west-2")

    zip_content_one = get_test_zip_file1()

    fxn = conn.create_function(
        FunctionName="testFunctionZip",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": zip_content_one},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    zip_content_two = get_test_zip_file2()

    fxn_updated = conn.update_function_code(
        FunctionName="testFunctionZip", ZipFile=zip_content_two, Publish=True
    )

    response = conn.get_function(FunctionName="testFunctionZip", Qualifier="2")
    response["Configuration"].pop("LastModified")

    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    assert len(response["Code"]) == 2
    assert response["Code"]["RepositoryType"] == "S3"
    assert response["Code"]["Location"].startswith(
        "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com".format(_lambda_region)
    )
    response["Configuration"].should.equal(
        {
            "CodeSha256": hashlib.sha256(zip_content_two).hexdigest(),
            "CodeSize": len(zip_content_two),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:{}:123456789012:function:testFunctionZip:2".format(
                _lambda_region
            ),
            "FunctionName": "testFunctionZip",
            "Handler": "lambda_function.lambda_handler",
            "MemorySize": 128,
            "Role": fxn["Role"],
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": "2",
            "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
        }
    )


@mock_lambda
@mock_s3
def test_update_function_s3():
    s3_conn = boto3.client("s3", "us-west-2")
    s3_conn.create_bucket(Bucket="test-bucket")

    zip_content = get_test_zip_file1()
    s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)

    conn = boto3.client("lambda", "us-west-2")

    fxn = conn.create_function(
        FunctionName="testFunctionS3",
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    zip_content_two = get_test_zip_file2()
    s3_conn.put_object(Bucket="test-bucket", Key="test2.zip", Body=zip_content_two)

    fxn_updated = conn.update_function_code(
        FunctionName="testFunctionS3",
        S3Bucket="test-bucket",
        S3Key="test2.zip",
        Publish=True,
    )

    response = conn.get_function(FunctionName="testFunctionS3", Qualifier="2")
    response["Configuration"].pop("LastModified")

    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    assert len(response["Code"]) == 2
    assert response["Code"]["RepositoryType"] == "S3"
    assert response["Code"]["Location"].startswith(
        "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com".format(_lambda_region)
    )
    response["Configuration"].should.equal(
        {
            "CodeSha256": hashlib.sha256(zip_content_two).hexdigest(),
            "CodeSize": len(zip_content_two),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:{}:123456789012:function:testFunctionS3:2".format(
                _lambda_region
            ),
            "FunctionName": "testFunctionS3",
            "Handler": "lambda_function.lambda_handler",
            "MemorySize": 128,
            "Role": fxn["Role"],
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": "2",
            "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
        }
    )


@mock_lambda
def test_create_function_with_invalid_arn():
    err = create_invalid_lambda("test-iam-role")
    err.exception.response["Error"]["Message"].should.equal(
        "1 validation error detected: Value 'test-iam-role' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::(\d{12}):role/?[a-zA-Z_0-9+=,.@\-_/]+"
    )


@mock_lambda
def test_create_function_with_arn_from_different_account():
    err = create_invalid_lambda("arn:aws:iam::000000000000:role/example_role")
    err.exception.response["Error"]["Message"].should.equal(
        "Cross-account pass role is not allowed."
    )


@mock_lambda
def test_create_function_with_unknown_arn():
    err = create_invalid_lambda(
        "arn:aws:iam::" + str(ACCOUNT_ID) + ":role/service-role/unknown_role"
    )
    err.exception.response["Error"]["Message"].should.equal(
        "The role defined for the function cannot be assumed by Lambda."
    )


def create_invalid_lambda(role):
    conn = boto3.client("lambda", "us-west-2")
    zip_content = get_test_zip_file1()
    with assert_raises(ClientError) as err:
        conn.create_function(
            FunctionName="testFunction",
            Runtime="python2.7",
            Role=role,
            Handler="lambda_function.handler",
            Code={"ZipFile": zip_content},
            Description="test lambda function",
            Timeout=3,
            MemorySize=128,
            Publish=True,
        )
    return err


def get_role_name():
    with mock_iam():
        iam = boto3.client("iam")
        try:
            return iam.get_role(RoleName="my-role")["Role"]["Arn"]
        except ClientError:
            return iam.create_role(
                RoleName="my-role",
                AssumeRolePolicyDocument="some policy",
                Path="/my-path/",
            )["Role"]["Arn"]