2016-02-16 20:38:59 +00:00
import botocore . client
2016-02-12 19:39:20 +00:00
import boto3
2016-02-16 20:15:34 +00:00
import hashlib
2016-10-06 14:14:47 +00:00
import json
2021-10-18 19:44:29 +00:00
import sure # noqa # pylint: disable=unused-import
2021-09-21 15:19:49 +00:00
import pytest
2016-02-12 19:39:20 +00:00
from freezegun import freeze_time
2021-09-21 15:19:49 +00:00
from moto import mock_lambda , mock_s3
from moto . core . models import ACCOUNT_ID
from uuid import uuid4
from . utilities import (
get_role_name ,
get_test_zip_file1 ,
get_test_zip_file2 ,
create_invalid_lambda ,
2019-10-31 15:44:26 +00:00
)
2016-02-16 21:43:33 +00:00
2019-10-31 15:44:26 +00:00
# Region name passed to the boto3 clients (and used to build expected ARNs)
# by the tests in this module.
_lambda_region = " us-west-2 "
2019-08-21 01:54:57 +00:00
# Point boto3's default session at the same region as the tests' explicit clients.
boto3 . setup_default_session ( region_name = _lambda_region )
2016-02-16 21:43:33 +00:00
2017-09-27 23:04:58 +00:00
2021-01-08 14:22:12 +00:00
@pytest.mark.parametrize(" region ", [" us-west-2 ", " cn-northwest-1 "])
@mock_lambda
def test_lambda_regions(region):
    """list_functions answers with HTTP 200 for each parametrized region."""
    lambda_client = boto3.client(" lambda ", region_name=region)
    metadata = lambda_client.list_functions()[" ResponseMetadata "]
    metadata[" HTTPStatusCode "].should.equal(200)
2016-02-12 19:39:20 +00:00
@mock_lambda
def test_list_functions():
    """Exercise list_functions around create_function and publish_version.

    The function must be absent before creation, listed once by default
    after a version is published, and listed twice (published version plus
    $LATEST) when FunctionVersion=ALL is requested.
    """
    lambda_client = boto3.client(" lambda ", _lambda_region)
    function_name = str(uuid4())[:6]

    def only_ours(functions):
        # Keep just the entries belonging to the function created by this test.
        return [fn for fn in functions if fn[" FunctionName "] == function_name]

    # Nothing with our random name may exist up front.
    listed_before = lambda_client.list_functions()[" Functions "]
    names_before = [fn[" FunctionName "] for fn in listed_before]
    names_before.shouldnt.contain(function_name)

    lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python3.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" ZipFile ": get_test_zip_file1()},
    )
    listed_after = lambda_client.list_functions()[" Functions "]
    names_after = [fn[" FunctionName "] for fn in listed_after]
    names_after.should.contain(function_name)

    lambda_client.publish_version(FunctionName=function_name, Description=" v2 ")

    # The default listing still reports a single entry for the function.
    default_listing = lambda_client.list_functions()[" Functions "]
    matches = only_ours(default_listing)
    matches.should.have.length_of(1)

    # With FunctionVersion=ALL every version shows up.
    all_versions = lambda_client.list_functions(FunctionVersion=" ALL ")[" Functions "]
    matches = only_ours(all_versions)
    matches.should.have.length_of(2)

    published = [fn for fn in matches if fn[" Version "] == " 1 "][0]
    published[" Description "].should.equal(" v2 ")
    published_arn = " arn:aws:lambda: {} : {} :function: {} :1 ".format(
        _lambda_region, ACCOUNT_ID, function_name
    )
    published[" FunctionArn "].should.equal(published_arn)

    newest = [fn for fn in matches if fn[" Version "] == " $LATEST "][0]
    newest[" Description "].should.equal(" ")
    newest_arn = " arn:aws:lambda: {} : {} :function: {} :$LATEST ".format(
        _lambda_region, ACCOUNT_ID, function_name
    )
    newest[" FunctionArn "].should.equal(newest_arn)
2016-02-12 19:39:20 +00:00
2016-12-03 23:17:15 +00:00
2016-02-12 19:39:20 +00:00
@mock_lambda
def test_create_based_on_s3_with_missing_bucket():
    """create_function raises ClientError when Code points at a bucket the test never creates."""
    lambda_client = boto3.client(" lambda ", _lambda_region)
    function_name = str(uuid4())[:6]

    create_kwargs = dict(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": " this-bucket-does-not-exist ", " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        VpcConfig={
            " SecurityGroupIds ": [" sg-123abc "],
            " SubnetIds ": [" subnet-123abc "],
        },
    )

    deferred_call = lambda_client.create_function.when.called_with(**create_kwargs)
    deferred_call.should.throw(botocore.client.ClientError)
