And event source mapping endpoints and SQS trigger support

This commit is contained in:
Randy Westergren 2019-08-20 21:54:57 -04:00
parent c0c86be6ee
commit ccceb70397
No known key found for this signature in database
GPG Key ID: 9169FEB434BFAB92
7 changed files with 549 additions and 15 deletions

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import base64
import time
from collections import defaultdict
import copy
import datetime
@ -31,6 +32,7 @@ from moto.logs.models import logs_backends
from moto.s3.exceptions import MissingBucket, MissingKey
from moto import settings
from .utils import make_function_arn, make_function_ver_arn
from moto.sqs import sqs_backends
logger = logging.getLogger(__name__)
@ -429,24 +431,39 @@ class LambdaFunction(BaseModel):
class EventSourceMapping(BaseModel):
def __init__(self, spec):
# required
self.function_name = spec['FunctionName']
self.function_arn = spec['FunctionArn']
self.event_source_arn = spec['EventSourceArn']
self.starting_position = spec['StartingPosition']
self.uuid = str(uuid.uuid4())
self.last_modified = time.mktime(datetime.datetime.utcnow().timetuple())
# optional
self.batch_size = spec.get('BatchSize', 100)
self.starting_position = spec.get('StartingPosition', 'TRIM_HORIZON')
self.batch_size = spec.get('BatchSize', 10) # TODO: Add source type-specific defaults
self.enabled = spec.get('Enabled', True)
self.starting_position_timestamp = spec.get('StartingPositionTimestamp',
None)
def get_configuration(self):
return {
'UUID': self.uuid,
'BatchSize': self.batch_size,
'EventSourceArn': self.event_source_arn,
'FunctionArn': self.function_arn,
'LastModified': self.last_modified,
'LastProcessingResult': '',
'State': 'Enabled' if self.enabled else 'Disabled',
'StateTransitionReason': 'User initiated'
}
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json,
region_name):
properties = cloudformation_json['Properties']
func = lambda_backends[region_name].get_function(properties['FunctionName'])
spec = {
'FunctionName': properties['FunctionName'],
'FunctionArn': func.function_arn,
'EventSourceArn': properties['EventSourceArn'],
'StartingPosition': properties['StartingPosition']
'StartingPosition': properties['StartingPosition'],
'BatchSize': properties.get('BatchSize', 100)
}
optional_properties = 'BatchSize Enabled StartingPositionTimestamp'.split()
for prop in optional_properties:
@ -466,8 +483,10 @@ class LambdaVersion(BaseModel):
def create_from_cloudformation_json(cls, resource_name, cloudformation_json,
region_name):
properties = cloudformation_json['Properties']
function_name = properties['FunctionName']
func = lambda_backends[region_name].publish_function(function_name)
spec = {
'Version': properties.get('Version')
'Version': func.version
}
return LambdaVersion(spec)
@ -515,6 +534,9 @@ class LambdaStorage(object):
def get_arn(self, arn):
return self._arns.get(arn, None)
def get_function_by_name_or_arn(self, input):
return self.get_function(input) or self.get_arn(input)
def put_function(self, fn):
"""
:param fn: Function
@ -596,6 +618,7 @@ class LambdaStorage(object):
class LambdaBackend(BaseBackend):
def __init__(self, region_name):
self._lambdas = LambdaStorage()
self._event_source_mappings = {}
self.region_name = region_name
def reset(self):
@ -617,6 +640,43 @@ class LambdaBackend(BaseBackend):
fn.version = ver.version
return fn
def create_event_source_mapping(self, spec):
required = [
'EventSourceArn',
'FunctionName',
]
for param in required:
if not spec.get(param):
raise RESTError('InvalidParameterValueException', 'Missing {}'.format(param))
# Validate function name
func = self._lambdas.get_function_by_name_or_arn(spec.get('FunctionName', ''))
if not func:
raise RESTError('ResourceNotFoundException', 'Invalid FunctionName')
# Validate queue
for queue in sqs_backends[self.region_name].queues.values():
if queue.queue_arn == spec['EventSourceArn']:
if queue.lambda_event_source_mappings.get('func.function_arn'):
# TODO: Correct exception?
raise RESTError('ResourceConflictException', 'The resource already exists.')
if queue.fifo_queue:
raise RESTError('InvalidParameterValueException',
'{} is FIFO'.format(queue.queue_arn))
else:
esm_spec = {
'EventSourceArn': spec['EventSourceArn'],
'FunctionArn': func.function_arn,
}
esm = EventSourceMapping(esm_spec)
self._event_source_mappings[esm.uuid] = esm
# Set backend function on queue
queue.lambda_event_source_mappings[esm.function_arn] = esm
return esm
raise RESTError('ResourceNotFoundException', 'Invalid EventSourceArn')
def publish_function(self, function_name):
return self._lambdas.publish_function(function_name)
@ -626,6 +686,33 @@ class LambdaBackend(BaseBackend):
def list_versions_by_function(self, function_name):
return self._lambdas.list_versions_by_function(function_name)
def get_event_source_mapping(self, uuid):
return self._event_source_mappings.get(uuid)
def delete_event_source_mapping(self, uuid):
return self._event_source_mappings.pop(uuid)
def update_event_source_mapping(self, uuid, spec):
esm = self.get_event_source_mapping(uuid)
if esm:
if spec.get('FunctionName'):
func = self._lambdas.get_function_by_name_or_arn(spec.get('FunctionName'))
esm.function_arn = func.function_arn
if 'BatchSize' in spec:
esm.batch_size = spec['BatchSize']
if 'Enabled' in spec:
esm.enabled = spec['Enabled']
return esm
return False
def list_event_source_mappings(self, event_source_arn, function_name):
esms = list(self._event_source_mappings.values())
if event_source_arn:
esms = list(filter(lambda x: x.event_source_arn == event_source_arn, esms))
if function_name:
esms = list(filter(lambda x: x.function_name == function_name, esms))
return esms
def get_function_by_arn(self, function_arn):
return self._lambdas.get_arn(function_arn)
@ -635,7 +722,43 @@ class LambdaBackend(BaseBackend):
def list_functions(self):
return self._lambdas.all()
def send_message(self, function_name, message, subject=None, qualifier=None):
def send_sqs_batch(self, function_arn, messages, queue_arn):
success = True
for message in messages:
func = self.get_function_by_arn(function_arn)
result = self._send_sqs_message(func, message, queue_arn)
if not result:
success = False
return success
def _send_sqs_message(self, func, message, queue_arn):
event = {
"Records": [
{
"messageId": message.id,
"receiptHandle": message.receipt_handle,
"body": message.body,
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"messageAttributes": {},
"md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
"eventSource": "aws:sqs",
"eventSourceARN": queue_arn,
"awsRegion": self.region_name
}
]
}
request_headers = {}
response_headers = {}
func.invoke(json.dumps(event), request_headers, response_headers)
return 'x-amz-function-error' not in response_headers
def send_sns_message(self, function_name, message, subject=None, qualifier=None):
event = {
"Records": [
{

View File

@ -39,6 +39,31 @@ class LambdaResponse(BaseResponse):
else:
raise ValueError("Cannot handle request")
def event_source_mappings(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == 'GET':
querystring = self.querystring
event_source_arn = querystring.get('EventSourceArn', [None])[0]
function_name = querystring.get('FunctionName', [None])[0]
return self._list_event_source_mappings(event_source_arn, function_name)
elif request.method == 'POST':
return self._create_event_source_mapping(request, full_url, headers)
else:
raise ValueError("Cannot handle request")
def event_source_mapping(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
path = request.path if hasattr(request, 'path') else path_url(request.url)
uuid = path.split('/')[-1]
if request.method == 'GET':
return self._get_event_source_mapping(uuid)
elif request.method == 'PUT':
return self._update_event_source_mapping(uuid)
elif request.method == 'DELETE':
return self._delete_event_source_mapping(uuid)
else:
raise ValueError("Cannot handle request")
def function(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == 'GET':
@ -177,6 +202,45 @@ class LambdaResponse(BaseResponse):
config = fn.get_configuration()
return 201, {}, json.dumps(config)
def _create_event_source_mapping(self, request, full_url, headers):
try:
fn = self.lambda_backend.create_event_source_mapping(self.json_body)
except ValueError as e:
return 400, {}, json.dumps({"Error": {"Code": e.args[0], "Message": e.args[1]}})
else:
config = fn.get_configuration()
return 201, {}, json.dumps(config)
def _list_event_source_mappings(self, event_source_arn, function_name):
esms = self.lambda_backend.list_event_source_mappings(event_source_arn, function_name)
result = {
'EventSourceMappings': [esm.get_configuration() for esm in esms]
}
return 200, {}, json.dumps(result)
def _get_event_source_mapping(self, uuid):
result = self.lambda_backend.get_event_source_mapping(uuid)
if result:
return 200, {}, json.dumps(result.get_configuration())
else:
return 404, {}, "{}"
def _update_event_source_mapping(self, uuid):
result = self.lambda_backend.update_event_source_mapping(uuid, self.json_body)
if result:
return 202, {}, json.dumps(result.get_configuration())
else:
return 404, {}, "{}"
def _delete_event_source_mapping(self, uuid):
esm = self.lambda_backend.delete_event_source_mapping(uuid)
if esm:
json_result = esm.get_configuration()
json_result.update({'State': 'Deleting'})
return 202, {}, json.dumps(json_result)
else:
return 404, {}, "{}"
def _publish_function(self, request, full_url, headers):
function_name = self.path.rsplit('/', 2)[-2]

View File

@ -11,6 +11,8 @@ url_paths = {
'{0}/(?P<api_version>[^/]+)/functions/?$': response.root,
r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/?$': response.function,
r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/versions/?$': response.versions,
r'{0}/(?P<api_version>[^/]+)/event-source-mappings/?$': response.event_source_mappings,
r'{0}/(?P<api_version>[^/]+)/event-source-mappings/(?P<UUID>[\w_-]+)/?$': response.event_source_mapping,
r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invocations/?$': response.invoke,
r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invoke-async/?$': response.invoke_async,
r'{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)': response.tag,

View File

@ -119,7 +119,7 @@ class Subscription(BaseModel):
else:
assert False
lambda_backends[region].send_message(function_name, message, subject=subject, qualifier=qualifier)
lambda_backends[region].send_sns_message(function_name, message, subject=subject, qualifier=qualifier)
def _matches_filter_policy(self, message_attributes):
# TODO: support Anything-but matching, prefix matching and

View File

@ -189,6 +189,8 @@ class Queue(BaseModel):
self.name)
self.dead_letter_queue = None
self.lambda_event_source_mappings = {}
# default settings for a non fifo queue
defaults = {
'ContentBasedDeduplication': 'false',
@ -360,6 +362,33 @@ class Queue(BaseModel):
def add_message(self, message):
self._messages.append(message)
from moto.awslambda import lambda_backends
for arn, esm in self.lambda_event_source_mappings.items():
backend = sqs_backends[self.region]
"""
Lambda polls the queue and invokes your function synchronously with an event
that contains queue messages. Lambda reads messages in batches and invokes
your function once for each batch. When your function successfully processes
a batch, Lambda deletes its messages from the queue.
"""
messages = backend.receive_messages(
self.name,
esm.batch_size,
self.receive_message_wait_time_seconds,
self.visibility_timeout,
)
result = lambda_backends[self.region].send_sqs_batch(
arn,
messages,
self.queue_arn,
)
if result:
[backend.delete_message(self.name, m.receipt_handle) for m in messages]
else:
[backend.change_message_visibility(self.name, m.receipt_handle, 0) for m in messages]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import base64
import uuid
import botocore.client
import boto3
import hashlib
@ -11,11 +12,12 @@ import zipfile
import sure # noqa
from freezegun import freeze_time
from moto import mock_lambda, mock_s3, mock_ec2, mock_sns, mock_logs, settings
from moto import mock_lambda, mock_s3, mock_ec2, mock_sns, mock_logs, settings, mock_sqs
from nose.tools import assert_raises
from botocore.exceptions import ClientError
_lambda_region = 'us-west-2'
boto3.setup_default_session(region_name=_lambda_region)
def _process_lambda(func_str):
@ -59,6 +61,13 @@ def lambda_handler(event, context):
"""
return _process_lambda(pfunc)
def get_test_zip_file4():
pfunc = """
def lambda_handler(event, context):
raise Exception('I failed!')
"""
return _process_lambda(pfunc)
@mock_lambda
def test_list_functions():
@ -933,3 +942,306 @@ def test_list_versions_by_function_for_nonexistent_function():
versions = conn.list_versions_by_function(FunctionName='testFunction')
assert len(versions['Versions']) == 0
@mock_logs
@mock_lambda
@mock_sqs
def test_create_event_source_mapping():
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="test-sqs-queue1")
conn = boto3.client('lambda')
func = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file3(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes['QueueArn'],
FunctionName=func['FunctionArn'],
)
assert response['EventSourceArn'] == queue.attributes['QueueArn']
assert response['FunctionArn'] == func['FunctionArn']
assert response['State'] == 'Enabled'
@mock_logs
@mock_lambda
@mock_sqs
def test_invoke_function_from_sqs():
logs_conn = boto3.client("logs")
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="test-sqs-queue1")
conn = boto3.client('lambda')
func = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file3(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes['QueueArn'],
FunctionName=func['FunctionArn'],
)
assert response['EventSourceArn'] == queue.attributes['QueueArn']
assert response['State'] == 'Enabled'
sqs_client = boto3.client('sqs')
sqs_client.send_message(QueueUrl=queue.url, MessageBody='test')
start = time.time()
while (time.time() - start) < 30:
result = logs_conn.describe_log_streams(logGroupName='/aws/lambda/testFunction')
log_streams = result.get('logStreams')
if not log_streams:
time.sleep(1)
continue
assert len(log_streams) == 1
result = logs_conn.get_log_events(logGroupName='/aws/lambda/testFunction', logStreamName=log_streams[0]['logStreamName'])
for event in result.get('events'):
if event['message'] == 'get_test_zip_file3 success':
return
time.sleep(1)
assert False, "Test Failed"
@mock_logs
@mock_lambda
@mock_sqs
def test_invoke_function_from_sqs_exception():
logs_conn = boto3.client("logs")
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="test-sqs-queue1")
conn = boto3.client('lambda')
func = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file4(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes['QueueArn'],
FunctionName=func['FunctionArn'],
)
assert response['EventSourceArn'] == queue.attributes['QueueArn']
assert response['State'] == 'Enabled'
entries = []
for i in range(3):
body = {
"uuid": str(uuid.uuid4()),
"test": "test_{}".format(i),
}
entry = {
'Id': str(i),
'MessageBody': json.dumps(body)
}
entries.append(entry)
queue.send_messages(Entries=entries)
start = time.time()
while (time.time() - start) < 30:
result = logs_conn.describe_log_streams(logGroupName='/aws/lambda/testFunction')
log_streams = result.get('logStreams')
if not log_streams:
time.sleep(1)
continue
assert len(log_streams) >= 1
result = logs_conn.get_log_events(logGroupName='/aws/lambda/testFunction', logStreamName=log_streams[0]['logStreamName'])
for event in result.get('events'):
if 'I failed!' in event['message']:
messages = queue.receive_messages(MaxNumberOfMessages=10)
# Verify messages are still visible and unprocessed
assert len(messages) is 3
return
time.sleep(1)
assert False, "Test Failed"
@mock_logs
@mock_lambda
@mock_sqs
def test_list_event_source_mappings():
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="test-sqs-queue1")
conn = boto3.client('lambda')
func = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file3(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes['QueueArn'],
FunctionName=func['FunctionArn'],
)
mappings = conn.list_event_source_mappings(EventSourceArn='123')
assert len(mappings['EventSourceMappings']) == 0
mappings = conn.list_event_source_mappings(EventSourceArn=queue.attributes['QueueArn'])
assert len(mappings['EventSourceMappings']) == 1
assert mappings['EventSourceMappings'][0]['UUID'] == response['UUID']
assert mappings['EventSourceMappings'][0]['FunctionArn'] == func['FunctionArn']
@mock_lambda
@mock_sqs
def test_get_event_source_mapping():
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="test-sqs-queue1")
conn = boto3.client('lambda')
func = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file3(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes['QueueArn'],
FunctionName=func['FunctionArn'],
)
mapping = conn.get_event_source_mapping(UUID=response['UUID'])
assert mapping['UUID'] == response['UUID']
assert mapping['FunctionArn'] == func['FunctionArn']
conn.get_event_source_mapping.when.called_with(UUID='1')\
.should.throw(botocore.client.ClientError)
@mock_lambda
@mock_sqs
def test_update_event_source_mapping():
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="test-sqs-queue1")
conn = boto3.client('lambda')
func1 = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file3(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
func2 = conn.create_function(
FunctionName='testFunction2',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file3(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes['QueueArn'],
FunctionName=func1['FunctionArn'],
)
assert response['FunctionArn'] == func1['FunctionArn']
assert response['BatchSize'] == 10
assert response['State'] == 'Enabled'
mapping = conn.update_event_source_mapping(
UUID=response['UUID'],
Enabled=False,
BatchSize=15,
FunctionName='testFunction2'
)
assert mapping['UUID'] == response['UUID']
assert mapping['FunctionArn'] == func2['FunctionArn']
assert mapping['State'] == 'Disabled'
@mock_lambda
@mock_sqs
def test_delete_event_source_mapping():
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="test-sqs-queue1")
conn = boto3.client('lambda')
func1 = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.lambda_handler',
Code={
'ZipFile': get_test_zip_file3(),
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes['QueueArn'],
FunctionName=func1['FunctionArn'],
)
assert response['FunctionArn'] == func1['FunctionArn']
assert response['BatchSize'] == 10
assert response['State'] == 'Enabled'
response = conn.delete_event_source_mapping(UUID=response['UUID'])
assert response['State'] == 'Deleting'
conn.get_event_source_mapping.when.called_with(UUID=response['UUID'])\
.should.throw(botocore.client.ClientError)

View File

@ -593,9 +593,11 @@ def test_create_stack_lambda_and_dynamodb():
}
},
"func1version": {
"Type": "AWS::Lambda::LambdaVersion",
"Type": "AWS::Lambda::Version",
"Properties": {
"Version": "v1.2.3"
"FunctionName": {
"Ref": "func1"
}
}
},
"tab1": {
@ -619,7 +621,9 @@ def test_create_stack_lambda_and_dynamodb():
"func1mapping": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"FunctionName": "v1.2.3",
"FunctionName": {
"Ref": "func1"
},
"EventSourceArn": "arn:aws:dynamodb:region:XXXXXX:table/tab1/stream/2000T00:00:00.000",
"StartingPosition": "0",
"BatchSize": 100,