diff --git a/moto/awslambda/models.py b/moto/awslambda/models.py
index 784d86b0b..0fcabbf03 100644
--- a/moto/awslambda/models.py
+++ b/moto/awslambda/models.py
@@ -1,6 +1,7 @@
 from __future__ import unicode_literals
 
 import base64
+import time
 from collections import defaultdict
 import copy
 import datetime
@@ -31,6 +32,7 @@ from moto.logs.models import logs_backends
 from moto.s3.exceptions import MissingBucket, MissingKey
 from moto import settings
 from .utils import make_function_arn, make_function_ver_arn
+from moto.sqs import sqs_backends
 
 logger = logging.getLogger(__name__)
 
@@ -429,24 +431,39 @@ class LambdaFunction(BaseModel):
 class EventSourceMapping(BaseModel):
     def __init__(self, spec):
         # required
-        self.function_name = spec['FunctionName']
+        self.function_arn = spec['FunctionArn']
         self.event_source_arn = spec['EventSourceArn']
-        self.starting_position = spec['StartingPosition']
-
+        self.uuid = str(uuid.uuid4())
+        self.last_modified = time.mktime(datetime.datetime.utcnow().timetuple())
         # optional
-        self.batch_size = spec.get('BatchSize', 100)
+        self.starting_position = spec.get('StartingPosition', 'TRIM_HORIZON')
+        self.batch_size = spec.get('BatchSize', 10)  # TODO: Add source type-specific defaults
         self.enabled = spec.get('Enabled', True)
         self.starting_position_timestamp = spec.get('StartingPositionTimestamp', None)
 
+    def get_configuration(self):
+        return {
+            'UUID': self.uuid,
+            'BatchSize': self.batch_size,
+            'EventSourceArn': self.event_source_arn,
+            'FunctionArn': self.function_arn,
+            'LastModified': self.last_modified,
+            'LastProcessingResult': '',
+            'State': 'Enabled' if self.enabled else 'Disabled',
+            'StateTransitionReason': 'User initiated'
+        }
+
     @classmethod
     def create_from_cloudformation_json(cls, resource_name, cloudformation_json,
                                         region_name):
         properties = cloudformation_json['Properties']
+        func = lambda_backends[region_name].get_function(properties['FunctionName'])
         spec = {
-            'FunctionName': properties['FunctionName'],
+            'FunctionArn': func.function_arn,
             'EventSourceArn': properties['EventSourceArn'],
-            'StartingPosition': properties['StartingPosition']
+            'StartingPosition': properties['StartingPosition'],
+            'BatchSize': properties.get('BatchSize', 100)
         }
         optional_properties = 'BatchSize Enabled StartingPositionTimestamp'.split()
         for prop in optional_properties:
@@ -466,8 +483,10 @@ class LambdaVersion(BaseModel):
     def create_from_cloudformation_json(cls, resource_name, cloudformation_json,
                                         region_name):
         properties = cloudformation_json['Properties']
+        function_name = properties['FunctionName']
+        func = lambda_backends[region_name].publish_function(function_name)
         spec = {
-            'Version': properties.get('Version')
+            'Version': func.version
         }
         return LambdaVersion(spec)
 
@@ -515,6 +534,9 @@ class LambdaStorage(object):
     def get_arn(self, arn):
         return self._arns.get(arn, None)
 