@mock_lambda
@mock_s3
@freeze_time(" 2015-01-01 00:00:00 ")
def test_create_function_from_aws_bucket():
    """Create a function from a zip uploaded to S3 and compare the whole response."""
    bucket_name = str(uuid4())
    s3_client = boto3.client(" s3 ", _lambda_region)
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={" LocationConstraint ": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_client.put_object(Bucket=bucket_name, Key=" test.zip ", Body=zip_content)

    lambda_client = boto3.client(" lambda ", _lambda_region)
    function_name = str(uuid4())[:6]
    response = lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": bucket_name, " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        VpcConfig={
            " SecurityGroupIds ": [" sg-123abc "],
            " SubnetIds ": [" subnet-123abc "],
        },
    )

    # Headers and botocore's retry counter are awkward to match exactly,
    # so they are dropped (if present) before the comparison.
    metadata = response[" ResponseMetadata "]
    for volatile_key in (" HTTPHeaders ", " RetryAttempts "):
        metadata.pop(volatile_key, None)
    response.pop(" LastModified ")

    expected_arn = " arn:aws:lambda: {} : {} :function: {} ".format(
        _lambda_region, ACCOUNT_ID, function_name
    )
    expected = {
        " FunctionName ": function_name,
        " FunctionArn ": expected_arn,
        " Runtime ": " python2.7 ",
        # The role is echoed back from the response rather than asserted.
        " Role ": response[" Role "],
        " Handler ": " lambda_function.lambda_handler ",
        " CodeSha256 ": hashlib.sha256(zip_content).hexdigest(),
        " CodeSize ": len(zip_content),
        " Description ": " test lambda function ",
        " Timeout ": 3,
        " MemorySize ": 128,
        " Version ": " 1 ",
        " VpcConfig ": {
            " SecurityGroupIds ": [" sg-123abc "],
            " SubnetIds ": [" subnet-123abc "],
            " VpcId ": " vpc-123abc ",
        },
        " ResponseMetadata ": {" HTTPStatusCode ": 201},
        " State ": " Active ",
        " Layers ": [],
    }
    response.should.equal(expected)
2016-02-16 20:15:34 +00:00
@mock_lambda
@freeze_time(" 2015-01-01 00:00:00 ")
def test_create_function_from_zipfile():
    """Create a function from inline zip bytes and compare the whole response."""
    lambda_client = boto3.client(" lambda ", _lambda_region)
    zip_content = get_test_zip_file1()
    function_name = str(uuid4())[:6]

    response = lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" ZipFile ": zip_content},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    # Headers and botocore's retry counter are awkward to match exactly,
    # so they are dropped (if present) before the comparison.
    metadata = response[" ResponseMetadata "]
    for volatile_key in (" HTTPHeaders ", " RetryAttempts "):
        metadata.pop(volatile_key, None)
    response.pop(" LastModified ")

    expected_arn = " arn:aws:lambda: {} : {} :function: {} ".format(
        _lambda_region, ACCOUNT_ID, function_name
    )
    expected = {
        " FunctionName ": function_name,
        " FunctionArn ": expected_arn,
        " Runtime ": " python2.7 ",
        # The role is echoed back from the response rather than asserted.
        " Role ": response[" Role "],
        " Handler ": " lambda_function.lambda_handler ",
        " CodeSize ": len(zip_content),
        " Description ": " test lambda function ",
        " Timeout ": 3,
        " MemorySize ": 128,
        " CodeSha256 ": hashlib.sha256(zip_content).hexdigest(),
        " Version ": " 1 ",
        " VpcConfig ": {" SecurityGroupIds ": [], " SubnetIds ": []},
        " ResponseMetadata ": {" HTTPStatusCode ": 201},
        " State ": " Active ",
        " Layers ": [],
    }
    response.should.equal(expected)
2016-02-12 19:39:20 +00:00
@mock_lambda
@mock_s3
@freeze_time(" 2015-01-01 00:00:00 ")
def test_get_function():
    """get_function returns the code location and configuration of a created function.

    Also covers the Qualifier argument and the ResourceNotFoundException
    raised for an unknown function name.
    """
    bucket_name = str(uuid4())
    s3_client = boto3.client(" s3 ", _lambda_region)
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={" LocationConstraint ": _lambda_region},
    )
    zip_content = get_test_zip_file1()
    s3_client.put_object(Bucket=bucket_name, Key=" test.zip ", Body=zip_content)

    lambda_client = boto3.client(" lambda ", _lambda_region)
    function_name = str(uuid4())[:6]
    lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": bucket_name, " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        Environment={" Variables ": {" test_variable ": " test_value "}},
    )

    response = lambda_client.get_function(FunctionName=function_name)
    # Headers and botocore's retry counter are awkward to match, drop them.
    metadata = response[" ResponseMetadata "]
    for volatile_key in (" HTTPHeaders ", " RetryAttempts "):
        metadata.pop(volatile_key, None)
    response[" Configuration "].pop(" LastModified ")

    code = response[" Code "]
    expected_location = (
        " s3://awslambda- {0} -tasks.s3- {0} .amazonaws.com/test.zip ".format(
            _lambda_region
        )
    )
    code[" Location "].should.equal(expected_location)
    code[" RepositoryType "].should.equal(" S3 ")

    config = response[" Configuration "]
    expected_sha = hashlib.sha256(zip_content).hexdigest()
    config[" CodeSha256 "].should.equal(expected_sha)
    config[" CodeSize "].should.equal(len(zip_content))
    config[" Description "].should.equal(" test lambda function ")
    config.should.contain(" FunctionArn ")
    config[" FunctionName "].should.equal(function_name)
    config[" Handler "].should.equal(" lambda_function.lambda_handler ")
    config[" MemorySize "].should.equal(128)
    config[" Role "].should.equal(get_role_name())
    config[" Runtime "].should.equal(" python2.7 ")
    config[" Timeout "].should.equal(3)
    config[" Version "].should.equal(" $LATEST ")
    config.should.contain(" VpcConfig ")
    config.should.contain(" Environment ")
    environment = config[" Environment "]
    environment.should.contain(" Variables ")
    environment[" Variables "].should.equal({" test_variable ": " test_value "})

    # Same lookup again, this time with an explicit qualifier.
    qualified = lambda_client.get_function(
        FunctionName=function_name, Qualifier=" $LATEST "
    )
    qualified_config = qualified[" Configuration "]
    qualified_config[" Version "].should.equal(" $LATEST ")
    expected_arn = " arn:aws:lambda:us-west-2: {} :function: {} :$LATEST ".format(
        ACCOUNT_ID, function_name
    )
    qualified_config[" FunctionArn "].should.equal(expected_arn)

    # An unknown function name must be reported as not found.
    not_found = lambda_client.exceptions.ResourceNotFoundException
    with pytest.raises(not_found):
        lambda_client.get_function(FunctionName=" junk ", Qualifier=" $LATEST ")
