# Source: moto/tests/test_awslambda/__init__.py (Python, 271 lines, 7.9 KiB)

from __future__ import unicode_literals
import boto3
import hashlib
import io
import zipfile
import sure # noqa
from freezegun import freeze_time
from moto import mock_lambda
@mock_lambda
def test_list_functions():
    """Listing functions on a fresh mocked Lambda backend yields no entries."""
    client = boto3.client('lambda', region_name='us-west-2')
    functions = client.list_functions()['Functions']
    functions.should.have.length_of(0)
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_create_function_from_aws_bucket():
    """Create a function whose code lives in S3 and check the full response.

    The clock is frozen so that ``LastModified`` is deterministic.
    """
    client = boto3.client('lambda', region_name='us-west-2')

    # NOTE: VpcConfig (SecurityGroupIds / SubnetIds / VpcId) is left out of
    # both the request and the expected response; the original author noted
    # that boto3 does not support it.
    s3_code = {
        'S3Bucket': 'test-bucket',
        'S3Key': 'test.zip',
    }
    response = client.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code=s3_code,
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    expected_response = {
        'FunctionName': 'testFunction',
        'FunctionArn': 'arn:aws:lambda:123456789012:function:testFunction',
        'Runtime': 'python2.7',
        'Role': 'test-iam-role',
        'Handler': 'lambda_function.handler',
        'CodeSize': 123,
        'Description': 'test lambda function',
        'Timeout': 3,
        'MemorySize': 128,
        'LastModified': '2015-01-01 00:00:00',
        'CodeSha256': 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
        'Version': '$LATEST',
        'ResponseMetadata': {'HTTPStatusCode': 200},
    }
    response.should.equal(expected_response)
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_create_function_from_zipfile():
    """Create a function from an inline zip archive and check the response.

    The archive is built in memory and holds a single ``lambda_function.py``
    module. The expected ``CodeSize`` and ``CodeSha256`` are derived from the
    exact bytes that are uploaded, and the clock is frozen so that
    ``LastModified`` is deterministic.
    """
    conn = boto3.client('lambda', 'us-west-2')

    # Spelled with explicit newlines and indentation so the uploaded module
    # is valid Python regardless of how this test file itself is indented.
    handler_source = (
        b'def handler(event, context):\n'
        b'    return "hello world"\n'
    )

    zip_output = io.BytesIO()
    with zipfile.ZipFile(zip_output, 'w') as f:
        f.writestr('lambda_function.py', handler_source)
    # Read the buffer only after the archive is closed: the zip central
    # directory is written on close.
    zip_content = zip_output.getvalue()

    # NOTE: VpcConfig (SecurityGroupIds / SubnetIds / VpcId) is left out of
    # both the request and the expected response; the original author noted
    # that boto3 does not support it.
    result = conn.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={
            'ZipFile': zip_content,
        },
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    result.should.equal({
        'FunctionName': 'testFunction',
        'FunctionArn': 'arn:aws:lambda:123456789012:function:testFunction',
        'Runtime': 'python2.7',
        'Role': 'test-iam-role',
        'Handler': 'lambda_function.handler',
        'CodeSize': len(zip_content),
        'Description': 'test lambda function',
        'Timeout': 3,
        'MemorySize': 128,
        'LastModified': '2015-01-01 00:00:00',
        'CodeSha256': hashlib.sha256(zip_content).hexdigest(),
        'Version': '$LATEST',
        'ResponseMetadata': {'HTTPStatusCode': 200},
    })
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_get_function():
    """Fetch a previously created S3-backed function and check the payload.

    The clock is frozen so that ``LastModified`` is deterministic.
    """
    client = boto3.client('lambda', region_name='us-west-2')
    client.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={'S3Bucket': 'test-bucket', 'S3Key': 'test.zip'},
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    # NOTE: "VpcConfig" is not part of the expected configuration.
    expected_configuration = {
        "CodeSha256": 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
        "CodeSize": 123,
        "Description": "test lambda function",
        "FunctionArn": "arn:aws:lambda:123456789012:function:testFunction",
        "FunctionName": "testFunction",
        "Handler": "lambda_function.handler",
        "LastModified": "2015-01-01 00:00:00",
        "MemorySize": 128,
        "Role": "test-iam-role",
        "Runtime": "python2.7",
        "Timeout": 3,
        "Version": '$LATEST',
    }
    expected_response = {
        "Code": {
            "Location": "s3://lambda-functions.aws.amazon.com/test.zip",
            "RepositoryType": "S3",
        },
        "Configuration": expected_configuration,
        'ResponseMetadata': {'HTTPStatusCode': 200},
    }

    response = client.get_function(FunctionName='testFunction')
    response.should.equal(expected_response)
@mock_lambda
def test_delete_function():
    """Deleting an existing function returns a bare 204 response."""
    client = boto3.client('lambda', region_name='us-west-2')
    client.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={'S3Bucket': 'test-bucket', 'S3Key': 'test.zip'},
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    expected_success = {'ResponseMetadata': {'HTTPStatusCode': 204}}
    delete_response = client.delete_function(FunctionName='testFunction')
    delete_response.should.equal(expected_success)

    # FIXME: the not-found path is still untested. The intended check was:
    #   not_found_result = conn.delete_function(FunctionName='testFunctionThatDoesntExist')
    #   not_found_result.should.equal({'ResponseMetadata': {'HTTPStatusCode': 404}})
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_list_create_list_get_delete_list():
    """
    test `list -> create -> list -> get -> delete -> list` integration

    The clock is frozen so that ``LastModified`` is deterministic.
    """
    client = boto3.client('lambda', region_name='us-west-2')

    # Nothing is registered before the function is created.
    client.list_functions()['Functions'].should.have.length_of(0)

    client.create_function(
        FunctionName='testFunction',
        Runtime='python2.7',
        Role='test-iam-role',
        Handler='lambda_function.handler',
        Code={'S3Bucket': 'test-bucket', 'S3Key': 'test.zip'},
        Description='test lambda function',
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )

    # NOTE: "VpcConfig" is not part of the expected configuration.
    expected_configuration = {
        "CodeSha256": 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
        "CodeSize": 123,
        "Description": "test lambda function",
        "FunctionArn": "arn:aws:lambda:123456789012:function:testFunction",
        "FunctionName": "testFunction",
        "Handler": "lambda_function.handler",
        "LastModified": "2015-01-01 00:00:00",
        "MemorySize": 128,
        "Role": "test-iam-role",
        "Runtime": "python2.7",
        "Timeout": 3,
        "Version": '$LATEST',
    }
    expected_function_result = {
        "Code": {
            "Location": "s3://lambda-functions.aws.amazon.com/test.zip",
            "RepositoryType": "S3",
        },
        "Configuration": expected_configuration,
        'ResponseMetadata': {'HTTPStatusCode': 200},
    }

    # The listing exposes only the configuration part of each function.
    listed = client.list_functions()['Functions']
    listed.should.equal([expected_configuration])

    fetched = client.get_function(FunctionName='testFunction')
    fetched.should.equal(expected_function_result)

    # After deletion the listing is empty again.
    client.delete_function(FunctionName='testFunction')
    client.list_functions()['Functions'].should.have.length_of(0)