2016-02-12 19:39:20 +00:00
from __future__ import unicode_literals
2016-10-06 09:52:23 +00:00
import base64
2016-02-16 20:38:59 +00:00
import botocore . client
2016-02-12 19:39:20 +00:00
import boto3
2016-02-16 20:15:34 +00:00
import hashlib
import io
2016-10-06 14:14:47 +00:00
import json
2016-02-16 20:15:34 +00:00
import zipfile
2016-02-12 19:39:20 +00:00
import sure # noqa
from freezegun import freeze_time
2016-10-06 09:52:23 +00:00
from moto import mock_lambda , mock_s3 , mock_ec2
2016-02-16 21:43:33 +00:00
2016-10-06 09:52:23 +00:00
def _process_lamda(pfunc):
    """Package Lambda source code into an in-memory zip archive.

    Args:
        pfunc: Source text of the Lambda function to store in the archive.

    Returns:
        The raw bytes of a deflate-compressed zip archive that holds a
        single entry named ``lambda_function.zip`` whose content is ``pfunc``.
    """
    zip_output = io.BytesIO()
    # The context manager finalises the archive (central directory) on exit,
    # including when ``writestr`` raises, replacing the manual ``close()``.
    with zipfile.ZipFile(zip_output, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        # NOTE(review): the entry is named "*.zip" although it holds Python
        # source; kept unchanged so the archive bytes stay the same --
        # confirm whether "lambda_function.py" was intended.
        zip_file.writestr('lambda_function.zip', pfunc)
    # getvalue() returns the full buffer regardless of the stream position,
    # so no seek(0) is needed.
    return zip_output.getvalue()
2016-02-12 19:39:20 +00:00
2016-10-06 09:52:23 +00:00
def get_test_zip_file1():
    """Return zip bytes for a Lambda handler that echoes its arguments.

    The embedded ``lambda_handler`` returns the ``(event, context)`` tuple it
    was called with. The source is kept flush-left inside the literal so that
    it is valid top-level Python once stored in the archive.
    """
    pfunc = """
def lambda_handler(event, context):
    return (event, context)
"""
    return _process_lamda(pfunc)
def get_test_zip_file2():
    """Return zip bytes for a Lambda handler that looks up an EC2 volume.

    The embedded ``lambda_handler`` reads ``volume_id`` from the event, prints
    it, fetches the volume through a boto3 EC2 resource in ``us-west-2`` and
    prints the volume's state and size. It has no explicit return value.
    """
    # NOTE(review): the exact spacing inside the second format string must
    # match the expected log text asserted in
    # test_invoke_function_get_ec2_volume -- verify against that test.
    pfunc = """
def lambda_handler(event, context):
    volume_id = event.get('volume_id')
    print('get volume details for %s' % volume_id)
    import boto3
    ec2 = boto3.resource('ec2', region_name='us-west-2')
    vol = ec2.Volume(volume_id)
    print('Volume - %s state=%s, size=%s' % (volume_id, vol.state, vol.size))
"""
    return _process_lamda(pfunc)
2016-02-12 19:39:20 +00:00
@mock_lambda
@mock_s3
def test_list_functions():
    """Listing functions before any were created yields an empty list."""
    conn = boto3.client('lambda', 'us-west-2')
    result = conn.list_functions()
    result['Functions'].should.have.length_of(0)
2016-06-22 19:24:46 +00:00
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_invoke_event_function():
    """Invoke a function with InvocationType='Event'.

    Expects status code 202 for an existing function and a ``ClientError``
    when the named function does not exist.
    """
    conn = boto3.client('lambda', 'us-west-2')
    conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'ZipFile': get_test_zip_file1(),
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    success_result = conn.invoke(
        FunctionName='testFunction',
        InvocationType='Event',
        Payload=json.dumps({'msg': 'Mostly Harmless'}),
    )
    success_result["StatusCode"].should.equal(202)

    # Invoking an unknown function name must surface as a client error.
    conn.invoke.when.called_with(
        FunctionName='notAFunction',
        InvocationType='Event',
        Payload='{}'
    ).should.throw(botocore.client.ClientError)
2016-10-09 15:13:52 +00:00
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_invoke_requestresponse_function():
    """Invoke a function with InvocationType='RequestResponse'.

    Checks the status code and that the base64-encoded ``LogResult`` contains
    the printed ``(event, context)`` tuple returned by the handler.
    """
    conn = boto3.client('lambda', 'us-west-2')
    conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'ZipFile': get_test_zip_file1(),
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    success_result = conn.invoke(
        FunctionName='testFunction',
        InvocationType='RequestResponse',
        Payload=json.dumps({'msg': 'So long and thanks for all the fish'}),
    )
    success_result["StatusCode"].should.equal(202)

    # HACK: strip the u'' prefix from the decoded log so that one expected
    # string works for both Python 2 and Python 3 runs. A cleaner
    # cross-version comparison would be welcome.
    log_text = base64.b64decode(success_result["LogResult"]).decode('utf-8')
    log_text.replace("u'", "'").should.equal(
        "({'msg': 'So long and thanks for all the fish'}, {})\n\n")
2016-10-06 09:52:23 +00:00
@mock_ec2
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_invoke_function_get_ec2_volume():
    """Invoke a function that reads an EC2 volume created in the same mock.

    Creates a volume, invokes the handler from ``get_test_zip_file2`` with the
    volume id and compares the decoded ``LogResult`` with the text the handler
    is expected to print.
    """
    ec2 = boto3.resource("ec2", "us-west-2")
    vol = ec2.create_volume(Size=99, AvailabilityZone='us-west-2')
    # Re-fetch the volume by id so state/size come from a fresh resource.
    vol = ec2.Volume(vol.id)

    conn = boto3.client('lambda', 'us-west-2')
    conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'ZipFile': get_test_zip_file2(),
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    # json and base64 are module-level imports; no local re-import needed.
    success_result = conn.invoke(
        FunctionName='testFunction',
        InvocationType='RequestResponse',
        Payload=json.dumps({'volume_id': vol.id}),
    )
    success_result["StatusCode"].should.equal(202)

    # The trailing "None" is the printed return value of the handler, which
    # has no explicit return statement.
    msg = 'get volume details for %s\nVolume - %s state=%s, size=%s\nNone\n\n' % (
        vol.id, vol.id, vol.state, vol.size)

    # HACK: strip the u'' prefix so the comparison works on both Python 2 and
    # Python 3. A cleaner cross-version comparison would be welcome.
    log_text = base64.b64decode(success_result["LogResult"]).decode('utf-8')
    log_text.replace("u'", "'").should.equal(msg)
2016-06-22 19:24:46 +00:00
2016-02-12 19:39:20 +00:00
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_create_based_on_s3_with_missing_bucket():
    """Creating a function from a non-existent S3 bucket raises ClientError."""
    conn = boto3.client('lambda', 'us-west-2')

    conn.create_function.when.called_with(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'S3Bucket': 'this-bucket-does-not-exist',
            'S3Key': 'test.zip',
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
        VpcConfig={
            "SecurityGroupIds": ["sg-123abc"],
            "SubnetIds": ["subnet-123abc"],
        },
    ).should.throw(botocore.client.ClientError)
@mock_lambda
@mock_s3
@freeze_time('2015-01-01 00:00:00')
def test_create_function_from_aws_bucket():
    """Create a function from code stored in S3 and check the full response.

    The zip is uploaded to a mocked bucket first; the response is expected to
    report the SHA-256 and size of that zip and to echo the VPC configuration.
    """
    s3_conn = boto3.client('s3', 'us-west-2')
    s3_conn.create_bucket(Bucket='test-bucket')
    zip_content = get_test_zip_file2()

    s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
    conn = boto3.client('lambda', 'us-west-2')

    result = conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'S3Bucket': 'test-bucket',
            'S3Key': 'test.zip',
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
        VpcConfig={
            "SecurityGroupIds": ["sg-123abc"],
            "SubnetIds": ["subnet-123abc"],
        },
    )
    # this is hard to match against, so remove it
    result['ResponseMetadata'].pop('HTTPHeaders', None)
    # Botocore inserts retry attempts not seen in Python27
    result['ResponseMetadata'].pop('RetryAttempts', None)
    result.should.equal({
        'FunctionName': 'testFunction',
        'FunctionArn': 'arn:aws:lambda:123456789012:function:testFunction',
        'Runtime': 'python2.7',
        'Role': 'test-iam-role',
        'Handler': 'lambda_function.handler',
        "CodeSha256": hashlib.sha256(zip_content).hexdigest(),
        "CodeSize": len(zip_content),
        'Description': 'test lambda function',
        'Timeout': 3,
        'MemorySize': 128,
        # Matches the frozen clock set by the freeze_time decorator.
        'LastModified': '2015-01-01 00:00:00',
        'Version': '$LATEST',
        'VpcConfig': {
            "SecurityGroupIds": ["sg-123abc"],
            "SubnetIds": ["subnet-123abc"],
            "VpcId": "vpc-123abc"
        },

        'ResponseMetadata': {'HTTPStatusCode': 201},
    })
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_create_function_from_zipfile():
    """Create a function from inline zip bytes and check the full response.

    No VpcConfig is supplied, so the response is expected to carry empty
    security-group and subnet lists.
    """
    conn = boto3.client('lambda', 'us-west-2')
    zip_content = get_test_zip_file1()
    result = conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'ZipFile': zip_content,
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # this is hard to match against, so remove it
    result['ResponseMetadata'].pop('HTTPHeaders', None)
    # Botocore inserts retry attempts not seen in Python27
    result['ResponseMetadata'].pop('RetryAttempts', None)

    result.should.equal({
        'FunctionName': 'testFunction',
        'FunctionArn': 'arn:aws:lambda:123456789012:function:testFunction',
        'Runtime': 'python2.7',
        'Role': 'test-iam-role',
        'Handler': 'lambda_function.handler',
        'CodeSize': len(zip_content),
        'Description': 'test lambda function',
        'Timeout': 3,
        'MemorySize': 128,
        # Matches the frozen clock set by the freeze_time decorator.
        'LastModified': '2015-01-01 00:00:00',
        'CodeSha256': hashlib.sha256(zip_content).hexdigest(),
        'Version': '$LATEST',
        'VpcConfig': {
            "SecurityGroupIds": [],
            "SubnetIds": [],
        },

        'ResponseMetadata': {'HTTPStatusCode': 201},
    })