2019-02-18 04:32:39 +00:00
2019-11-04 15:22:03 +00:00
2021-08-28 06:23:44 +00:00
@pytest.mark.parametrize(" key ", [" FunctionName ", " FunctionArn "])
@mock_lambda
@mock_s3
@freeze_time(" 2015-01-01 00:00:00 ")
def test_get_function_configuration(key):
    """get_function_configuration works when addressed by function name or by ARN.

    `key` selects which field of the create_function response is passed
    as FunctionName in the lookups.
    """
    bucket_name = str(uuid4())
    s3_client = boto3.client(" s3 ", _lambda_region)
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={" LocationConstraint ": _lambda_region},
    )
    zip_content = get_test_zip_file1()
    s3_client.put_object(Bucket=bucket_name, Key=" test.zip ", Body=zip_content)

    lambda_client = boto3.client(" lambda ", _lambda_region)
    function_name = str(uuid4())[:6]
    created = lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": bucket_name, " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        Environment={" Variables ": {" test_variable ": " test_value "}},
    )
    identifier = created[key]

    config = lambda_client.get_function_configuration(FunctionName=identifier)

    expected_sha = hashlib.sha256(zip_content).hexdigest()
    config[" CodeSha256 "].should.equal(expected_sha)
    config[" CodeSize "].should.equal(len(zip_content))
    config[" Description "].should.equal(" test lambda function ")
    config.should.contain(" FunctionArn ")
    config[" FunctionName "].should.equal(function_name)
    config[" Handler "].should.equal(" lambda_function.lambda_handler ")
    config[" MemorySize "].should.equal(128)
    config[" Role "].should.equal(get_role_name())
    config[" Runtime "].should.equal(" python2.7 ")
    config[" Timeout "].should.equal(3)
    config[" Version "].should.equal(" $LATEST ")
    config.should.contain(" VpcConfig ")
    config.should.contain(" Environment ")
    environment = config[" Environment "]
    environment.should.contain(" Variables ")
    environment[" Variables "].should.equal({" test_variable ": " test_value "})

    # Same lookup again, this time with an explicit qualifier.
    qualified = lambda_client.get_function_configuration(
        FunctionName=identifier, Qualifier=" $LATEST "
    )
    qualified[" Version "].should.equal(" $LATEST ")
    expected_arn = " arn:aws:lambda: {} : {} :function: {} :$LATEST ".format(
        _lambda_region, ACCOUNT_ID, function_name
    )
    qualified[" FunctionArn "].should.equal(expected_arn)

    # An unknown function name must be reported as not found.
    not_found = lambda_client.exceptions.ResourceNotFoundException
    with pytest.raises(not_found):
        lambda_client.get_function_configuration(
            FunctionName=" junk ", Qualifier=" $LATEST "
        )