+    def get_function_by_name_or_arn(self, input):
+        return self.get_function(input) or self.get_arn(input)
+
     def put_function(self, fn):
         """
         :param fn: Function
@@ -596,6 +618,7 @@ class LambdaStorage(object):
 class LambdaBackend(BaseBackend):
     def __init__(self, region_name):
         self._lambdas = LambdaStorage()
+        self._event_source_mappings = {}
         self.region_name = region_name
 
     def reset(self):
@@ -617,6 +640,43 @@ class LambdaBackend(BaseBackend):
         fn.version = ver.version
         return fn
 
+    def create_event_source_mapping(self, spec):
+        required = [
+            'EventSourceArn',
+            'FunctionName',
+        ]
+        for param in required:
+            if not spec.get(param):
+                raise RESTError('InvalidParameterValueException', 'Missing {}'.format(param))
+
+        # Validate function name
+        func = self._lambdas.get_function_by_name_or_arn(spec.get('FunctionName', ''))
+        if not func:
+            raise RESTError('ResourceNotFoundException', 'Invalid FunctionName')
+
+        # Validate queue
+        for queue in sqs_backends[self.region_name].queues.values():
+            if queue.queue_arn == spec['EventSourceArn']:
+                if queue.lambda_event_source_mappings.get(func.function_arn):
+                    # TODO: Correct exception?
+                    raise RESTError('ResourceConflictException', 'The resource already exists.')
+                if queue.fifo_queue:
+                    raise RESTError('InvalidParameterValueException',
+                                    '{} is FIFO'.format(queue.queue_arn))
+                else:
+                    esm_spec = {
+                        'EventSourceArn': spec['EventSourceArn'],
+                        'FunctionArn': func.function_arn,
+                    }
+                    esm = EventSourceMapping(esm_spec)
+                    self._event_source_mappings[esm.uuid] = esm
+
+                    # Set backend function on queue
+                    queue.lambda_event_source_mappings[esm.function_arn] = esm
+
+                    return esm
+        raise RESTError('ResourceNotFoundException', 'Invalid EventSourceArn')
+
     def publish_function(self, function_name):
         return self._lambdas.publish_function(function_name)
 
@@ -626,6 +686,33 @@ class LambdaBackend(BaseBackend):
     def list_versions_by_function(self, function_name):
         return self._lambdas.list_versions_by_function(function_name)
 
+    def get_event_source_mapping(self, uuid):
+        return self._event_source_mappings.get(uuid)
+
+    def delete_event_source_mapping(self, uuid):
+        return self._event_source_mappings.pop(uuid)
+
+    def update_event_source_mapping(self, uuid, spec):
+        esm = self.get_event_source_mapping(uuid)
+        if esm:
+            if spec.get('FunctionName'):
+                func = self._lambdas.get_function_by_name_or_arn(spec.get('FunctionName'))
+                esm.function_arn = func.function_arn
+            if 'BatchSize' in spec:
+                esm.batch_size = spec['BatchSize']
+            if 'Enabled' in spec:
+                esm.enabled = spec['Enabled']
+            return esm
+        return False
+
+    def list_event_source_mappings(self, event_source_arn, function_name):
+        esms = list(self._event_source_mappings.values())
+        if event_source_arn:
+            esms = list(filter(lambda x: x.event_source_arn == event_source_arn, esms))
+        if function_name:
+            esms = list(filter(lambda x: x.function_arn == function_name, esms))
+        return esms
+
     def get_function_by_arn(self, function_arn):
         return self._lambdas.get_arn(function_arn)
 
@@ -635,7 +722,43 @@ class LambdaBackend(BaseBackend):
     def list_functions(self):
         return self._lambdas.all()
 
-    def send_message(self, function_name, message, subject=None, qualifier=None):
+    def send_sqs_batch(self, function_arn, messages, queue_arn):
+        success = True
+        for message in messages:
+            func = self.get_function_by_arn(function_arn)
+            result = self._send_sqs_message(func, message, queue_arn)
+            if not result:
+                success = False
+        return success
+
+    def _send_sqs_message(self, func, message, queue_arn):
+        event = {
+            "Records": [
+                {
+                    "messageId": message.id,
+                    "receiptHandle": message.receipt_handle,
+                    "body": message.body,
+                    "attributes": {
+                        "ApproximateReceiveCount": "1",
+                        "SentTimestamp": "1545082649183",
+                        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+                        "ApproximateFirstReceiveTimestamp": "1545082649185"
+                    },
+                    "messageAttributes": {},
+                    "md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
+                    "eventSource": "aws:sqs",
+                    "eventSourceARN": queue_arn,
+                    "awsRegion": self.region_name
+                }
+            ]
+        }
+
+        request_headers = {}
+        response_headers = {}
+        func.invoke(json.dumps(event), request_headers, response_headers)
+        return 'x-amz-function-error' not in response_headers
+
+    def send_sns_message(self, function_name, message, subject=None, qualifier=None):
         event = {
             "Records": [
                 {
diff --git a/moto/awslambda/responses.py b/moto/awslambda/responses.py
index c29c9acd9..1e7feb0d0 100644
--- a/moto/awslambda/responses.py
+++ b/moto/awslambda/responses.py
@@ -39,6 +39,31 @@ class LambdaResponse(BaseResponse):
         else:
             raise ValueError("Cannot handle request")
 
+    def event_source_mappings(self, request, full_url, headers):
+        self.setup_class(request, full_url, headers)
+        if request.method == 'GET':
+            querystring = self.querystring
+            event_source_arn = querystring.get('EventSourceArn', [None])[0]
+            function_name = querystring.get('FunctionName', [None])[0]
+            return self._list_event_source_mappings(event_source_arn, function_name)
+        elif request.method == 'POST':
+            return self._create_event_source_mapping(request, full_url, headers)
+        else:
+            raise ValueError("Cannot handle request")
+
+    def event_source_mapping(self, request, full_url, headers):
+        self.setup_class(request, full_url, headers)
+        path = request.path if hasattr(request, 'path') else path_url(request.url)
+        uuid = path.split('/')[-1]
+        if request.method == 'GET':
+            return self._get_event_source_mapping(uuid)
+        elif request.method == 'PUT':
+            return self._update_event_source_mapping(uuid)
+        elif request.method == 'DELETE':
+            return self._delete_event_source_mapping(uuid)
+        else:
+            raise ValueError("Cannot handle request")
+
     def function(self, request, full_url, headers):
         self.setup_class(request, full_url, headers)
         if request.method == 'GET':
@@ -177,6 +202,45 @@ class LambdaResponse(BaseResponse):
             config = fn.get_configuration()
             return 201, {}, json.dumps(config)
 
+    def _create_event_source_mapping(self, request, full_url, headers):
+        try:
+            fn = self.lambda_backend.create_event_source_mapping(self.json_body)
+        except ValueError as e:
+            return 400, {}, json.dumps({"Error": {"Code": e.args[0], "Message": e.args[1]}})
+        else:
+            config = fn.get_configuration()
+            return 201, {}, json.dumps(config)
+
+    def _list_event_source_mappings(self, event_source_arn, function_name):
+        esms = self.lambda_backend.list_event_source_mappings(event_source_arn, function_name)
+        result = {
+            'EventSourceMappings': [esm.get_configuration() for esm in esms]
+        }
+        return 200, {}, json.dumps(result)
+
+    def _get_event_source_mapping(self, uuid):
+        result = self.lambda_backend.get_event_source_mapping(uuid)
+        if result:
+            return 200, {}, json.dumps(result.get_configuration())
+        else:
+            return 404, {}, "{}"
+
+    def _update_event_source_mapping(self, uuid):
+        result = self.lambda_backend.update_event_source_mapping(uuid, self.json_body)
+        if result:
+            return 202, {}, json.dumps(result.get_configuration())
+        else:
+            return 404, {}, "{}"
+
+    def _delete_event_source_mapping(self, uuid):
+        esm = self.lambda_backend.delete_event_source_mapping(uuid)
+        if esm:
+            json_result = esm.get_configuration()
+            json_result.update({'State': 'Deleting'})
+            return 202, {}, json.dumps(json_result)
+        else:
+            return 404, {}, "{}"
+
     def _publish_function(self, request, full_url, headers):
         function_name = self.path.rsplit('/', 2)[-2]
 
diff --git a/moto/awslambda/urls.py b/moto/awslambda/urls.py
index 7c4d064dc..fb2c6ee7e 100644
--- a/moto/awslambda/urls.py
+++ b/moto/awslambda/urls.py
@@ -11,6 +11,8 @@ url_paths = {
     '{0}/(?P<api_version>[^/]+)/functions/?$': response.root,
     r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/?$': response.function,
     r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/versions/?$': response.versions,
+    r'{0}/(?P<api_version>[^/]+)/event-source-mappings/?$': response.event_source_mappings,
+    r'{0}/(?P<api_version>[^/]+)/event-source-mappings/(?P<UUID>[\w_-]+)/?$': response.event_source_mapping,
     r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invocations/?$': response.invoke,
     r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invoke-async/?$': response.invoke_async,
     r'{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)': response.tag,
diff --git a/moto/sns/models.py b/moto/sns/models.py
index 18b86cb93..f1293eb0f 100644
--- a/moto/sns/models.py
+++ b/moto/sns/models.py
@@ -119,7 +119,7 @@ class Subscription(BaseModel):
         else:
             assert False
 
-        lambda_backends[region].send_message(function_name, message, subject=subject, qualifier=qualifier)
+        lambda_backends[region].send_sns_message(function_name, message, subject=subject, qualifier=qualifier)
 
     def _matches_filter_policy(self, message_attributes):
         # TODO: support Anything-but matching, prefix matching and
diff --git a/moto/sqs/models.py b/moto/sqs/models.py
index f2e3ed400..e774e261c 100644
--- a/moto/sqs/models.py
+++ b/moto/sqs/models.py
@@ -189,6 +189,8 @@ class Queue(BaseModel):
                                             self.name)
         self.dead_letter_queue = None
 
+        self.lambda_event_source_mappings = {}
+
         # default settings for a non fifo queue
         defaults = {
             'ContentBasedDeduplication': 'false',
@@ -360,6 +362,33 @@ class Queue(BaseModel):
 
     def add_message(self, message):
         self._messages.append(message)
+        from moto.awslambda import lambda_backends
+        for arn, esm in self.lambda_event_source_mappings.items():
+            backend = sqs_backends[self.region]
+
+            """
+            Lambda polls the queue and invokes your function synchronously with an event
+            that contains queue messages. Lambda reads messages in batches and invokes
+            your function once for each batch. When your function successfully processes
+            a batch, Lambda deletes its messages from the queue.
+            """
+            messages = backend.receive_messages(
+                self.name,
+                esm.batch_size,
+                self.receive_message_wait_time_seconds,
+                self.visibility_timeout,
+            )
+
+            result = lambda_backends[self.region].send_sqs_batch(
+                arn,
+                messages,
+                self.queue_arn,
+            )
+
+            if result:
+                [backend.delete_message(self.name, m.receipt_handle) for m in messages]
+            else:
+                [backend.change_message_visibility(self.name, m.receipt_handle, 0) for m in messages]
 
     def get_cfn_attribute(self, attribute_name):
         from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
diff --git a/tests/test_awslambda/test_lambda.py b/tests/test_awslambda/test_lambda.py
index 9ef6fdb0d..9467b0803 100644
--- a/tests/test_awslambda/test_lambda.py
+++ b/tests/test_awslambda/test_lambda.py
@@ -1,6 +1,7 @@
 from __future__ import unicode_literals
 
 import base64
+import uuid
 import botocore.client
 import boto3
 import hashlib
@@ -11,11 +12,12 @@
 import zipfile
 import sure  # noqa
 from freezegun import freeze_time
-from moto import mock_lambda, mock_s3, mock_ec2, mock_sns, mock_logs, settings
+from moto import mock_lambda, mock_s3, mock_ec2, mock_sns, mock_logs, settings, mock_sqs
 from nose.tools import assert_raises
 from botocore.exceptions import ClientError
 
 _lambda_region = 'us-west-2'
+boto3.setup_default_session(region_name=_lambda_region)
 
 
 def _process_lambda(func_str):
@@ -59,6 +61,13 @@ def lambda_handler(event, context):
 """
     return _process_lambda(pfunc)
 