@mock_lambda
@mock_s3
@freeze_time('2015-01-01 00:00:00')
def test_get_function():
    """Create a function from S3 and check the ``get_function`` response.

    The response is expected to contain a ``Code`` section describing the S3
    location and a ``Configuration`` section mirroring the creation arguments.
    """
    s3_conn = boto3.client('s3', 'us-west-2')
    s3_conn.create_bucket(Bucket='test-bucket')

    zip_content = get_test_zip_file1()
    s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
    conn = boto3.client('lambda', 'us-west-2')

    conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'S3Bucket': 'test-bucket',
            'S3Key': 'test.zip',
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    result = conn.get_function(FunctionName='testFunction')
    # this is hard to match against, so remove it
    result['ResponseMetadata'].pop('HTTPHeaders', None)
    # Botocore inserts retry attempts not seen in Python27
    result['ResponseMetadata'].pop('RetryAttempts', None)

    result.should.equal({
        "Code": {
            "Location": "s3://lambda-functions.aws.amazon.com/test.zip",
            "RepositoryType": "S3"
        },
        "Configuration": {
            "CodeSha256": hashlib.sha256(zip_content).hexdigest(),
            "CodeSize": len(zip_content),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:123456789012:function:testFunction",
            "FunctionName": "testFunction",
            "Handler": "lambda_function.handler",
            # Matches the frozen clock set by the freeze_time decorator.
            "LastModified": "2015-01-01 00:00:00",
            "MemorySize": 128,
            "Role": "test-iam-role",
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": '$LATEST',
            "VpcConfig": {
                "SecurityGroupIds": [],
                "SubnetIds": [],
            }
        },
        'ResponseMetadata': {'HTTPStatusCode': 200},
    })