2019-11-04 13:44:01 +00:00
@mock_lambda
@mock_s3
def test_get_function_by_arn():
    """get_function accepts the function's ARN in place of its name."""
    bucket_name = str(uuid4())
    s3_client = boto3.client(" s3 ", " us-east-1 ")
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={" LocationConstraint ": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_client.put_object(Bucket=bucket_name, Key=" test.zip ", Body=zip_content)

    lambda_client = boto3.client(" lambda ", " us-east-1 ")
    function_name = str(uuid4())[:6]
    created = lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": bucket_name, " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    # Look the function up through the ARN returned at creation time.
    function_arn = created[" FunctionArn "]
    fetched = lambda_client.get_function(FunctionName=function_arn)
    fetched_config = fetched[" Configuration "]
    fetched_config[" FunctionName "].should.equal(function_name)
2019-11-04 13:44:01 +00:00
2019-02-18 04:32:39 +00:00
2016-02-12 19:39:20 +00:00
@mock_lambda
@mock_s3
def test_delete_function():
    """delete_function answers with HTTP 204 and the function leaves the listing."""
    bucket_name = str(uuid4())
    s3_client = boto3.client(" s3 ", _lambda_region)
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={" LocationConstraint ": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_client.put_object(Bucket=bucket_name, Key=" test.zip ", Body=zip_content)

    lambda_client = boto3.client(" lambda ", _lambda_region)
    function_name = str(uuid4())[:6]
    lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": bucket_name, " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    deletion = lambda_client.delete_function(FunctionName=function_name)
    # Headers and botocore's retry counter are awkward to match, drop them.
    metadata = deletion[" ResponseMetadata "]
    for volatile_key in (" HTTPHeaders ", " RetryAttempts "):
        metadata.pop(volatile_key, None)
    deletion.should.equal({" ResponseMetadata ": {" HTTPStatusCode ": 204}})

    # No entry with our name may remain in the listing.
    remaining = [
        fn
        for fn in lambda_client.list_functions()[" Functions "]
        if fn[" FunctionName "] == function_name
    ]
    remaining.should.have.length_of(0)
2019-10-22 08:31:37 +00:00
@mock_lambda
@mock_s3
def test_delete_function_by_arn():
    """delete_function accepts the function's ARN in place of its name."""
    bucket_name = str(uuid4())
    s3_client = boto3.client(" s3 ", " us-east-1 ")
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={" LocationConstraint ": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_client.put_object(Bucket=bucket_name, Key=" test.zip ", Body=zip_content)

    lambda_client = boto3.client(" lambda ", " us-east-1 ")
    function_name = str(uuid4())[:6]
    created = lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": bucket_name, " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    # Delete through the ARN returned at creation time.
    function_arn = created[" FunctionArn "]
    lambda_client.delete_function(FunctionName=function_arn)

    # No entry with our name may remain in the listing.
    remaining = [
        fn
        for fn in lambda_client.list_functions()[" Functions "]
        if fn[" FunctionName "] == function_name
    ]
    remaining.should.have.length_of(0)
2019-10-22 08:31:37 +00:00
@mock_lambda
def test_delete_unknown_function():
    """delete_function raises ClientError for a name that was never created."""
    lambda_client = boto3.client(" lambda ", _lambda_region)
    deferred_call = lambda_client.delete_function.when.called_with(
        FunctionName=" testFunctionThatDoesntExist "
    )
    deferred_call.should.throw(botocore.client.ClientError)
2016-02-12 19:39:20 +00:00
2017-11-26 21:28:28 +00:00
@mock_lambda
@mock_s3
def test_publish():
    """publish_version adds a numbered version that can be deleted by qualifier.

    The function is created with Publish=False, so one entry exists at
    first; publishing adds a second one, and deleting qualifier 1 leaves
    a single entry in the default listing.
    """
    bucket_name = str(uuid4())
    s3_client = boto3.client(" s3 ", _lambda_region)
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={" LocationConstraint ": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_client.put_object(Bucket=bucket_name, Key=" test.zip ", Body=zip_content)

    lambda_client = boto3.client(" lambda ", _lambda_region)
    function_name = str(uuid4())[:6]
    lambda_client.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": bucket_name, " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=False,
    )

    def only_ours(functions):
        # Keep just the entries belonging to the function created by this test.
        return [fn for fn in functions if fn[" FunctionName "] == function_name]

    # Before publishing there is exactly one entry across all versions.
    matches = only_ours(
        lambda_client.list_functions(FunctionVersion=" ALL ")[" Functions "]
    )
    matches.should.have.length_of(1)
    latest_arn = matches[0][" FunctionArn "]

    publish_response = lambda_client.publish_version(FunctionName=function_name)
    assert publish_response[" ResponseMetadata "][" HTTPStatusCode "] == 201

    matches = only_ours(
        lambda_client.list_functions(FunctionVersion=" ALL ")[" Functions "]
    )
    matches.should.have.length_of(2)

    # Whatever ARN is not the pre-existing one belongs to the new version.
    all_arns = {fn[" FunctionArn "] for fn in matches}
    new_arns = all_arns - {latest_arn}
    published_arn = list(new_arns)[0]
    published_arn.should.contain(" {} :1 ".format(function_name))

    lambda_client.delete_function(FunctionName=function_name, Qualifier=" 1 ")

    matches = only_ours(lambda_client.list_functions()[" Functions "])
    matches.should.have.length_of(1)
    matches[0][" FunctionArn "].should.contain(function_name)
2017-11-26 21:28:28 +00:00
2016-02-12 19:39:20 +00:00
@mock_lambda
@mock_s3
@freeze_time(" 2015-01-01 00:00:00 ")
def test_list_create_list_get_delete_list():
    """Integration test: list -> create -> list -> get -> delete -> list.

    Checks that a function is absent before creation, that it appears
    (with the expected ARNs and configuration) in both the default and
    the FunctionVersion=ALL listings, that get_function returns the
    expected payload, and that it is gone again after delete_function.
    """
    bucket_name = str(uuid4())
    s3_conn = boto3.client(" s3 ", _lambda_region)
    s3_conn.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={" LocationConstraint ": _lambda_region},
    )

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket=bucket_name, Key=" test.zip ", Body=zip_content)
    conn = boto3.client(" lambda ", _lambda_region)
    function_name = str(uuid4())[0:6]

    # list: the randomly named function must not exist yet.
    initial_list = conn.list_functions()[" Functions "]
    initial_names = [f[" FunctionName "] for f in initial_list]
    initial_names.shouldnt.contain(function_name)

    # create
    conn.create_function(
        FunctionName=function_name,
        Runtime=" python2.7 ",
        Role=get_role_name(),
        Handler=" lambda_function.lambda_handler ",
        Code={" S3Bucket ": bucket_name, " S3Key ": " test.zip "},
        Description=" test lambda function ",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # Expected get_function payload; FunctionArn and LastModified are
    # asserted or dropped separately below, so they are not part of it.
    expected_function_result = {
        " Code ": {
            " Location ": " s3://awslambda- {0} -tasks.s3- {0} .amazonaws.com/test.zip ".format(
                _lambda_region
            ),
            " RepositoryType ": " S3 ",
        },
        " Configuration ": {
            " CodeSha256 ": hashlib.sha256(zip_content).hexdigest(),
            " CodeSize ": len(zip_content),
            " Description ": " test lambda function ",
            " FunctionName ": function_name,
            " Handler ": " lambda_function.lambda_handler ",
            " MemorySize ": 128,
            " Role ": get_role_name(),
            " Runtime ": " python2.7 ",
            " Timeout ": 3,
            " Version ": " $LATEST ",
            " VpcConfig ": {" SecurityGroupIds ": [], " SubnetIds ": []},
            " State ": " Active ",
            " Layers ": [],
        },
        " ResponseMetadata ": {" HTTPStatusCode ": 200},
    }

    # list (default): the function shows up with an unqualified ARN.
    functions = conn.list_functions()[" Functions "]
    func_names = [f[" FunctionName "] for f in functions]
    func_names.should.contain(function_name)
    func_arn = [
        f[" FunctionArn "] for f in functions if f[" FunctionName "] == function_name
    ][0]
    func_arn.should.equal(
        " arn:aws:lambda: {} : {} :function: {} ".format(
            _lambda_region, ACCOUNT_ID, function_name
        )
    )

    # list (ALL): both $LATEST and the published version are reported.
    functions = conn.list_functions(FunctionVersion=" ALL ")[" Functions "]
    our_functions = [f for f in functions if f[" FunctionName "] == function_name]
    our_functions.should.have.length_of(2)

    latest = [f for f in our_functions if f[" Version "] == " $LATEST "][0]
    latest[" FunctionArn "].should.equal(
        " arn:aws:lambda: {} : {} :function: {} :$LATEST ".format(
            _lambda_region, ACCOUNT_ID, function_name
        )
    )
    # The ARN was checked above and LastModified is not compared, so both
    # are removed before matching against the expected configuration.
    latest.pop(" FunctionArn ")
    latest.pop(" LastModified ")
    latest.should.equal(expected_function_result[" Configuration "])

    published = [f for f in our_functions if f[" Version "] != " $LATEST "][0]
    published[" Version "].should.equal(" 1 ")
    published[" FunctionArn "].should.equal(
        " arn:aws:lambda: {} : {} :function: {} :1 ".format(
            _lambda_region, ACCOUNT_ID, function_name
        )
    )

    # get
    func = conn.get_function(FunctionName=function_name)
    func[" Configuration "][" FunctionArn "].should.equal(
        " arn:aws:lambda: {} : {} :function: {} ".format(
            _lambda_region, ACCOUNT_ID, function_name
        )
    )

    # Response headers are hard to match against, so remove them.
    func[" ResponseMetadata "].pop(" HTTPHeaders ", None)
    # Botocore may add a retry counter; drop it if present.
    func[" ResponseMetadata "].pop(" RetryAttempts ", None)
    func[" Configuration "].pop(" LastModified ")
    func[" Configuration "].pop(" FunctionArn ")

    func.should.equal(expected_function_result)

    # delete
    conn.delete_function(FunctionName=function_name)

    # list: the function is gone again.
    functions = conn.list_functions()[" Functions "]
    func_names = [f[" FunctionName "] for f in functions]
    func_names.shouldnt.contain(function_name)
2017-09-13 04:44:22 +00:00
2017-10-25 19:04:00 +00:00
2017-09-13 19:00:39 +00:00
@mock_lambda
@mock_s3
def test_tags():
    """
    Exercise the tagging round trip on a single function:
    list_tags -> tag_resource -> list_tags -> tag_resource -> list_tags
    -> untag_resource -> list_tags.
    """
    # The function code is served from a freshly created, uniquely named bucket.
    bucket_name = str(uuid4())
    s3_conn = boto3.client("s3", _lambda_region)
    s3_conn.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)

    conn = boto3.client("lambda", _lambda_region)
    function_name = str(uuid4())[0:6]
    function = conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.handler",
        Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    function_arn = function["FunctionArn"]

    # List tags when there are none
    conn.list_tags(Resource=function_arn)["Tags"].should.equal({})

    # List tags when there is one
    tagged = conn.tag_resource(Resource=function_arn, Tags={"spam": "eggs"})
    tagged["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    conn.list_tags(Resource=function_arn)["Tags"].should.equal({"spam": "eggs"})

    # List tags when another has been added
    tagged = conn.tag_resource(Resource=function_arn, Tags={"foo": "bar"})
    tagged["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    conn.list_tags(Resource=function_arn)["Tags"].should.equal(
        {"spam": "eggs", "foo": "bar"}
    )

    # Untag resource ("trolls" was never set; only "spam" is actually removed)
    untagged = conn.untag_resource(Resource=function_arn, TagKeys=["spam", "trolls"])
    untagged["ResponseMetadata"]["HTTPStatusCode"].should.equal(204)
    conn.list_tags(Resource=function_arn)["Tags"].should.equal({"foo": "bar"})

    # Untag a tag that does not exist (no error and no change)
    untagged = conn.untag_resource(Resource=function_arn, TagKeys=["spam"])
    untagged["ResponseMetadata"]["HTTPStatusCode"].should.equal(204)
    # Verify the "no change" half of the claim as well as the status code.
    conn.list_tags(Resource=function_arn)["Tags"].should.equal({"foo": "bar"})
2017-09-13 19:00:39 +00:00
2017-10-25 19:04:00 +00:00
2017-09-13 19:00:39 +00:00
@mock_lambda
def test_tags_not_found():
    """
    Test list_tags, tag_resource and untag_resource when the lambda with the
    given arn does not exist: each call must raise a ClientError.
    """
    conn = boto3.client("lambda", _lambda_region)
    # NOTE(review): this ARN has no region segment (the account id sits in the
    # region slot). Confirm whether a well-formed ARN of a missing function is
    # the intended input here.
    missing_arn = "arn:aws:lambda:{}:function:not-found".format(ACCOUNT_ID)

    conn.list_tags.when.called_with(Resource=missing_arn).should.throw(
        botocore.client.ClientError
    )
    conn.tag_resource.when.called_with(
        Resource=missing_arn, Tags={"spam": "eggs"}
    ).should.throw(botocore.client.ClientError)
    conn.untag_resource.when.called_with(
        Resource=missing_arn, TagKeys=["spam"]
    ).should.throw(botocore.client.ClientError)
2017-09-14 02:06:05 +00:00
2017-10-25 19:04:00 +00:00
2017-09-15 02:49:32 +00:00
@mock_lambda
@freeze_time("2015-01-01 00:00:00")
def test_get_function_created_with_zipfile():
    """
    Create a function from an inline zip payload and check that get_function
    reports the expected code location and configuration.
    """
    conn = boto3.client("lambda", _lambda_region)
    function_name = str(uuid4())[0:6]
    zip_content = get_test_zip_file1()
    conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.handler",
        Code={"ZipFile": zip_content},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    response = conn.get_function(FunctionName=function_name)
    # LastModified is dropped so the configuration can be compared as a whole.
    response["Configuration"].pop("LastModified")

    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    code = response["Code"]
    assert len(code) == 2
    assert code["RepositoryType"] == "S3"
    assert code["Location"].startswith(
        "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com".format(_lambda_region)
    )
    response["Configuration"].should.equal(
        {
            "CodeSha256": hashlib.sha256(zip_content).hexdigest(),
            "CodeSize": len(zip_content),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:{}:{}:function:{}".format(
                _lambda_region, ACCOUNT_ID, function_name
            ),
            "FunctionName": function_name,
            "Handler": "lambda_function.handler",
            "MemorySize": 128,
            "Role": get_role_name(),
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": "$LATEST",
            "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
            "State": "Active",
            "Layers": [],
        }
    )
2017-10-03 00:33:50 +00:00
2017-10-25 19:04:00 +00:00
2021-08-28 06:23:44 +00:00
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
def test_add_function_permission(key):
    """
    Parametrized to ensure that we can add permission by using the FunctionName and the FunctionArn
    """
    conn = boto3.client("lambda", _lambda_region)
    zip_content = get_test_zip_file1()
    function_name = str(uuid4())[0:6]
    f = conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.handler",
        Code={"ZipFile": zip_content},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # Either the plain name or the full ARN, depending on the parameter.
    name_or_arn = f[key]

    response = conn.add_permission(
        FunctionName=name_or_arn,
        StatementId="1",
        Action="lambda:InvokeFunction",
        Principal="432143214321",
        SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
        SourceAccount="123412341234",
        EventSourceToken="blah",
        Qualifier="2",
    )

    # The statement comes back as a JSON document encoded in a string.
    assert "Statement" in response
    res = json.loads(response["Statement"])
    assert res["Action"] == "lambda:InvokeFunction"
2017-10-03 02:23:00 +00:00
2021-08-28 06:23:44 +00:00
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
def test_get_function_policy(key):
    """
    Add a permission and read it back through get_policy, addressing the
    function by FunctionName and by FunctionArn.
    """
    conn = boto3.client("lambda", _lambda_region)
    zip_content = get_test_zip_file1()
    function_name = str(uuid4())[0:6]
    f = conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.handler",
        Code={"ZipFile": zip_content},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # Either the plain name or the full ARN, depending on the parameter.
    name_or_arn = f[key]

    conn.add_permission(
        FunctionName=name_or_arn,
        StatementId="1",
        Action="lambda:InvokeFunction",
        Principal="432143214321",
        SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
        SourceAccount="123412341234",
        EventSourceToken="blah",
        Qualifier="2",
    )

    response = conn.get_policy(FunctionName=name_or_arn)

    # The policy comes back as a JSON document encoded in a string.
    assert "Policy" in response
    res = json.loads(response["Policy"])
    assert res["Statement"][0]["Action"] == "lambda:InvokeFunction"
2019-02-16 15:27:23 +00:00
@mock_lambda
@mock_s3
def test_list_versions_by_function():
    """
    Check list_versions_by_function for a function created with Publish=True
    and published once more, and for a function created with Publish=False.
    """
    bucket_name = str(uuid4())
    s3_conn = boto3.client("s3", _lambda_region)
    s3_conn.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)

    conn = boto3.client("lambda", _lambda_region)
    function_name = str(uuid4())[0:6]
    conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    res = conn.publish_version(FunctionName=function_name)
    assert res["ResponseMetadata"]["HTTPStatusCode"] == 201

    # Build the ARN from _lambda_region rather than a hard-coded region so the
    # expectation follows the region the client actually uses.
    arn_prefix = "arn:aws:lambda:{}:{}:function:{}".format(
        _lambda_region, ACCOUNT_ID, function_name
    )
    versions = conn.list_versions_by_function(FunctionName=function_name)
    assert len(versions["Versions"]) == 3
    assert versions["Versions"][0]["FunctionArn"] == arn_prefix + ":$LATEST"
    assert versions["Versions"][1]["FunctionArn"] == arn_prefix + ":1"
    assert versions["Versions"][2]["FunctionArn"] == arn_prefix + ":2"

    # Second function: random name like every other function in this module,
    # created without publishing.
    unpublished_name = str(uuid4())[0:6]
    conn.create_function(
        FunctionName=unpublished_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=False,
    )
    versions = conn.list_versions_by_function(FunctionName=unpublished_name)
    assert len(versions["Versions"]) == 1
    assert versions["Versions"][0][
        "FunctionArn"
    ] == "arn:aws:lambda:{}:{}:function:{}:$LATEST".format(
        _lambda_region, ACCOUNT_ID, unpublished_name
    )
