Feature: Add option for DynamoDB stream to kick off lambda
This commit is contained in:
parent
2190eca96a
commit
91b13f998f
@ -33,6 +33,8 @@ from moto.s3.exceptions import MissingBucket, MissingKey
|
|||||||
from moto import settings
|
from moto import settings
|
||||||
from .utils import make_function_arn, make_function_ver_arn
|
from .utils import make_function_arn, make_function_ver_arn
|
||||||
from moto.sqs import sqs_backends
|
from moto.sqs import sqs_backends
|
||||||
|
from moto.dynamodb2 import dynamodb_backends2
|
||||||
|
from moto.dynamodbstreams import dynamodbstreams_backends
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -692,6 +694,18 @@ class LambdaBackend(BaseBackend):
|
|||||||
queue.lambda_event_source_mappings[esm.function_arn] = esm
|
queue.lambda_event_source_mappings[esm.function_arn] = esm
|
||||||
|
|
||||||
return esm
|
return esm
|
||||||
|
try:
|
||||||
|
stream = json.loads(dynamodbstreams_backends[self.region_name].describe_stream(spec['EventSourceArn']))
|
||||||
|
spec.update({'FunctionArn': func.function_arn})
|
||||||
|
esm = EventSourceMapping(spec)
|
||||||
|
self._event_source_mappings[esm.uuid] = esm
|
||||||
|
table_name = stream['StreamDescription']['TableName']
|
||||||
|
table = dynamodb_backends2[self.region_name].get_table(table_name)
|
||||||
|
table.lambda_event_source_mappings[esm.function_arn] = esm
|
||||||
|
|
||||||
|
return esm
|
||||||
|
except Exception:
|
||||||
|
pass # No DynamoDB stream exists
|
||||||
raise RESTError('ResourceNotFoundException', 'Invalid EventSourceArn')
|
raise RESTError('ResourceNotFoundException', 'Invalid EventSourceArn')
|
||||||
|
|
||||||
def publish_function(self, function_name):
|
def publish_function(self, function_name):
|
||||||
@ -811,6 +825,19 @@ class LambdaBackend(BaseBackend):
|
|||||||
func = self._lambdas.get_function(function_name, qualifier)
|
func = self._lambdas.get_function(function_name, qualifier)
|
||||||
func.invoke(json.dumps(event), {}, {})
|
func.invoke(json.dumps(event), {}, {})
|
||||||
|
|
||||||
|
def send_dynamodb_items(self, function_arn, items, source):
|
||||||
|
event = {'Records': [
|
||||||
|
{
|
||||||
|
'eventID': item.to_json()['eventID'],
|
||||||
|
'eventName': 'INSERT',
|
||||||
|
'eventVersion': item.to_json()['eventVersion'],
|
||||||
|
'eventSource': item.to_json()['eventSource'],
|
||||||
|
'awsRegion': 'us-east-1',
|
||||||
|
'dynamodb': item.to_json()['dynamodb'],
|
||||||
|
'eventSourceARN': source} for item in items]}
|
||||||
|
func = self._lambdas.get_arn(function_arn)
|
||||||
|
func.invoke(json.dumps(event), {}, {})
|
||||||
|
|
||||||
def list_tags(self, resource):
|
def list_tags(self, resource):
|
||||||
return self.get_function_by_arn(resource).tags
|
return self.get_function_by_arn(resource).tags
|
||||||
|
|
||||||
|
@ -409,6 +409,15 @@ class StreamShard(BaseModel):
|
|||||||
seq = len(self.items) + self.starting_sequence_number
|
seq = len(self.items) + self.starting_sequence_number
|
||||||
self.items.append(
|
self.items.append(
|
||||||
StreamRecord(self.table, t, event_name, old, new, seq))
|
StreamRecord(self.table, t, event_name, old, new, seq))
|
||||||
|
result = None
|
||||||
|
from moto.awslambda import lambda_backends
|
||||||
|
for arn, esm in self.table.lambda_event_source_mappings.items():
|
||||||
|
region = arn[len('arn:aws:lambda:'):arn.index(':', len('arn:aws:lambda:'))]
|
||||||
|
|
||||||
|
result = lambda_backends[region].send_dynamodb_items(arn, self.items, esm.event_source_arn)
|
||||||
|
|
||||||
|
if result:
|
||||||
|
self.items = []
|
||||||
|
|
||||||
def get(self, start, quantity):
|
def get(self, start, quantity):
|
||||||
start -= self.starting_sequence_number
|
start -= self.starting_sequence_number
|
||||||
@ -451,6 +460,7 @@ class Table(BaseModel):
|
|||||||
# 'AttributeName': 'string' # Can contain this
|
# 'AttributeName': 'string' # Can contain this
|
||||||
}
|
}
|
||||||
self.set_stream_specification(streams)
|
self.set_stream_specification(streams)
|
||||||
|
self.lambda_event_source_mappings = {}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
|
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
|
||||||
|
@ -12,7 +12,7 @@ import zipfile
|
|||||||
import sure # noqa
|
import sure # noqa
|
||||||
|
|
||||||
from freezegun import freeze_time
|
from freezegun import freeze_time
|
||||||
from moto import mock_lambda, mock_s3, mock_ec2, mock_sns, mock_logs, settings, mock_sqs
|
from moto import mock_dynamodb2, mock_lambda, mock_s3, mock_ec2, mock_sns, mock_logs, settings, mock_sqs
|
||||||
from nose.tools import assert_raises
|
from nose.tools import assert_raises
|
||||||
from botocore.exceptions import ClientError
|
from botocore.exceptions import ClientError
|
||||||
|
|
||||||
@ -1027,6 +1027,54 @@ def test_invoke_function_from_sqs():
|
|||||||
assert False, "Test Failed"
|
assert False, "Test Failed"
|
||||||
|
|
||||||
|
|
||||||
|
@mock_logs
|
||||||
|
@mock_lambda
|
||||||
|
@mock_dynamodb2
|
||||||
|
def test_invoke_function_from_dynamodb():
|
||||||
|
logs_conn = boto3.client("logs")
|
||||||
|
dynamodb = boto3.client('dynamodb')
|
||||||
|
table_name = 'table_with_stream'
|
||||||
|
table = dynamodb.create_table(TableName=table_name,
|
||||||
|
KeySchema=[{'AttributeName':'id','KeyType':'HASH'}],
|
||||||
|
AttributeDefinitions=[{'AttributeName':'id','AttributeType':'S'}],
|
||||||
|
StreamSpecification={'StreamEnabled': True,
|
||||||
|
'StreamViewType': 'NEW_AND_OLD_IMAGES'})
|
||||||
|
|
||||||
|
conn = boto3.client('lambda')
|
||||||
|
func = conn.create_function(FunctionName='testFunction', Runtime='python2.7',
|
||||||
|
Role='test-iam-role',
|
||||||
|
Handler='lambda_function.lambda_handler',
|
||||||
|
Code={'ZipFile': get_test_zip_file3()},
|
||||||
|
Description='test lambda function executed after a DynamoDB table is updated',
|
||||||
|
Timeout=3, MemorySize=128, Publish=True)
|
||||||
|
|
||||||
|
response = conn.create_event_source_mapping(
|
||||||
|
EventSourceArn=table['TableDescription']['LatestStreamArn'],
|
||||||
|
FunctionName=func['FunctionArn']
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response['EventSourceArn'] == table['TableDescription']['LatestStreamArn']
|
||||||
|
assert response['State'] == 'Enabled'
|
||||||
|
|
||||||
|
dynamodb.put_item(TableName=table_name, Item={'id': { 'S': 'item 1' }})
|
||||||
|
start = time.time()
|
||||||
|
while (time.time() - start) < 30:
|
||||||
|
result = logs_conn.describe_log_streams(logGroupName='/aws/lambda/testFunction')
|
||||||
|
log_streams = result.get('logStreams')
|
||||||
|
if not log_streams:
|
||||||
|
time.sleep(1)
|
||||||
|
continue
|
||||||
|
|
||||||
|
assert len(log_streams) == 1
|
||||||
|
result = logs_conn.get_log_events(logGroupName='/aws/lambda/testFunction', logStreamName=log_streams[0]['logStreamName'])
|
||||||
|
for event in result.get('events'):
|
||||||
|
if event['message'] == 'get_test_zip_file3 success':
|
||||||
|
return
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
assert False, "Test Failed"
|
||||||
|
|
||||||
|
|
||||||
@mock_logs
|
@mock_logs
|
||||||
@mock_lambda
|
@mock_lambda
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
|
Loading…
Reference in New Issue
Block a user