+def get_test_zip_file4():
+    pfunc = """
+def lambda_handler(event, context):
+    raise Exception('I failed!')
+"""
+    return _process_lambda(pfunc)
+
 
 @mock_lambda
 def test_list_functions():
@@ -933,3 +942,306 @@ def test_list_versions_by_function_for_nonexistent_function():
 
     versions = conn.list_versions_by_function(FunctionName='testFunction')
     assert len(versions['Versions']) == 0
+
+
+@mock_logs
+@mock_lambda
+@mock_sqs
+def test_create_event_source_mapping():
+    sqs = boto3.resource('sqs')
+    queue = sqs.create_queue(QueueName="test-sqs-queue1")
+
+    conn = boto3.client('lambda')
+    func = conn.create_function(
+        FunctionName='testFunction',
+        Runtime='python2.7',
+        Role='test-iam-role',
+        Handler='lambda_function.lambda_handler',
+        Code={
+            'ZipFile': get_test_zip_file3(),
+        },
+        Description='test lambda function',
+        Timeout=3,
+        MemorySize=128,
+        Publish=True,
+    )
+
+    response = conn.create_event_source_mapping(
+        EventSourceArn=queue.attributes['QueueArn'],
+        FunctionName=func['FunctionArn'],
+    )
+
+    assert response['EventSourceArn'] == queue.attributes['QueueArn']
+    assert response['FunctionArn'] == func['FunctionArn']
+    assert response['State'] == 'Enabled'
+
+
+@mock_logs
+@mock_lambda
+@mock_sqs
+def test_invoke_function_from_sqs():
+    logs_conn = boto3.client("logs")
+    sqs = boto3.resource('sqs')
+    queue = sqs.create_queue(QueueName="test-sqs-queue1")
+
+    conn = boto3.client('lambda')
+    func = conn.create_function(
+        FunctionName='testFunction',
+        Runtime='python2.7',
+        Role='test-iam-role',
+        Handler='lambda_function.lambda_handler',
+        Code={
+            'ZipFile': get_test_zip_file3(),
+        },
+        Description='test lambda function',
+        Timeout=3,
+        MemorySize=128,
+        Publish=True,
+    )
+
+    response = conn.create_event_source_mapping(
+        EventSourceArn=queue.attributes['QueueArn'],
+        FunctionName=func['FunctionArn'],
+    )
+
+    assert response['EventSourceArn'] == queue.attributes['QueueArn']
+    assert response['State'] == 'Enabled'
+
+    sqs_client = boto3.client('sqs')
+    sqs_client.send_message(QueueUrl=queue.url, MessageBody='test')
+    start = time.time()
+    while (time.time() - start) < 30:
+        result = logs_conn.describe_log_streams(logGroupName='/aws/lambda/testFunction')
+        log_streams = result.get('logStreams')
+        if not log_streams:
+            time.sleep(1)
+            continue
+
+        assert len(log_streams) == 1
+        result = logs_conn.get_log_events(logGroupName='/aws/lambda/testFunction', logStreamName=log_streams[0]['logStreamName'])
+        for event in result.get('events'):
+            if event['message'] == 'get_test_zip_file3 success':
+                return
+        time.sleep(1)
+
+    assert False, "Test Failed"
+
+
+@mock_logs
+@mock_lambda
+@mock_sqs
+def test_invoke_function_from_sqs_exception():
+    logs_conn = boto3.client("logs")
+    sqs = boto3.resource('sqs')
+    queue = sqs.create_queue(QueueName="test-sqs-queue1")
+
+    conn = boto3.client('lambda')
+    func = conn.create_function(
+        FunctionName='testFunction',
+        Runtime='python2.7',
+        Role='test-iam-role',
+        Handler='lambda_function.lambda_handler',
+        Code={
+            'ZipFile': get_test_zip_file4(),
+        },
+        Description='test lambda function',
+        Timeout=3,
+        MemorySize=128,
+        Publish=True,
+    )
+
+    response = conn.create_event_source_mapping(
+        EventSourceArn=queue.attributes['QueueArn'],
+        FunctionName=func['FunctionArn'],
+    )
+
+    assert response['EventSourceArn'] == queue.attributes['QueueArn']
+    assert response['State'] == 'Enabled'
+
+    entries = []
+    for i in range(3):
+        body = {
+            "uuid": str(uuid.uuid4()),
+            "test": "test_{}".format(i),
+        }
+        entry = {
+            'Id': str(i),
+            'MessageBody': json.dumps(body)
+        }
+        entries.append(entry)
+
+    queue.send_messages(Entries=entries)
+
+    start = time.time()
+    while (time.time() - start) < 30:
+        result = logs_conn.describe_log_streams(logGroupName='/aws/lambda/testFunction')
+        log_streams = result.get('logStreams')
+        if not log_streams:
+            time.sleep(1)
+            continue
+        assert len(log_streams) >= 1
+
+        result = logs_conn.get_log_events(logGroupName='/aws/lambda/testFunction', logStreamName=log_streams[0]['logStreamName'])
+        for event in result.get('events'):
+            if 'I failed!' in event['message']:
+                messages = queue.receive_messages(MaxNumberOfMessages=10)
+                # Verify messages are still visible and unprocessed
+                assert len(messages) == 3
+                return
+        time.sleep(1)
+
+    assert False, "Test Failed"
+
+
+@mock_logs
+@mock_lambda
+@mock_sqs
+def test_list_event_source_mappings():
+    sqs = boto3.resource('sqs')
+    queue = sqs.create_queue(QueueName="test-sqs-queue1")
+
+    conn = boto3.client('lambda')
+    func = conn.create_function(
+        FunctionName='testFunction',
+        Runtime='python2.7',
+        Role='test-iam-role',
+        Handler='lambda_function.lambda_handler',
+        Code={
+            'ZipFile': get_test_zip_file3(),
+        },
+        Description='test lambda function',
+        Timeout=3,
+        MemorySize=128,
+        Publish=True,
+    )
+    response = conn.create_event_source_mapping(
+        EventSourceArn=queue.attributes['QueueArn'],
+        FunctionName=func['FunctionArn'],
+    )
+    mappings = conn.list_event_source_mappings(EventSourceArn='123')
+    assert len(mappings['EventSourceMappings']) == 0
+
+    mappings = conn.list_event_source_mappings(EventSourceArn=queue.attributes['QueueArn'])
+    assert len(mappings['EventSourceMappings']) == 1
+    assert mappings['EventSourceMappings'][0]['UUID'] == response['UUID']
+    assert mappings['EventSourceMappings'][0]['FunctionArn'] == func['FunctionArn']
+
+
+@mock_lambda
+@mock_sqs
+def test_get_event_source_mapping():
+    sqs = boto3.resource('sqs')
+    queue = sqs.create_queue(QueueName="test-sqs-queue1")
+
+    conn = boto3.client('lambda')
+    func = conn.create_function(
+        FunctionName='testFunction',
+        Runtime='python2.7',
+        Role='test-iam-role',
+        Handler='lambda_function.lambda_handler',
+        Code={
+            'ZipFile': get_test_zip_file3(),
+        },
+        Description='test lambda function',
+        Timeout=3,
+        MemorySize=128,
+        Publish=True,
+    )
+    response = conn.create_event_source_mapping(
+        EventSourceArn=queue.attributes['QueueArn'],
+        FunctionName=func['FunctionArn'],
+    )
+    mapping = conn.get_event_source_mapping(UUID=response['UUID'])
+    assert mapping['UUID'] == response['UUID']
+    assert mapping['FunctionArn'] == func['FunctionArn']
+
+    conn.get_event_source_mapping.when.called_with(UUID='1')\
+        .should.throw(botocore.client.ClientError)
+
+
+@mock_lambda
+@mock_sqs
+def test_update_event_source_mapping():
+    sqs = boto3.resource('sqs')
+    queue = sqs.create_queue(QueueName="test-sqs-queue1")
+
+    conn = boto3.client('lambda')
+    func1 = conn.create_function(
+        FunctionName='testFunction',
+        Runtime='python2.7',
+        Role='test-iam-role',
+        Handler='lambda_function.lambda_handler',
+        Code={
+            'ZipFile': get_test_zip_file3(),
+        },
+        Description='test lambda function',
+        Timeout=3,
+        MemorySize=128,
+        Publish=True,
+    )
+    func2 = conn.create_function(
+        FunctionName='testFunction2',
+        Runtime='python2.7',
+        Role='test-iam-role',
+        Handler='lambda_function.lambda_handler',
+        Code={
+            'ZipFile': get_test_zip_file3(),
+        },
+        Description='test lambda function',
+        Timeout=3,
+        MemorySize=128,
+        Publish=True,
+    )
+    response = conn.create_event_source_mapping(
+        EventSourceArn=queue.attributes['QueueArn'],
+        FunctionName=func1['FunctionArn'],
+    )
+    assert response['FunctionArn'] == func1['FunctionArn']
+    assert response['BatchSize'] == 10
+    assert response['State'] == 'Enabled'
+
+    mapping = conn.update_event_source_mapping(
+        UUID=response['UUID'],
+        Enabled=False,
+        BatchSize=15,
+        FunctionName='testFunction2'
+
+    )
+    assert mapping['UUID'] == response['UUID']
+    assert mapping['FunctionArn'] == func2['FunctionArn']
+    assert mapping['State'] == 'Disabled'
+
+
+@mock_lambda
+@mock_sqs
+def test_delete_event_source_mapping():
+    sqs = boto3.resource('sqs')
+    queue = sqs.create_queue(QueueName="test-sqs-queue1")
+
+    conn = boto3.client('lambda')
+    func1 = conn.create_function(
+        FunctionName='testFunction',
+        Runtime='python2.7',
+        Role='test-iam-role',
+        Handler='lambda_function.lambda_handler',
+        Code={
+            'ZipFile': get_test_zip_file3(),
+        },
+        Description='test lambda function',
+        Timeout=3,
+        MemorySize=128,
+        Publish=True,
+    )
+    response = conn.create_event_source_mapping(
+        EventSourceArn=queue.attributes['QueueArn'],
+        FunctionName=func1['FunctionArn'],
+    )
+    assert response['FunctionArn'] == func1['FunctionArn']
+    assert response['BatchSize'] == 10
+    assert response['State'] == 'Enabled'
+
+    response = conn.delete_event_source_mapping(UUID=response['UUID'])
+
+    assert response['State'] == 'Deleting'
+    conn.get_event_source_mapping.when.called_with(UUID=response['UUID'])\
+        .should.throw(botocore.client.ClientError)
diff --git a/tests/test_cloudformation/test_cloudformation_stack_crud.py b/tests/test_cloudformation/test_cloudformation_stack_crud.py
index b7906632b..27424bf8c 100644
--- a/tests/test_cloudformation/test_cloudformation_stack_crud.py
+++ b/tests/test_cloudformation/test_cloudformation_stack_crud.py
@@ -593,9 +593,11 @@ def test_create_stack_lambda_and_dynamodb():
                 }
             },
             "func1version": {
-                "Type": "AWS::Lambda::LambdaVersion",
-                "Properties" : {
-                    "Version": "v1.2.3"
+                "Type": "AWS::Lambda::Version",
+                "Properties": {
+                    "FunctionName": {
+                        "Ref": "func1"
+                    }
                 }
             },
             "tab1": {
@@ -618,8 +620,10 @@ def test_create_stack_lambda_and_dynamodb():
             },
             "func1mapping": {
                 "Type": "AWS::Lambda::EventSourceMapping",
-                "Properties" : {
-                    "FunctionName": "v1.2.3",
+                "Properties": {
+                    "FunctionName": {
+                        "Ref": "func1"
+                    },
                     "EventSourceArn": "arn:aws:dynamodb:region:XXXXXX:table/tab1/stream/2000T00:00:00.000",
                     "StartingPosition": "0",
                     "BatchSize": 100,