2019-02-16 15:27:23 +00:00
2019-02-16 18:37:46 +00:00
@mock_lambda
@mock_s3
def test_create_function_with_already_exists():
    """
    Call create_function twice with identical arguments and check that the
    second response still carries the function name.
    """
    bucket_name = str(uuid4())
    s3_conn = boto3.client("s3", _lambda_region)
    s3_conn.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)

    conn = boto3.client("lambda", _lambda_region)
    function_name = str(uuid4())[0:6]
    # Both calls use exactly the same arguments, so define them once.
    create_kwargs = {
        "FunctionName": function_name,
        "Runtime": "python2.7",
        "Role": get_role_name(),
        "Handler": "lambda_function.lambda_handler",
        "Code": {"S3Bucket": bucket_name, "S3Key": "test.zip"},
        "Description": "test lambda function",
        "Timeout": 3,
        "MemorySize": 128,
        "Publish": True,
    }
    conn.create_function(**create_kwargs)

    # NOTE(review): this pins the mock's current behaviour (second create
    # succeeds). Confirm whether a conflict error should be expected instead.
    response = conn.create_function(**create_kwargs)

    assert response["FunctionName"] == function_name
2019-02-16 18:37:46 +00:00
@mock_lambda
@mock_s3
def test_list_versions_by_function_for_nonexistent_function():
    """
    list_versions_by_function for a name that was never created returns an
    empty "Versions" collection.
    """
    conn = boto3.client("lambda", _lambda_region)
    function_name = str(uuid4())[0:6]

    versions = conn.list_versions_by_function(FunctionName=function_name)

    assert len(versions["Versions"]) == 0
