Added basic cognitoidentity (not working)

This commit is contained in:
Barry Ruffner 2018-03-28 12:40:42 -07:00
parent 1a8a4a084d
commit f4f79b2a8e
6 changed files with 1069 additions and 0 deletions

View File

@ -0,0 +1,7 @@
from __future__ import unicode_literals
from .models import cognitoidentity_backends
from ..core.models import base_decorator, deprecated_base_decorator
cognitoidentity_backend = cognitoidentity_backends['us-east-1']
mock_datapipeline = base_decorator(cognitoidentity_backends)
mock_datapipeline_deprecated = deprecated_base_decorator(cognitoidentity_backends)

View File

@ -0,0 +1,163 @@
from __future__ import unicode_literals
import datetime
import boto.cognito.identity
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
from .utils import get_random_pipeline_id, remove_capitalization_of_dict_keys
class CognitoIdentityObject(BaseModel):
def __init__(self, object_id, name, fields):
self.object_id = object_id
self.name = name
self.fields = fields
def to_json(self):
return {
"fields": self.fields,
"id": self.object_id,
"name": self.name,
}
class CognitoIdentity(BaseModel):
def __init__(self, name, unique_id, **kwargs):
self.name = name
# self.unique_id = unique_id
# self.description = kwargs.get('description', '')
self.identity_pool_id = get_random_identity_id()
# self.creation_time = datetime.datetime.utcnow()
# self.objects = []
# self.status = "PENDING"
# self.tags = kwargs.get('tags', [])
@property
def physical_resource_id(self):
return self.pipeline_id
def to_meta_json(self):
return {
"id": self.pipeline_id,
"name": self.name,
}
def to_json(self):
return {
"description": self.description,
"fields": [{
"key": "@pipelineState",
"stringValue": self.status,
}, {
"key": "description",
"stringValue": self.description
}, {
"key": "name",
"stringValue": self.name
}, {
"key": "@creationTime",
"stringValue": datetime.datetime.strftime(self.creation_time, '%Y-%m-%dT%H-%M-%S'),
}, {
"key": "@id",
"stringValue": self.pipeline_id,
}, {
"key": "@sphere",
"stringValue": "PIPELINE"
}, {
"key": "@version",
"stringValue": "1"
}, {
"key": "@userId",
"stringValue": "924374875933"
}, {
"key": "@accountId",
"stringValue": "924374875933"
}, {
"key": "uniqueId",
"stringValue": self.unique_id
}],
"name": self.name,
"pipelineId": self.pipeline_id,
"tags": self.tags
}
def set_pipeline_objects(self, pipeline_objects):
self.objects = [
PipelineObject(pipeline_object['id'], pipeline_object[
'name'], pipeline_object['fields'])
for pipeline_object in remove_capitalization_of_dict_keys(pipeline_objects)
]
def activate(self):
self.status = "SCHEDULED"
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
datapipeline_backend = cognitoidentity_backends[region_name]
properties = cloudformation_json["Properties"]
cloudformation_unique_id = "cf-" + properties["Name"]
pipeline = datapipeline_backend.create_pipeline(
properties["Name"], cloudformation_unique_id)
datapipeline_backend.put_pipeline_definition(
pipeline.pipeline_id, properties["PipelineObjects"])
if properties["Activate"]:
pipeline.activate()
return pipeline
class CognitoIdentityBackend(BaseBackend):
def __init__(self, region):
self.region = region
self.identity_pools = OrderedDict()
def create_identity_pool(self, identity_pool_name, allow_unauthenticated_identities, , region='us-east-1',**kwargs):
identity_pool = CognitoIdentity(identity_pool_name, allow_unauthenticated_identities, **kwargs)
self.identity_pools[identity_pool.identity_pool_name] = identity_pool
return identity_pool
def delete_identity_pool(self, identity_pool_id):
pass
def list_pipelines(self):
return self.pipelines.values()
def describe_pipelines(self, pipeline_ids):
pipelines = [pipeline for pipeline in self.pipelines.values(
) if pipeline.pipeline_id in pipeline_ids]
return pipelines
def get_pipeline(self, pipeline_id):
return self.pipelines[pipeline_id]
def delete_pipeline(self, pipeline_id):
self.pipelines.pop(pipeline_id, None)
def put_pipeline_definition(self, pipeline_id, pipeline_objects):
pipeline = self.get_pipeline(pipeline_id)
pipeline.set_pipeline_objects(pipeline_objects)
def get_pipeline_definition(self, pipeline_id):
pipeline = self.get_pipeline(pipeline_id)
return pipeline.objects
def describe_objects(self, object_ids, pipeline_id):
pipeline = self.get_pipeline(pipeline_id)
pipeline_objects = [
pipeline_object for pipeline_object in pipeline.objects
if pipeline_object.object_id in object_ids
]
return pipeline_objects
def activate_pipeline(self, pipeline_id):
pipeline = self.get_pipeline(pipeline_id)
pipeline.activate()
cognitoidentity_backends = {}
for region in boto.cognito.identity.regions():
cognitoidentity_backends[region.name] = CognitoIdentityBackend(region.name)

