From 91b13f998f80e62945e1bf0a4c96329307fde990 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 7 Oct 2019 11:11:22 +0100 Subject: [PATCH] Feature: Add option for DynamoDB stream to kick off lambda --- moto/awslambda/models.py | 27 ++++++++++++++++ moto/dynamodb2/models.py | 10 ++++++ tests/test_awslambda/test_lambda.py | 50 ++++++++++++++++++++++++++++- 3 files changed, 86 insertions(+), 1 deletion(-) diff --git a/moto/awslambda/models.py b/moto/awslambda/models.py index acc7a5257..2630abe1b 100644 --- a/moto/awslambda/models.py +++ b/moto/awslambda/models.py @@ -33,6 +33,8 @@ from moto.s3.exceptions import MissingBucket, MissingKey from moto import settings from .utils import make_function_arn, make_function_ver_arn from moto.sqs import sqs_backends +from moto.dynamodb2 import dynamodb_backends2 +from moto.dynamodbstreams import dynamodbstreams_backends logger = logging.getLogger(__name__) @@ -692,6 +694,18 @@ class LambdaBackend(BaseBackend): queue.lambda_event_source_mappings[esm.function_arn] = esm return esm + try: + stream = json.loads(dynamodbstreams_backends[self.region_name].describe_stream(spec['EventSourceArn'])) + spec.update({'FunctionArn': func.function_arn}) + esm = EventSourceMapping(spec) + self._event_source_mappings[esm.uuid] = esm + table_name = stream['StreamDescription']['TableName'] + table = dynamodb_backends2[self.region_name].get_table(table_name) + table.lambda_event_source_mappings[esm.function_arn] = esm + + return esm + except Exception: + pass # No DynamoDB stream exists raise RESTError('ResourceNotFoundException', 'Invalid EventSourceArn') def publish_function(self, function_name): @@ -811,6 +825,19 @@ class LambdaBackend(BaseBackend): func = self._lambdas.get_function(function_name, qualifier) func.invoke(json.dumps(event), {}, {}) + def send_dynamodb_items(self, function_arn, items, source): + event = {'Records': [ + { + 'eventID': item.to_json()['eventID'], + 'eventName': 'INSERT', + 'eventVersion': item.to_json()['eventVersion'], + 'eventSource': item.to_json()['eventSource'], + 'awsRegion': 'us-east-1', + 'dynamodb': item.to_json()['dynamodb'], + 'eventSourceARN': source} for item in items]} + func = self._lambdas.get_arn(function_arn) + func.invoke(json.dumps(event), {}, {}) + def list_tags(self, resource): return self.get_function_by_arn(resource).tags diff --git a/moto/dynamodb2/models.py b/moto/dynamodb2/models.py index 4ef4461cd..265f7697a 100644 --- a/moto/dynamodb2/models.py +++ b/moto/dynamodb2/models.py @@ -409,6 +409,15 @@ class StreamShard(BaseModel): seq = len(self.items) + self.starting_sequence_number self.items.append( StreamRecord(self.table, t, event_name, old, new, seq)) + result = None + from moto.awslambda import lambda_backends + for arn, esm in self.table.lambda_event_source_mappings.items(): + region = arn[len('arn:aws:lambda:'):arn.index(':', len('arn:aws:lambda:'))] + + result = lambda_backends[region].send_dynamodb_items(arn, self.items, esm.event_source_arn) + + if result: + self.items = [] def get(self, start, quantity): start -= self.starting_sequence_number @@ -451,6 +460,7 @@ class Table(BaseModel): # 'AttributeName': 'string' # Can contain this } self.set_stream_specification(streams) + self.lambda_event_source_mappings = {} @classmethod def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): diff --git a/tests/test_awslambda/test_lambda.py b/tests/test_awslambda/test_lambda.py index 9467b0803..d916c9918 100644 --- a/tests/test_awslambda/test_lambda.py +++ b/tests/test_awslambda/test_lambda.py @@ -12,7 +12,7 @@ import zipfile import sure # noqa from freezegun import freeze_time -from moto import mock_lambda, mock_s3, mock_ec2, mock_sns, mock_logs, settings, mock_sqs +from moto import mock_dynamodb2, mock_lambda, mock_s3, mock_ec2, mock_sns, mock_logs, settings, mock_sqs from nose.tools import assert_raises from botocore.exceptions import ClientError @@ -1027,6 +1027,54 @@ def test_invoke_function_from_sqs(): assert False, "Test Failed" +@mock_logs +@mock_lambda +@mock_dynamodb2 +def test_invoke_function_from_dynamodb(): + logs_conn = boto3.client("logs") + dynamodb = boto3.client('dynamodb') + table_name = 'table_with_stream' + table = dynamodb.create_table(TableName=table_name, + KeySchema=[{'AttributeName':'id','KeyType':'HASH'}], + AttributeDefinitions=[{'AttributeName':'id','AttributeType':'S'}], + StreamSpecification={'StreamEnabled': True, + 'StreamViewType': 'NEW_AND_OLD_IMAGES'}) + + conn = boto3.client('lambda') + func = conn.create_function(FunctionName='testFunction', Runtime='python2.7', + Role='test-iam-role', + Handler='lambda_function.lambda_handler', + Code={'ZipFile': get_test_zip_file3()}, + Description='test lambda function executed after a DynamoDB table is updated', + Timeout=3, MemorySize=128, Publish=True) + + response = conn.create_event_source_mapping( + EventSourceArn=table['TableDescription']['LatestStreamArn'], + FunctionName=func['FunctionArn'] + ) + + assert response['EventSourceArn'] == table['TableDescription']['LatestStreamArn'] + assert response['State'] == 'Enabled' + + dynamodb.put_item(TableName=table_name, Item={'id': { 'S': 'item 1' }}) + start = time.time() + while (time.time() - start) < 30: + result = logs_conn.describe_log_streams(logGroupName='/aws/lambda/testFunction') + log_streams = result.get('logStreams') + if not log_streams: + time.sleep(1) + continue + + assert len(log_streams) == 1 + result = logs_conn.get_log_events(logGroupName='/aws/lambda/testFunction', logStreamName=log_streams[0]['logStreamName']) + for event in result.get('events'): + if event['message'] == 'get_test_zip_file3 success': + return + time.sleep(1) + + assert False, "Test Failed" + + @mock_logs @mock_lambda @mock_sqs