2019-08-21 01:54:57 +00:00
2021-08-28 06:23:44 +00:00
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
@mock_s3
def test_update_configuration(key):
    """
    Update a function's configuration, addressing it by FunctionName and by
    FunctionArn, and check the values returned by the update call.
    """
    bucket_name = str(uuid4())
    function_name = str(uuid4())[0:6]
    s3_conn = boto3.client("s3", _lambda_region)
    s3_conn.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": _lambda_region},
    )
    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)

    conn = boto3.client("lambda", _lambda_region)
    fxn = conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
        Environment={"Variables": {"test_old_environment": "test_old_value"}},
    )
    # Either the plain name or the full ARN, depending on the parameter.
    name_or_arn = fxn[key]

    # Baseline: the create response reflects the initial settings.
    assert fxn["Description"] == "test lambda function"
    assert fxn["Handler"] == "lambda_function.lambda_handler"
    assert fxn["MemorySize"] == 128
    assert fxn["Runtime"] == "python2.7"
    assert fxn["Timeout"] == 3

    updated_config = conn.update_function_configuration(
        FunctionName=name_or_arn,
        Description="updated test lambda function",
        Handler="lambda_function.new_lambda_handler",
        Runtime="python3.6",
        Timeout=7,
        VpcConfig={"SecurityGroupIds": ["sg-123abc"], "SubnetIds": ["subnet-123abc"]},
        Environment={"Variables": {"test_environment": "test_value"}},
    )

    assert updated_config["ResponseMetadata"]["HTTPStatusCode"] == 200
    assert updated_config["Description"] == "updated test lambda function"
    assert updated_config["Handler"] == "lambda_function.new_lambda_handler"
    # MemorySize was not part of the update and keeps its original value.
    assert updated_config["MemorySize"] == 128
    assert updated_config["Runtime"] == "python3.6"
    assert updated_config["Timeout"] == 7
    # The new environment replaces the old variables rather than merging.
    assert updated_config["Environment"]["Variables"] == {
        "test_environment": "test_value"
    }
    assert updated_config["VpcConfig"] == {
        "SecurityGroupIds": ["sg-123abc"],
        "SubnetIds": ["subnet-123abc"],
        "VpcId": "vpc-123abc",
    }