@mock_lambda
@mock_s3
def test_delete_function():
    """Delete an existing function, then fail to delete an unknown one.

    A successful delete is expected to return HTTP 204; deleting a function
    name that does not exist is expected to raise ``ClientError``.
    """
    s3_conn = boto3.client('s3', 'us-west-2')
    s3_conn.create_bucket(Bucket='test-bucket')

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
    conn = boto3.client('lambda', 'us-west-2')

    conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'S3Bucket': 'test-bucket',
            'S3Key': 'test.zip',
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    success_result = conn.delete_function(FunctionName='testFunction')
    # this is hard to match against, so remove it
    success_result['ResponseMetadata'].pop('HTTPHeaders', None)
    # Botocore inserts retry attempts not seen in Python27
    success_result['ResponseMetadata'].pop('RetryAttempts', None)

    success_result.should.equal({'ResponseMetadata': {'HTTPStatusCode': 204}})

    conn.delete_function.when.called_with(
        FunctionName='testFunctionThatDoesntExist'
    ).should.throw(botocore.client.ClientError)
2016-02-12 19:39:20 +00:00
@mock_lambda
@mock_s3
@freeze_time('2015-01-01 00:00:00')
def test_list_create_list_get_delete_list():
    """
    test `list -> create -> list -> get -> delete -> list` integration

    The function list must be empty before creation, contain exactly the
    created function's configuration afterwards, and be empty again once the
    function has been deleted.
    """
    s3_conn = boto3.client('s3', 'us-west-2')
    s3_conn.create_bucket(Bucket='test-bucket')

    zip_content = get_test_zip_file2()
    s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
    conn = boto3.client('lambda', 'us-west-2')

    conn.list_functions()['Functions'].should.have.length_of(0)

    conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'S3Bucket': 'test-bucket',
            'S3Key': 'test.zip',
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    expected_function_result = {
        "Code": {
            "Location": "s3://lambda-functions.aws.amazon.com/test.zip",
            "RepositoryType": "S3"
        },
        "Configuration": {
            "CodeSha256": hashlib.sha256(zip_content).hexdigest(),
            "CodeSize": len(zip_content),
            "Description": "test lambda function",
            "FunctionArn": "arn:aws:lambda:123456789012:function:testFunction",
            "FunctionName": "testFunction",
            "Handler": "lambda_function.handler",
            # Matches the frozen clock set by the freeze_time decorator.
            "LastModified": "2015-01-01 00:00:00",
            "MemorySize": 128,
            "Role": "test-iam-role",
            "Runtime": "python2.7",
            "Timeout": 3,
            "Version": '$LATEST',
            "VpcConfig": {
                "SecurityGroupIds": [],
                "SubnetIds": [],
            }
        },
        'ResponseMetadata': {'HTTPStatusCode': 200},
    }
    # list_functions returns only the configuration part of each function.
    conn.list_functions()['Functions'].should.equal(
        [expected_function_result['Configuration']])

    func = conn.get_function(FunctionName='testFunction')
    # this is hard to match against, so remove it
    func['ResponseMetadata'].pop('HTTPHeaders', None)
    # Botocore inserts retry attempts not seen in Python27
    func['ResponseMetadata'].pop('RetryAttempts', None)

    func.should.equal(expected_function_result)
    conn.delete_function(FunctionName='testFunction')

    conn.list_functions()['Functions'].should.have.length_of(0)