View File

@ -0,0 +1,104 @@
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import datapipeline_backends
class DataPipelineResponse(BaseResponse):
@property
def parameters(self):
# TODO this should really be moved to core/responses.py
if self.body:
return json.loads(self.body)
else:
return self.querystring
@property
def datapipeline_backend(self):
return datapipeline_backends[self.region]
def create_pipeline(self):
name = self.parameters.get('name')
unique_id = self.parameters.get('uniqueId')
description = self.parameters.get('description', '')
tags = self.parameters.get('tags', [])
pipeline = self.datapipeline_backend.create_pipeline(name, unique_id, description=description, tags=tags)
return json.dumps({
"pipelineId": pipeline.pipeline_id,
})
def list_pipelines(self):
pipelines = list(self.datapipeline_backend.list_pipelines())
pipeline_ids = [pipeline.pipeline_id for pipeline in pipelines]
max_pipelines = 50
marker = self.parameters.get('marker')
if marker:
start = pipeline_ids.index(marker) + 1
else:
start = 0
pipelines_resp = pipelines[start:start + max_pipelines]
has_more_results = False
marker = None
if start + max_pipelines < len(pipeline_ids) - 1:
has_more_results = True
marker = pipelines_resp[-1].pipeline_id
return json.dumps({
"hasMoreResults": has_more_results,
"marker": marker,
"pipelineIdList": [
pipeline.to_meta_json() for pipeline in pipelines_resp
]
})
def describe_pipelines(self):
pipeline_ids = self.parameters["pipelineIds"]
pipelines = self.datapipeline_backend.describe_pipelines(pipeline_ids)
return json.dumps({
"pipelineDescriptionList": [
pipeline.to_json() for pipeline in pipelines
]
})
def delete_pipeline(self):
pipeline_id = self.parameters["pipelineId"]
self.datapipeline_backend.delete_pipeline(pipeline_id)
return json.dumps({})
def put_pipeline_definition(self):
pipeline_id = self.parameters["pipelineId"]
pipeline_objects = self.parameters["pipelineObjects"]
self.datapipeline_backend.put_pipeline_definition(
pipeline_id, pipeline_objects)
return json.dumps({"errored": False})
def get_pipeline_definition(self):
pipeline_id = self.parameters["pipelineId"]
pipeline_definition = self.datapipeline_backend.get_pipeline_definition(
pipeline_id)
return json.dumps({
"pipelineObjects": [pipeline_object.to_json() for pipeline_object in pipeline_definition]
})
def describe_objects(self):
pipeline_id = self.parameters["pipelineId"]
object_ids = self.parameters["objectIds"]
pipeline_objects = self.datapipeline_backend.describe_objects(
object_ids, pipeline_id)
return json.dumps({
"hasMoreResults": False,
"marker": None,
"pipelineObjects": [
pipeline_object.to_json() for pipeline_object in pipeline_objects
]
})
def activate_pipeline(self):
pipeline_id = self.parameters["pipelineId"]
self.datapipeline_backend.activate_pipeline(pipeline_id)
return json.dumps({})