2019-10-08 20:59:03 +00:00
2021-08-28 06:23:44 +00:00
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
def test_update_function_zip(key):
    """
    Replace a function's code with a new inline zip (addressing it by
    FunctionName and by FunctionArn) and check version 2 via get_function.
    """
    conn = boto3.client("lambda", _lambda_region)
    zip_content_one = get_test_zip_file1()
    function_name = str(uuid4())[0:6]
    fxn = conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": zip_content_one},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # Either the plain name or the full ARN, depending on the parameter.
    name_or_arn = fxn[key]

    zip_content_two = get_test_zip_file2()
    conn.update_function_code(
        FunctionName=name_or_arn, ZipFile=zip_content_two, Publish=True
    )

    response = conn.get_function(FunctionName=function_name, Qualifier="2")
    # LastModified is dropped so the configuration can be compared as a whole.
    response["Configuration"].pop("LastModified")

    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    code = response["Code"]
    assert len(code) == 2
    assert code["RepositoryType"] == "S3"
    assert code["Location"].startswith(
        "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com".format(_lambda_region)
    )
    response["Configuration"].should.equal(
        {
            # Hash and size must describe the second payload, not the first.
            "CodeSha256": hashlib.sha256(zip_content_two).hexdigest(),
            "CodeSize": len(zip_content_two),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:{}:{}:function:{}:2".format(
                _lambda_region, ACCOUNT_ID, function_name
            ),
            "FunctionName": function_name,
            "Handler": "lambda_function.lambda_handler",
            "MemorySize": 128,
            "Role": fxn["Role"],
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": "2",
            "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
            "State": "Active",
            "Layers": [],
        }
    )
