moto/tests/test_awslambda/test_lambda.py
2023-06-28 21:23:33 +00:00

1332 lines
45 KiB
Python

import base64
import json
import os
from unittest import SkipTest, mock
import boto3
import hashlib
import pytest
from botocore.exceptions import ClientError
from freezegun import freeze_time
from tests.test_ecr.test_ecr_helpers import _create_image_manifest
from moto import mock_lambda, mock_s3, mock_ecr, settings
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from uuid import uuid4
from .utilities import (
get_role_name,
get_test_zip_file1,
get_test_zip_file2,
get_test_zip_file3,
create_invalid_lambda,
_process_lambda,
)
_lambda_region = "us-west-2"
boto3.setup_default_session(region_name=_lambda_region)
@mock.patch.dict("os.environ", {"MOTO_ENABLE_ISO_REGIONS": "true"})
@pytest.mark.parametrize("region", ["us-west-2", "cn-northwest-1", "us-isob-east-1"])
@mock_lambda
def test_lambda_regions(region):
client = boto3.client("lambda", region_name=region)
resp = client.list_functions()
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
@mock_lambda
def test_list_functions():
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
initial_list = conn.list_functions()["Functions"]
initial_names = [f["FunctionName"] for f in initial_list]
assert function_name not in initial_names
conn.create_function(
FunctionName=function_name,
Runtime="python3.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file1()},
)
names = [f["FunctionName"] for f in conn.list_functions()["Functions"]]
assert function_name in names
conn.publish_version(FunctionName=function_name, Description="v2")
func_list = conn.list_functions()["Functions"]
our_functions = [f for f in func_list if f["FunctionName"] == function_name]
assert len(our_functions) == 1
# FunctionVersion=ALL means we should get a list of all versions
full_list = conn.list_functions(FunctionVersion="ALL")["Functions"]
our_functions = [f for f in full_list if f["FunctionName"] == function_name]
assert len(our_functions) == 2
v1 = [f for f in our_functions if f["Version"] == "1"][0]
assert v1["Description"] == "v2"
assert (
v1["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:1"
)
latest = [f for f in our_functions if f["Version"] == "$LATEST"][0]
assert latest["Description"] == ""
assert (
latest["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST"
)
@mock_lambda
def test_create_based_on_s3_with_missing_bucket():
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
with pytest.raises(ClientError) as exc:
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "this-bucket-does-not-exist", "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
VpcConfig={
"SecurityGroupIds": ["sg-123abc"],
"SubnetIds": ["subnet-123abc"],
},
)
err = exc.value.response["Error"]
assert (
err["Message"]
== "Error occurred while GetObject. S3 Error Code: NoSuchBucket. S3 Error Message: The specified bucket does not exist"
)
@mock_lambda
@mock_s3
@freeze_time("2015-01-01 00:00:00")
def test_create_function_from_aws_bucket():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
result = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
PackageType="ZIP",
Publish=True,
VpcConfig={"SecurityGroupIds": ["sg-123abc"], "SubnetIds": ["subnet-123abc"]},
)
assert result["FunctionName"] == function_name
assert (
result["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}"
)
assert result["Runtime"] == "python2.7"
assert result["Handler"] == "lambda_function.lambda_handler"
assert result["CodeSha256"] == base64.b64encode(
hashlib.sha256(zip_content).digest()
).decode("utf-8")
assert result["State"] == "Active"
@mock_lambda
@freeze_time("2015-01-01 00:00:00")
def test_create_function_from_zipfile():
conn = boto3.client("lambda", _lambda_region)
zip_content = get_test_zip_file1()
function_name = str(uuid4())[0:6]
result = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
# this is hard to match against, so remove it
result["ResponseMetadata"].pop("HTTPHeaders", None)
# Botocore inserts retry attempts not seen in Python27
result["ResponseMetadata"].pop("RetryAttempts", None)
result.pop("LastModified")
assert result == {
"FunctionName": function_name,
"FunctionArn": f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}",
"Runtime": "python2.7",
"Role": result["Role"],
"Handler": "lambda_function.lambda_handler",
"CodeSize": len(zip_content),
"Description": "test lambda function",
"Timeout": 3,
"MemorySize": 128,
"CodeSha256": base64.b64encode(hashlib.sha256(zip_content).digest()).decode(
"utf-8"
),
"Version": "1",
"VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
"ResponseMetadata": {"HTTPStatusCode": 201},
"State": "Active",
"Layers": [],
"TracingConfig": {"Mode": "PassThrough"},
}
@mock_lambda
@pytest.mark.parametrize(
"tracing_mode",
[(None, "PassThrough"), ("PassThrough", "PassThrough"), ("Active", "Active")],
)
def test_create_function__with_tracingmode(tracing_mode):
conn = boto3.client("lambda", _lambda_region)
source, output = tracing_mode
zip_content = get_test_zip_file1()
function_name = str(uuid4())[0:6]
kwargs = dict(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
if source:
kwargs["TracingConfig"] = {"Mode": source}
result = conn.create_function(**kwargs)
assert result["TracingConfig"] == {"Mode": output}
@pytest.fixture(name="with_ecr_mock")
def ecr_repo_fixture():
with mock_ecr():
os.environ["MOTO_LAMBDA_STUB_ECR"] = "FALSE"
repo_name = "testlambdaecr"
ecr_client = boto3.client("ecr", "us-east-1")
ecr_client.create_repository(repositoryName=repo_name)
response = ecr_client.put_image(
repositoryName=repo_name,
imageManifest=json.dumps(_create_image_manifest()),
imageTag="latest",
)
yield response["image"]["imageId"]
ecr_client.delete_repository(repositoryName=repo_name, force=True)
os.environ["MOTO_LAMBDA_STUB_ECR"] = "TRUE"
@mock_lambda
def test_create_function_from_stubbed_ecr():
lambda_client = boto3.client("lambda", "us-east-1")
fn_name = str(uuid4())[0:6]
image_uri = "111122223333.dkr.ecr.us-east-1.amazonaws.com/testlambda:latest"
dic = {
"FunctionName": fn_name,
"Role": get_role_name(),
"Code": {"ImageUri": image_uri},
"PackageType": "Image",
"Timeout": 100,
}
resp = lambda_client.create_function(**dic)
assert resp["FunctionName"] == fn_name
assert resp["CodeSize"] == 0
assert "CodeSha256" in resp
assert resp["PackageType"] == "Image"
result = lambda_client.get_function(FunctionName=fn_name)
assert "Configuration" in result
config = result["Configuration"]
assert "Code" in result
code = result["Code"]
assert code["RepositoryType"] == "ECR"
assert code["ImageUri"] == image_uri
image_uri_without_tag = image_uri.split(":")[0]
resolved_image_uri = f"{image_uri_without_tag}@sha256:{config['CodeSha256']}"
assert code["ResolvedImageUri"] == resolved_image_uri
@mock_lambda
def test_create_function_from_mocked_ecr_image_tag(
with_ecr_mock,
): # pylint: disable=unused-argument
if settings.TEST_SERVER_MODE:
raise SkipTest(
"Envars not easily set in server mode, feature off by default, skipping..."
)
lambda_client = boto3.client("lambda", "us-east-1")
fn_name = str(uuid4())[0:6]
image = with_ecr_mock
image_uri = f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/testlambdaecr:{image['imageTag']}"
dic = {
"FunctionName": fn_name,
"Role": get_role_name(),
"Code": {"ImageUri": image_uri},
"PackageType": "Image",
"Timeout": 100,
}
resp = lambda_client.create_function(**dic)
assert resp["FunctionName"] == fn_name
assert resp["CodeSize"] > 0
assert "CodeSha256" in resp
assert resp["PackageType"] == "Image"
result = lambda_client.get_function(FunctionName=fn_name)
assert "Configuration" in result
config = result["Configuration"]
assert config["CodeSha256"] == image["imageDigest"].replace("sha256:", "")
assert config["CodeSize"] == resp["CodeSize"]
assert "Code" in result
code = result["Code"]
assert code["RepositoryType"] == "ECR"
assert code["ImageUri"] == image_uri
image_uri_without_tag = image_uri.split(":")[0]
resolved_image_uri = f"{image_uri_without_tag}@sha256:{config['CodeSha256']}"
assert code["ResolvedImageUri"] == resolved_image_uri
@mock_lambda
def test_create_function_from_mocked_ecr_image_digest(
with_ecr_mock,
): # pylint: disable=unused-argument
if settings.TEST_SERVER_MODE:
raise SkipTest(
"Envars not easily set in server mode, feature off by default, skipping..."
)
lambda_client = boto3.client("lambda", "us-east-1")
fn_name = str(uuid4())[0:6]
image = with_ecr_mock
image_uri = f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/testlambdaecr@{image['imageDigest']}"
dic = {
"FunctionName": fn_name,
"Role": get_role_name(),
"Code": {"ImageUri": image_uri},
"PackageType": "Image",
"Timeout": 100,
}
resp = lambda_client.create_function(**dic)
assert resp["FunctionName"] == fn_name
assert resp["CodeSize"] > 0
assert resp["CodeSha256"] == image["imageDigest"].replace("sha256:", "")
assert resp["PackageType"] == "Image"
@mock_lambda
def test_create_function_from_mocked_ecr_missing_image(
with_ecr_mock,
): # pylint: disable=unused-argument
if settings.TEST_SERVER_MODE:
raise SkipTest(
"Envars not easily set in server mode, feature off by default, skipping..."
)
lambda_client = boto3.client("lambda", "us-east-1")
fn_name = str(uuid4())[0:6]
image_uri = f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/testlambdaecr:dne"
dic = {
"FunctionName": fn_name,
"Role": get_role_name(),
"Code": {"ImageUri": image_uri},
"PackageType": "Image",
"Timeout": 100,
}
with pytest.raises(ClientError) as exc:
lambda_client.create_function(**dic)
err = exc.value.response["Error"]
assert err["Code"] == "ImageNotFoundException"
assert (
err["Message"]
== "The image with imageId {'imageTag': 'dne'} does not exist within the repository with name 'testlambdaecr' in the registry with id '123456789012'"
)
@mock_lambda
@mock_s3
@freeze_time("2015-01-01 00:00:00")
def test_get_function():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file1()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
Environment={"Variables": {"test_variable": "test_value"}},
)
result = conn.get_function(FunctionName=function_name)
# this is hard to match against, so remove it
result["ResponseMetadata"].pop("HTTPHeaders", None)
# Botocore inserts retry attempts not seen in Python27
result["ResponseMetadata"].pop("RetryAttempts", None)
result["Configuration"].pop("LastModified")
assert (
result["Code"]["Location"]
== f"s3://awslambda-{_lambda_region}-tasks.s3-{_lambda_region}.amazonaws.com/test.zip"
)
assert result["Code"]["RepositoryType"] == "S3"
assert result["Configuration"]["CodeSha256"] == base64.b64encode(
hashlib.sha256(zip_content).digest()
).decode("utf-8")
assert result["Configuration"]["CodeSize"] == len(zip_content)
assert result["Configuration"]["Description"] == "test lambda function"
assert "FunctionArn" in result["Configuration"]
assert result["Configuration"]["FunctionName"] == function_name
assert result["Configuration"]["Handler"] == "lambda_function.lambda_handler"
assert result["Configuration"]["MemorySize"] == 128
assert result["Configuration"]["Role"] == get_role_name()
assert result["Configuration"]["Runtime"] == "python2.7"
assert result["Configuration"]["Timeout"] == 3
assert result["Configuration"]["Version"] == "$LATEST"
assert "VpcConfig" in result["Configuration"]
assert "Environment" in result["Configuration"]
assert "Variables" in result["Configuration"]["Environment"]
assert result["Configuration"]["Environment"]["Variables"] == {
"test_variable": "test_value"
}
# Test get function with qualifier
result = conn.get_function(FunctionName=function_name, Qualifier="$LATEST")
assert result["Configuration"]["Version"] == "$LATEST"
assert (
result["Configuration"]["FunctionArn"]
== f"arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:{function_name}:$LATEST"
)
# Test get function when can't find function name
with pytest.raises(conn.exceptions.ResourceNotFoundException):
conn.get_function(FunctionName="junk", Qualifier="$LATEST")
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
@mock_s3
@freeze_time("2015-01-01 00:00:00")
def test_get_function_configuration(key):
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file1()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
fxn = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
Environment={"Variables": {"test_variable": "test_value"}},
)
name_or_arn = fxn[key]
result = conn.get_function_configuration(FunctionName=name_or_arn)
assert result["CodeSha256"] == base64.b64encode(
hashlib.sha256(zip_content).digest()
).decode("utf-8")
assert result["CodeSize"] == len(zip_content)
assert result["Description"] == "test lambda function"
assert "FunctionArn" in result
assert result["FunctionName"] == function_name
assert result["Handler"] == "lambda_function.lambda_handler"
assert result["MemorySize"] == 128
assert result["Role"] == get_role_name()
assert result["Runtime"] == "python2.7"
assert result["Timeout"] == 3
assert result["Version"] == "$LATEST"
assert "VpcConfig" in result
assert "Environment" in result
assert "Variables" in result["Environment"]
assert result["Environment"]["Variables"] == {"test_variable": "test_value"}
# Test get function with qualifier
result = conn.get_function_configuration(
FunctionName=name_or_arn, Qualifier="$LATEST"
)
assert result["Version"] == "$LATEST"
assert (
result["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST"
)
# Test get function when can't find function name
with pytest.raises(conn.exceptions.ResourceNotFoundException):
conn.get_function_configuration(FunctionName="junk", Qualifier="$LATEST")
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
@mock_s3
def test_get_function_code_signing_config(key):
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file1()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
fxn = conn.create_function(
FunctionName=function_name,
Runtime="python3.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
CodeSigningConfigArn="csc:arn",
)
name_or_arn = fxn[key]
result = conn.get_function_code_signing_config(FunctionName=name_or_arn)
assert result["FunctionName"] == function_name
assert result["CodeSigningConfigArn"] == "csc:arn"
@mock_lambda
@mock_s3
def test_get_function_by_arn():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", "us-east-1")
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", "us-east-1")
function_name = str(uuid4())[0:6]
fnc = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
result = conn.get_function(FunctionName=fnc["FunctionArn"])
assert result["Configuration"]["FunctionName"] == function_name
@mock_lambda
@mock_s3
def test_delete_function():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
success_result = conn.delete_function(FunctionName=function_name)
# this is hard to match against, so remove it
success_result["ResponseMetadata"].pop("HTTPHeaders", None)
# Botocore inserts retry attempts not seen in Python27
success_result["ResponseMetadata"].pop("RetryAttempts", None)
assert success_result == {"ResponseMetadata": {"HTTPStatusCode": 204}}
func_list = conn.list_functions()["Functions"]
our_functions = [f for f in func_list if f["FunctionName"] == function_name]
assert len(our_functions) == 0
@mock_lambda
@mock_s3
def test_delete_function_by_arn():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", "us-east-1")
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", "us-east-1")
function_name = str(uuid4())[0:6]
fnc = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
conn.delete_function(FunctionName=fnc["FunctionArn"])
func_list = conn.list_functions()["Functions"]
our_functions = [f for f in func_list if f["FunctionName"] == function_name]
assert len(our_functions) == 0
@mock_lambda
def test_delete_unknown_function():
conn = boto3.client("lambda", _lambda_region)
with pytest.raises(ClientError) as exc:
conn.delete_function(FunctionName="testFunctionThatDoesntExist")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
@mock_lambda
@pytest.mark.parametrize(
"name",
[
"bad_function_name",
f"arn:aws:lambda:eu-west-1:{ACCOUNT_ID}:function:bad_function_name",
],
)
def test_publish_version_unknown_function(name):
client = boto3.client("lambda", "eu-west-1")
with pytest.raises(ClientError) as exc:
client.publish_version(FunctionName=name, Description="v2")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert (
err["Message"]
== f"Function not found: arn:aws:lambda:eu-west-1:{ACCOUNT_ID}:function:bad_function_name"
)
@mock_lambda
@mock_s3
def test_publish():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=False,
)
function_list = conn.list_functions(FunctionVersion="ALL")["Functions"]
our_functions = [f for f in function_list if f["FunctionName"] == function_name]
assert len(our_functions) == 1
latest_arn = our_functions[0]["FunctionArn"]
res = conn.publish_version(FunctionName=function_name)
assert res["ResponseMetadata"]["HTTPStatusCode"] == 201
function_list = conn.list_functions(FunctionVersion="ALL")["Functions"]
our_functions = [f for f in function_list if f["FunctionName"] == function_name]
assert len(our_functions) == 2
# #SetComprehension ;-)
published_arn = list({f["FunctionArn"] for f in our_functions} - {latest_arn})[0]
assert f"{function_name}:1" in published_arn
conn.delete_function(FunctionName=function_name, Qualifier="1")
function_list = conn.list_functions()["Functions"]
our_functions = [f for f in function_list if f["FunctionName"] == function_name]
assert len(our_functions) == 1
assert function_name in our_functions[0]["FunctionArn"]
@mock_lambda
@mock_s3
@freeze_time("2015-01-01 00:00:00")
def test_list_create_list_get_delete_list():
"""
test `list -> create -> list -> get -> delete -> list` integration
"""
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
initial_list = conn.list_functions()["Functions"]
initial_names = [f["FunctionName"] for f in initial_list]
assert function_name not in initial_names
function_name = function_name
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
expected_function_result = {
"Code": {
"Location": f"s3://awslambda-{_lambda_region}-tasks.s3-{_lambda_region}.amazonaws.com/test.zip",
"RepositoryType": "S3",
},
"Configuration": {
"CodeSha256": base64.b64encode(hashlib.sha256(zip_content).digest()).decode(
"utf-8"
),
"CodeSize": len(zip_content),
"Description": "test lambda function",
"FunctionName": function_name,
"Handler": "lambda_function.lambda_handler",
"MemorySize": 128,
"Role": get_role_name(),
"Runtime": "python2.7",
"Timeout": 3,
"Version": "$LATEST",
"VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
"State": "Active",
"Layers": [],
"LastUpdateStatus": "Successful",
"TracingConfig": {"Mode": "PassThrough"},
},
"ResponseMetadata": {"HTTPStatusCode": 200},
}
functions = conn.list_functions()["Functions"]
func_names = [f["FunctionName"] for f in functions]
assert function_name in func_names
func_arn = [
f["FunctionArn"] for f in functions if f["FunctionName"] == function_name
][0]
assert (
func_arn
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}"
)
functions = conn.list_functions(FunctionVersion="ALL")["Functions"]
our_functions = [f for f in functions if f["FunctionName"] == function_name]
assert len(our_functions) == 2
latest = [f for f in our_functions if f["Version"] == "$LATEST"][0]
assert (
latest["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST"
)
latest.pop("FunctionArn")
latest.pop("LastModified")
assert latest == expected_function_result["Configuration"]
published = [f for f in our_functions if f["Version"] != "$LATEST"][0]
assert published["Version"] == "1"
assert (
published["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:1"
)
func = conn.get_function(FunctionName=function_name)
assert (
func["Configuration"]["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}"
)
# this is hard to match against, so remove it
func["ResponseMetadata"].pop("HTTPHeaders", None)
# Botocore inserts retry attempts not seen in Python27
func["ResponseMetadata"].pop("RetryAttempts", None)
func["Configuration"].pop("LastModified")
func["Configuration"].pop("FunctionArn")
assert func == expected_function_result
conn.delete_function(FunctionName=function_name)
functions = conn.list_functions()["Functions"]
func_names = [f["FunctionName"] for f in functions]
assert function_name not in func_names
@mock_lambda
@freeze_time("2015-01-01 00:00:00")
def test_get_function_created_with_zipfile():
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
zip_content = get_test_zip_file1()
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.get_function(FunctionName=function_name)
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
assert len(response["Code"]) == 2
assert response["Code"]["RepositoryType"] == "S3"
assert response["Code"]["Location"].startswith(
f"s3://awslambda-{_lambda_region}-tasks.s3-{_lambda_region}.amazonaws.com"
)
assert "Configuration" in response
config = response["Configuration"]
assert config["CodeSha256"] == base64.b64encode(
hashlib.sha256(zip_content).digest()
).decode("utf-8")
assert config["CodeSize"] == len(zip_content)
assert config["Description"] == "test lambda function"
assert (
config["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}"
)
assert config["FunctionName"] == function_name
assert config["Handler"] == "lambda_function.handler"
assert config["MemorySize"] == 128
assert config["Role"] == get_role_name()
assert config["Runtime"] == "python2.7"
assert config["Timeout"] == 3
assert config["Version"] == "$LATEST"
assert config["State"] == "Active"
assert config["Layers"] == []
assert config["LastUpdateStatus"] == "Successful"
@mock_lambda
@mock_s3
def test_list_versions_by_function():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
conn.update_function_code(FunctionName=function_name, ZipFile=get_test_zip_file1())
res = conn.publish_version(FunctionName=function_name)
assert res["ResponseMetadata"]["HTTPStatusCode"] == 201
versions = conn.list_versions_by_function(FunctionName=function_name)
assert len(versions["Versions"]) == 3
assert (
versions["Versions"][0]["FunctionArn"]
== f"arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:{function_name}:$LATEST"
)
assert (
versions["Versions"][1]["FunctionArn"]
== f"arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:{function_name}:1"
)
assert (
versions["Versions"][2]["FunctionArn"]
== f"arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:{function_name}:2"
)
conn.create_function(
FunctionName="testFunction_2",
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=False,
)
versions = conn.list_versions_by_function(FunctionName="testFunction_2")
assert len(versions["Versions"]) == 1
assert (
versions["Versions"][0]["FunctionArn"]
== f"arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:testFunction_2:$LATEST"
)
@mock_lambda
@mock_s3
def test_create_function_with_already_exists():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
assert response["FunctionName"] == function_name
@mock_lambda
@mock_s3
def test_list_versions_by_function_for_nonexistent_function():
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
versions = conn.list_versions_by_function(FunctionName=function_name)
assert len(versions["Versions"]) == 0
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
@mock_s3
def test_update_configuration(key):
bucket_name = str(uuid4())
function_name = str(uuid4())[0:6]
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
fxn = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
Environment={"Variables": {"test_old_environment": "test_old_value"}},
)
name_or_arn = fxn[key]
assert fxn["Description"] == "test lambda function"
assert fxn["Handler"] == "lambda_function.lambda_handler"
assert fxn["MemorySize"] == 128
assert fxn["Runtime"] == "python2.7"
assert fxn["Timeout"] == 3
updated_config = conn.update_function_configuration(
FunctionName=name_or_arn,
Description="updated test lambda function",
Handler="lambda_function.new_lambda_handler",
Runtime="python3.6",
Timeout=7,
VpcConfig={"SecurityGroupIds": ["sg-123abc"], "SubnetIds": ["subnet-123abc"]},
Environment={"Variables": {"test_environment": "test_value"}},
)
assert updated_config["ResponseMetadata"]["HTTPStatusCode"] == 200
assert updated_config["Description"] == "updated test lambda function"
assert updated_config["Handler"] == "lambda_function.new_lambda_handler"
assert updated_config["MemorySize"] == 128
assert updated_config["Runtime"] == "python3.6"
assert updated_config["Timeout"] == 7
assert updated_config["Environment"]["Variables"] == {
"test_environment": "test_value"
}
assert updated_config["VpcConfig"] == {
"SecurityGroupIds": ["sg-123abc"],
"SubnetIds": ["subnet-123abc"],
"VpcId": "vpc-123abc",
}
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
def test_update_function_zip(key):
conn = boto3.client("lambda", _lambda_region)
zip_content_one = get_test_zip_file1()
function_name = str(uuid4())[0:6]
fxn = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": zip_content_one},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
name_or_arn = fxn[key]
first_sha = fxn["CodeSha256"]
zip_content_two = get_test_zip_file2()
update1 = conn.update_function_code(
FunctionName=name_or_arn, ZipFile=zip_content_two, Publish=True
)
assert update1["CodeSha256"] != first_sha
response = conn.get_function(FunctionName=function_name, Qualifier="2")
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
assert len(response["Code"]) == 2
assert response["Code"]["RepositoryType"] == "S3"
assert response["Code"]["Location"].startswith(
f"s3://awslambda-{_lambda_region}-tasks.s3-{_lambda_region}.amazonaws.com"
)
config = response["Configuration"]
assert config["CodeSize"] == len(zip_content_two)
assert config["Description"] == "test lambda function"
assert (
config["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:2"
)
assert config["FunctionName"] == function_name
assert config["Version"] == "2"
assert config["LastUpdateStatus"] == "Successful"
assert config["CodeSha256"] == update1["CodeSha256"]
most_recent_config = conn.get_function(FunctionName=function_name)
assert most_recent_config["Configuration"]["CodeSha256"] == update1["CodeSha256"]
# Publishing this again, with the same code, gives us the same version
same_update = conn.update_function_code(
FunctionName=name_or_arn, ZipFile=zip_content_two, Publish=True
)
assert (
same_update["FunctionArn"]
== most_recent_config["Configuration"]["FunctionArn"] + ":2"
)
assert same_update["Version"] == "2"
# Only when updating the code should we have a new version
new_update = conn.update_function_code(
FunctionName=name_or_arn, ZipFile=get_test_zip_file3(), Publish=True
)
assert (
new_update["FunctionArn"]
== most_recent_config["Configuration"]["FunctionArn"] + ":3"
)
assert new_update["Version"] == "3"
@mock_lambda
@mock_s3
def test_update_function_s3():
bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": _lambda_region},
)
zip_content = get_test_zip_file1()
s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)
conn = boto3.client("lambda", _lambda_region)
function_name = str(uuid4())[0:6]
conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
zip_content_two = get_test_zip_file2()
s3_conn.put_object(Bucket=bucket_name, Key="test2.zip", Body=zip_content_two)
conn.update_function_code(
FunctionName=function_name,
S3Bucket=bucket_name,
S3Key="test2.zip",
Publish=True,
)
response = conn.get_function(FunctionName=function_name, Qualifier="2")
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
assert len(response["Code"]) == 2
assert response["Code"]["RepositoryType"] == "S3"
assert response["Code"]["Location"].startswith(
f"s3://awslambda-{_lambda_region}-tasks.s3-{_lambda_region}.amazonaws.com"
)
config = response["Configuration"]
assert config["CodeSha256"] == base64.b64encode(
hashlib.sha256(zip_content_two).digest()
).decode("utf-8")
assert config["CodeSize"] == len(zip_content_two)
assert config["Description"] == "test lambda function"
assert (
config["FunctionArn"]
== f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:2"
)
assert config["FunctionName"] == function_name
assert config["Version"] == "2"
assert config["LastUpdateStatus"] == "Successful"
@mock_lambda
def test_create_function_with_invalid_arn():
err = create_invalid_lambda("test-iam-role")
assert (
err.value.response["Error"]["Message"]
== r"1 validation error detected: Value 'test-iam-role' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::(\d{12}):role/?[a-zA-Z_0-9+=,.@\-_/]+"
)
@mock_lambda
def test_create_function_with_arn_from_different_account():
err = create_invalid_lambda("arn:aws:iam::000000000000:role/example_role")
assert (
err.value.response["Error"]["Message"]
== "Cross-account pass role is not allowed."
)
@mock_lambda
def test_create_function_with_unknown_arn():
err = create_invalid_lambda(
"arn:aws:iam::" + str(ACCOUNT_ID) + ":role/service-role/unknown_role"
)
assert (
err.value.response["Error"]["Message"]
== "The role defined for the function cannot be assumed by Lambda."
)
@mock_lambda
def test_remove_unknown_permission_throws_error():
conn = boto3.client("lambda", _lambda_region)
zip_content = get_test_zip_file1()
function_name = str(uuid4())[0:6]
f = conn.create_function(
FunctionName=function_name,
Runtime="python3.7",
Role=(get_role_name()),
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
)
arn = f["FunctionArn"]
with pytest.raises(ClientError) as exc:
conn.remove_permission(FunctionName=arn, StatementId="1")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert err["Message"] == "No policy is associated with the given resource."
@mock_lambda
def test_multiple_qualifiers():
client = boto3.client("lambda", "us-east-1")
zip_content = get_test_zip_file1()
fn_name = str(uuid4())[0:6]
client.create_function(
FunctionName=fn_name,
Runtime="python3.7",
Role=(get_role_name()),
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
)
for _ in range(10):
new_zip = _process_lambda(f"func content {_}")
client.update_function_code(FunctionName=fn_name, ZipFile=new_zip)
client.publish_version(FunctionName=fn_name)
resp = client.list_versions_by_function(FunctionName=fn_name)["Versions"]
qualis = [fn["FunctionArn"].split(":")[-1] for fn in resp]
assert qualis == ["$LATEST", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]
client.delete_function(FunctionName=fn_name, Qualifier="4")
client.delete_function(FunctionName=fn_name, Qualifier="5")
resp = client.list_versions_by_function(FunctionName=fn_name)["Versions"]
qualis = [fn["FunctionArn"].split(":")[-1] for fn in resp]
assert qualis == ["$LATEST", "1", "2", "3", "6", "7", "8", "9", "10"]
fn = client.get_function(FunctionName=fn_name, Qualifier="6")["Configuration"]
assert (
fn["FunctionArn"]
== f"arn:aws:lambda:us-east-1:{ACCOUNT_ID}:function:{fn_name}:6"
)
def test_get_role_name_utility_race_condition():
# Play with these variables as needed to reproduce the error.
max_workers, num_threads = 3, 15
errors = []
roles = []
def thread_function(_):
while True:
# noinspection PyBroadException
try:
role = get_role_name()
except ClientError as e:
errors.append(str(e))
break
except Exception:
# boto3 and our own IAMBackend are not thread-safe,
# and occasionally throw weird errors, so we just
# pass and retry.
# https://github.com/boto/boto3/issues/1592
pass
else:
roles.append(role)
break
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(thread_function, range(num_threads))
# Check all threads are accounted for, all roles are the same entity,
# and there are no client errors.
assert len(errors) + len(roles) == num_threads
assert roles.count(roles[0]) == len(roles)
assert len(errors) == 0