View File

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from .responses import DataPipelineResponse
url_bases = [
"https?://datapipeline.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': DataPipelineResponse.dispatch,
}

View File

@ -0,0 +1,23 @@
import collections
import six
from moto.core.utils import get_random_hex
def get_random_identity_id(region):
return "{0}:{0}".format(region, get_random_hex(length=19))
def remove_capitalization_of_dict_keys(obj):
if isinstance(obj, collections.Mapping):
result = obj.__class__()
for key, value in obj.items():
normalized_key = key[:1].lower() + key[1:]
result[normalized_key] = remove_capitalization_of_dict_keys(value)
return result
elif isinstance(obj, collections.Iterable) and not isinstance(obj, six.string_types):
result = obj.__class__()
for item in obj:
result += (remove_capitalization_of_dict_keys(item),)
return result
else:
return obj

View File

@ -0,0 +1,762 @@
from __future__ import unicode_literals
import base64
import botocore.client
import boto3
import hashlib
import io
import json
import zipfile
import sure # noqa
from freezegun import freeze_time
from moto import mock_lambda, mock_s3, mock_ec2, settings
_lambda_region = 'us-west-2'
def _process_lambda(func_str):
zip_output = io.BytesIO()
zip_file = zipfile.ZipFile(zip_output, 'w', zipfile.ZIP_DEFLATED)
zip_file.writestr('lambda_function.py', func_str)
zip_file.close()
zip_output.seek(0)
return zip_output.read()
def get_test_zip_file1():
pfunc = """
def lambda_handler(event, context):
return event
"""
return _process_lambda(pfunc)
def get_test_zip_file2():
func_str = """
import boto3
def lambda_handler(event, context):
ec2 = boto3.resource('ec2', region_name='us-west-2', endpoint_url='http://{base_url}')
volume_id = event.get('volume_id')
vol = ec2.Volume(volume_id)
print('get volume details for %s\\nVolume - %s state=%s, size=%s' % (volume_id, volume_id, vol.state, vol.size))
return event
""".format(base_url="motoserver:5000" if settings.TEST_SERVER_MODE else "ec2.us-west-2.amazonaws.com")
return _process_lambda(func_str)
@mock_lambda
def test_list_functions():
conn = boto3.client('lambda', 'us-west-2')
result = conn.list_functions()
result['Functions'].should.have.length_of(0)
@mock_lambda
def test_invoke_requestresponse_function():
conn = boto3.client('lambda', 'us-west-2')
conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file1(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
in_data = {'msg': 'So long and thanks for all the fish'}
success_result = conn.invoke(FunctionName='testFunction', InvocationType='RequestResponse',
Payload=json.dumps(in_data))
success_result["StatusCode"].should.equal(202)
result_obj = json.loads(
base64.b64decode(success_result["LogResult"]).decode('utf-8'))
result_obj.should.equal(in_data)
payload = success_result["Payload"].read().decode('utf-8')
json.loads(payload).should.equal(in_data)
@mock_lambda
def test_invoke_event_function():
conn = boto3.client('lambda', 'us-west-2')
conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file1(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
conn.invoke.when.called_with(
FunctionName='notAFunction',
InvocationType='Event',
Payload='{}'
).should.throw(botocore.client.ClientError)
in_data = {'msg': 'So long and thanks for all the fish'}
success_result = conn.invoke(
FunctionName='testFunction', InvocationType='Event', Payload=json.dumps(in_data))
success_result["StatusCode"].should.equal(202)
json.loads(success_result['Payload'].read().decode(
'utf-8')).should.equal({})
if settings.TEST_SERVER_MODE:
@mock_ec2
@mock_lambda
def test_invoke_function_get_ec2_volume():
conn = boto3.resource("ec2", "us-west-2")
vol = conn.create_volume(Size=99, AvailabilityZone='us-west-2')
vol = conn.Volume(vol.id)
conn = boto3.client('lambda', 'us-west-2')
conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file2(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
in_data = {'volume_id': vol.id}
result = conn.invoke(FunctionName='testFunction',
InvocationType='RequestResponse', Payload=json.dumps(in_data))
result["StatusCode"].should.equal(202)
msg = 'get volume details for %s\nVolume - %s state=%s, size=%s\n%s' % (
vol.id, vol.id, vol.state, vol.size, json.dumps(in_data))
log_result = base64.b64decode(result["LogResult"]).decode('utf-8')
# fix for running under travis (TODO: investigate why it has an extra newline)
log_result = log_result.replace('\n\n', '\n')
log_result.should.equal(msg)
payload = result['Payload'].read().decode('utf-8')
# fix for running under travis (TODO: investigate why it has an extra newline)
payload = payload.replace('\n\n', '\n')
payload.should.equal(msg)
@mock_lambda
def test_create_based_on_s3_with_missing_bucket():
conn = boto3.client('lambda', 'us-west-2')
conn.create_function.when.called_with(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'S3Bucket': 'this-bucket-does-not-exist',
'S3Key': 'test.zip',
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
VpcConfig={
"SecurityGroupIds": ["sg-123abc"],
"SubnetIds": ["subnet-123abc"],
},
).should.throw(botocore.client.ClientError)
@mock_lambda
@mock_s3
@freeze_time('2015-01-01 00:00:00')
def test_create_function_from_aws_bucket():
s3_conn = boto3.client('s3', 'us-west-2')
s3_conn.create_bucket(Bucket='test-bucket')
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
conn = boto3.client('lambda', 'us-west-2')
result = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'S3Bucket': 'test-bucket',
'S3Key': 'test.zip',
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
VpcConfig={
"SecurityGroupIds": ["sg-123abc"],
"SubnetIds": ["subnet-123abc"],
},
)
# this is hard to match against, so remove it
result['ResponseMetadata'].pop('HTTPHeaders', None)
# Botocore inserts retry attempts not seen in Python27
result['ResponseMetadata'].pop('RetryAttempts', None)
result.pop('LastModified')
result.should.equal({
'FunctionName': 'testFunction',
'FunctionArn': 'arn:aws:lambda:{}:123456789012:function:testFunction:$LATEST'.format(_lambda_region),
'Runtime': 'python2.7',
'Role': 'test-iam-role',
'Handler': 'lambda_function.lambda_handler',
"CodeSha256": hashlib.sha256(zip_content).hexdigest(),
"CodeSize": len(zip_content),
'Description': 'test lambda function',
'Timeout': 3,
'MemorySize': 128,
'Version': '$LATEST',
'VpcConfig': {
"SecurityGroupIds": ["sg-123abc"],
"SubnetIds": ["subnet-123abc"],
"VpcId": "vpc-123abc"
},
'ResponseMetadata': {'HTTPStatusCode': 201},
})
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_create_function_from_zipfile():
conn = boto3.client('lambda', 'us-west-2')
zip_content = get_test_zip_file1()
result = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': zip_content,
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
# this is hard to match against, so remove it
result['ResponseMetadata'].pop('HTTPHeaders', None)
# Botocore inserts retry attempts not seen in Python27
result['ResponseMetadata'].pop('RetryAttempts', None)
result.pop('LastModified')
result.should.equal({
'FunctionName': 'testFunction',
'FunctionArn': 'arn:aws:lambda:{}:123456789012:function:testFunction:$LATEST'.format(_lambda_region),
'Runtime': 'python2.7',
'Role': 'test-iam-role',
'Handler': 'lambda_function.lambda_handler',
'CodeSize': len(zip_content),
'Description': 'test lambda function',
'Timeout': 3,
'MemorySize': 128,
'CodeSha256': hashlib.sha256(zip_content).hexdigest(),
'Version': '$LATEST',
'VpcConfig': {
"SecurityGroupIds": [],
"SubnetIds": [],
},
'ResponseMetadata': {'HTTPStatusCode': 201},
})
@mock_lambda
@mock_s3
@freeze_time('2015-01-01 00:00:00')
def test_get_function():
s3_conn = boto3.client('s3', 'us-west-2')
s3_conn.create_bucket(Bucket='test-bucket')
zip_content = get_test_zip_file1()
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
conn = boto3.client('lambda', 'us-west-2')
conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'S3Bucket': 'test-bucket',
'S3Key': 'test.zip',
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
result = conn.get_function(FunctionName='testFunction')
# this is hard to match against, so remove it
result['ResponseMetadata'].pop('HTTPHeaders', None)
# Botocore inserts retry attempts not seen in Python27
result['ResponseMetadata'].pop('RetryAttempts', None)
result['Configuration'].pop('LastModified')
result['Code']['Location'].should.equal('s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com/test.zip'.format(_lambda_region))
result['Code']['RepositoryType'].should.equal('S3')
result['Configuration']['CodeSha256'].should.equal(hashlib.sha256(zip_content).hexdigest())
result['Configuration']['CodeSize'].should.equal(len(zip_content))
result['Configuration']['Description'].should.equal('test lambda function')
result['Configuration'].should.contain('FunctionArn')
result['Configuration']['FunctionName'].should.equal('testFunction')
result['Configuration']['Handler'].should.equal('lambda_function.lambda_handler')
result['Configuration']['MemorySize'].should.equal(128)
result['Configuration']['Role'].should.equal('test-iam-role')
result['Configuration']['Runtime'].should.equal('python2.7')
result['Configuration']['Timeout'].should.equal(3)
result['Configuration']['Version'].should.equal('$LATEST')
result['Configuration'].should.contain('VpcConfig')
# Test get function with
result = conn.get_function(FunctionName='testFunction', Qualifier='$LATEST')
result['Configuration']['Version'].should.equal('$LATEST')
@mock_lambda
@mock_s3
def test_delete_function():
s3_conn = boto3.client('s3', 'us-west-2')
s3_conn.create_bucket(Bucket='test-bucket')
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
conn = boto3.client('lambda', 'us-west-2')
conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'S3Bucket': 'test-bucket',
'S3Key': 'test.zip',
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
success_result = conn.delete_function(FunctionName='testFunction')
# this is hard to match against, so remove it
success_result['ResponseMetadata'].pop('HTTPHeaders', None)
# Botocore inserts retry attempts not seen in Python27
success_result['ResponseMetadata'].pop('RetryAttempts', None)
success_result.should.equal({'ResponseMetadata': {'HTTPStatusCode': 204}})
conn.delete_function.when.called_with(
FunctionName='testFunctionThatDoesntExist').should.throw(botocore.client.ClientError)
@mock_lambda
@mock_s3
def test_publish():
s3_conn = boto3.client('s3', 'us-west-2')
s3_conn.create_bucket(Bucket='test-bucket')
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
conn = boto3.client('lambda', 'us-west-2')
conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'S3Bucket': 'test-bucket',
'S3Key': 'test.zip',
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
function_list = conn.list_functions()
function_list['Functions'].should.have.length_of(1)
latest_arn = function_list['Functions'][0]['FunctionArn']
conn.publish_version(FunctionName='testFunction')
function_list = conn.list_functions()
function_list['Functions'].should.have.length_of(2)
# #SetComprehension ;-)
published_arn = list({f['FunctionArn'] for f in function_list['Functions']} - {latest_arn})[0]
published_arn.should.contain('testFunction:1')
conn.delete_function(FunctionName='testFunction', Qualifier='1')
function_list = conn.list_functions()
function_list['Functions'].should.have.length_of(1)
function_list['Functions'][0]['FunctionArn'].should.contain('testFunction:$LATEST')
@mock_lambda
@mock_s3
@freeze_time('2015-01-01 00:00:00')
def test_list_create_list_get_delete_list():
"""
test `list -> create -> list -> get -> delete -> list` integration
"""
s3_conn = boto3.client('s3', 'us-west-2')
s3_conn.create_bucket(Bucket='test-bucket')
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
conn = boto3.client('lambda', 'us-west-2')
conn.list_functions()['Functions'].should.have.length_of(0)
conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'S3Bucket': 'test-bucket',
'S3Key': 'test.zip',
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
expected_function_result = {
"Code": {
"Location": "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com/test.zip".format(_lambda_region),
"RepositoryType": "S3"
},
"Configuration": {
"CodeSha256": hashlib.sha256(zip_content).hexdigest(),
"CodeSize": len(zip_content),
"Description": "test lambda function",
"FunctionArn": 'arn:aws:lambda:{}:123456789012:function:testFunction:$LATEST'.format(_lambda_region),
"FunctionName": "testFunction",
"Handler": "lambda_function.lambda_handler",
"MemorySize": 128,
"Role": "test-iam-role",
"Runtime": "python2.7",
"Timeout": 3,
"Version": '$LATEST',
"VpcConfig": {
"SecurityGroupIds": [],
"SubnetIds": [],
}
},
'ResponseMetadata': {'HTTPStatusCode': 200},
}
func = conn.list_functions()['Functions'][0]
func.pop('LastModified')
func.should.equal(expected_function_result['Configuration'])
func = conn.get_function(FunctionName='testFunction')
# this is hard to match against, so remove it
func['ResponseMetadata'].pop('HTTPHeaders', None)
# Botocore inserts retry attempts not seen in Python27
func['ResponseMetadata'].pop('RetryAttempts', None)
func['Configuration'].pop('LastModified')
func.should.equal(expected_function_result)
conn.delete_function(FunctionName='testFunction')
conn.list_functions()['Functions'].should.have.length_of(0)
@mock_lambda
def test_invoke_lambda_error():
lambda_fx = """
def lambda_handler(event, context):
raise Exception('failsauce')
"""
zip_output = io.BytesIO()
zip_file = zipfile.ZipFile(zip_output, 'w', zipfile.ZIP_DEFLATED)
zip_file.writestr('lambda_function.py', lambda_fx)
zip_file.close()
zip_output.seek(0)
client = boto3.client('lambda', region_name='us-east-1')
client.create_function(
FunctionName='test-lambda-fx',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
Code={
'ZipFile': zip_output.read()
},
)
result = client.invoke(
FunctionName='test-lambda-fx',
InvocationType='RequestResponse',
LogType='Tail'
)
assert 'FunctionError' in result
assert result['FunctionError'] == 'Handled'
@mock_lambda
@mock_s3
def test_tags():
"""
test list_tags -> tag_resource -> list_tags -> tag_resource -> list_tags -> untag_resource -> list_tags integration
"""
s3_conn = boto3.client('s3', 'us-west-2')
s3_conn.create_bucket(Bucket='test-bucket')
zip_content = get_test_zip_file2()
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
conn = boto3.client('lambda', 'us-west-2')
function = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.handler',
Code={
'S3Bucket': 'test-bucket',
'S3Key': 'test.zip',
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
# List tags when there are none
conn.list_tags(
Resource=function['FunctionArn']
)['Tags'].should.equal(dict())
# List tags when there is one
conn.tag_resource(
Resource=function['FunctionArn'],
Tags=dict(spam='eggs')
)['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
conn.list_tags(
Resource=function['FunctionArn']
)['Tags'].should.equal(dict(spam='eggs'))
# List tags when another has been added
conn.tag_resource(
Resource=function['FunctionArn'],
Tags=dict(foo='bar')
)['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
conn.list_tags(
Resource=function['FunctionArn']
)['Tags'].should.equal(dict(spam='eggs', foo='bar'))
# Untag resource
conn.untag_resource(
Resource=function['FunctionArn'],
TagKeys=['spam', 'trolls']
)['ResponseMetadata']['HTTPStatusCode'].should.equal(204)
conn.list_tags(
Resource=function['FunctionArn']
)['Tags'].should.equal(dict(foo='bar'))
# Untag a tag that does not exist (no error and no change)
conn.untag_resource(
Resource=function['FunctionArn'],
TagKeys=['spam']
)['ResponseMetadata']['HTTPStatusCode'].should.equal(204)
@mock_lambda
def test_tags_not_found():
"""
Test list_tags and tag_resource when the lambda with the given arn does not exist
"""
conn = boto3.client('lambda', 'us-west-2')
conn.list_tags.when.called_with(
Resource='arn:aws:lambda:123456789012:function:not-found'
).should.throw(botocore.client.ClientError)
conn.tag_resource.when.called_with(
Resource='arn:aws:lambda:123456789012:function:not-found',
Tags=dict(spam='eggs')
).should.throw(botocore.client.ClientError)
conn.untag_resource.when.called_with(
Resource='arn:aws:lambda:123456789012:function:not-found',
TagKeys=['spam']
).should.throw(botocore.client.ClientError)
@mock_lambda
def test_invoke_async_function():
conn = boto3.client('lambda', 'us-west-2')
conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={'ZipFile': get_test_zip_file1()},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
success_result = conn.invoke_async(
FunctionName='testFunction',
InvokeArgs=json.dumps({'test': 'event'})
)
success_result['Status'].should.equal(202)
@mock_lambda
@freeze_time('2015-01-01 00:00:00')
def test_get_function_created_with_zipfile():
conn = boto3.client('lambda', 'us-west-2')
zip_content = get_test_zip_file1()
result = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.handler',
Code={
'ZipFile': zip_content,
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.get_function(
FunctionName='testFunction'
)
response['Configuration'].pop('LastModified')
response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
assert len(response['Code']) == 2
assert response['Code']['RepositoryType'] == 'S3'
assert response['Code']['Location'].startswith('s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com'.format(_lambda_region))
response['Configuration'].should.equal(
{
"CodeSha256": hashlib.sha256(zip_content).hexdigest(),
"CodeSize": len(zip_content),
"Description": "test lambda function",
"FunctionArn":'arn:aws:lambda:{}:123456789012:function:testFunction:$LATEST'.format(_lambda_region),
"FunctionName": "testFunction",
"Handler": "lambda_function.handler",
"MemorySize": 128,
"Role": "test-iam-role",
"Runtime": "python2.7",
"Timeout": 3,
"Version": '$LATEST',
"VpcConfig": {
"SecurityGroupIds": [],
"SubnetIds": [],
}
},
)
@mock_lambda
def add_function_permission():
conn = boto3.client('lambda', 'us-west-2')
zip_content = get_test_zip_file1()
result = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.handler',
Code={
'ZipFile': zip_content,
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.add_permission(
FunctionName='testFunction',
StatementId='1',
Action="lambda:InvokeFunction",
Principal='432143214321',
SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
SourceAccount='123412341234',
EventSourceToken='blah',
Qualifier='2'
)
assert 'Statement' in response
res = json.loads(response['Statement'])
assert res['Action'] == "lambda:InvokeFunction"
@mock_lambda
def get_function_policy():
conn = boto3.client('lambda', 'us-west-2')
zip_content = get_test_zip_file1()
result = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.handler',
Code={
'ZipFile': zip_content,
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.add_permission(
FunctionName='testFunction',
StatementId='1',
Action="lambda:InvokeFunction",
Principal='432143214321',
SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
SourceAccount='123412341234',
EventSourceToken='blah',
Qualifier='2'
)
response = conn.get_policy(
FunctionName='testFunction'
)
assert 'Policy' in response
assert isinstance(response['Policy'], str)
res = json.loads(response['Policy'])
assert res['Statement'][0]['Action'] == 'lambda:InvokeFunction'