2019-10-31 15:44:26 +00:00
2019-10-09 21:20:49 +00:00
@mock_lambda
@mock_s3
def test_update_function_s3():
    """
    Replace a function's code with a second object from S3 and check
    version 2 via get_function.
    """
    bucket_name = str(uuid4())
    s3_conn = boto3.client("s3", _lambda_region)
    s3_conn.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": _lambda_region},
    )
    zip_content = get_test_zip_file1()
    s3_conn.put_object(Bucket=bucket_name, Key="test.zip", Body=zip_content)

    conn = boto3.client("lambda", _lambda_region)
    function_name = str(uuid4())[0:6]
    fxn = conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    # Upload the replacement payload under a different key in the same bucket.
    zip_content_two = get_test_zip_file2()
    s3_conn.put_object(Bucket=bucket_name, Key="test2.zip", Body=zip_content_two)

    conn.update_function_code(
        FunctionName=function_name,
        S3Bucket=bucket_name,
        S3Key="test2.zip",
        Publish=True,
    )

    response = conn.get_function(FunctionName=function_name, Qualifier="2")
    # LastModified is dropped so the configuration can be compared as a whole.
    response["Configuration"].pop("LastModified")

    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    code = response["Code"]
    assert len(code) == 2
    assert code["RepositoryType"] == "S3"
    assert code["Location"].startswith(
        "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com".format(_lambda_region)
    )
    response["Configuration"].should.equal(
        {
            # Hash and size must describe the second payload, not the first.
            "CodeSha256": hashlib.sha256(zip_content_two).hexdigest(),
            "CodeSize": len(zip_content_two),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:{}:{}:function:{}:2".format(
                _lambda_region, ACCOUNT_ID, function_name
            ),
            "FunctionName": function_name,
            "Handler": "lambda_function.lambda_handler",
            "MemorySize": 128,
            "Role": fxn["Role"],
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": "2",
            "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
            "State": "Active",
            "Layers": [],
        }
    )
2019-11-07 17:11:13 +00:00
@mock_lambda
def test_create_function_with_invalid_arn():
    """
    A role value that is not an IAM role ARN is rejected with a validation
    error message quoting the expected pattern.
    """
    # create_invalid_lambda is defined in .utilities; presumably it attempts
    # the create call and returns the captured error - verify against helper.
    err = create_invalid_lambda("test-iam-role")
    err.value.response["Error"]["Message"].should.equal(
        r"1 validation error detected: Value 'test-iam-role' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::(\d{12}):role/?[a-zA-Z_0-9+=,.@\-_/]+"
    )
@mock_lambda
def test_create_function_with_arn_from_different_account():
    """
    A role ARN whose account id is 000000000000 is rejected with the
    cross-account pass-role message.
    """
    err = create_invalid_lambda("arn:aws:iam::000000000000:role/example_role")
    err.value.response["Error"]["Message"].should.equal(
        "Cross-account pass role is not allowed."
    )
@mock_lambda
def test_create_function_with_unknown_arn ( ) :
err = create_invalid_lambda (
" arn:aws:iam:: " + str ( ACCOUNT_ID ) + " :role/service-role/unknown_role "
)
2020-10-06 06:04:09 +00:00
err . value . response [ " Error " ] [ " Message " ] . should . equal (
2019-11-07 17:11:13 +00:00
" The role defined for the function cannot be assumed by Lambda. "
)
2021-08-28 06:23:44 +00:00
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
@mock_lambda
def test_remove_function_permission(key):
    """
    Add a permission, remove it again, and check the policy has no statements
    left, addressing the function by FunctionName and by FunctionArn.
    """
    conn = boto3.client("lambda", _lambda_region)
    zip_content = get_test_zip_file1()
    function_name = str(uuid4())[0:6]
    f = conn.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=get_role_name(),
        Handler="lambda_function.handler",
        Code={"ZipFile": zip_content},
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # Either the plain name or the full ARN, depending on the parameter.
    name_or_arn = f[key]

    conn.add_permission(
        FunctionName=name_or_arn,
        StatementId="1",
        Action="lambda:InvokeFunction",
        Principal="432143214321",
        SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
        SourceAccount="123412341234",
        EventSourceToken="blah",
        Qualifier="2",
    )

    remove = conn.remove_permission(
        FunctionName=name_or_arn, StatementId="1", Qualifier="2"
    )
    remove["ResponseMetadata"]["HTTPStatusCode"].should.equal(204)

    # The policy document is a JSON string; its statement list must be empty.
    policy = conn.get_policy(FunctionName=name_or_arn, Qualifier="2")["Policy"]
    policy = json.loads(policy)
    policy["Statement"].should.equal([])