From 038ff620b2e1e67fde707b38ee518e4a53088249 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 24 Feb 2020 09:28:52 +0000 Subject: [PATCH] DDB Streams - Bugfix where processed items are resend every time --- moto/awslambda/models.py | 2 +- tests/test_awslambda/test_lambda.py | 60 ++++++++++++++++++----------- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/moto/awslambda/models.py b/moto/awslambda/models.py index 939952d5e..9cdf2397c 100644 --- a/moto/awslambda/models.py +++ b/moto/awslambda/models.py @@ -981,7 +981,7 @@ class LambdaBackend(BaseBackend): ] } func = self._lambdas.get_arn(function_arn) - func.invoke(json.dumps(event), {}, {}) + return func.invoke(json.dumps(event), {}, {}) def list_tags(self, resource): return self.get_function_by_arn(resource).tags diff --git a/tests/test_awslambda/test_lambda.py b/tests/test_awslambda/test_lambda.py index 48539c0e6..eb8453e43 100644 --- a/tests/test_awslambda/test_lambda.py +++ b/tests/test_awslambda/test_lambda.py @@ -70,6 +70,7 @@ def lambda_handler(event, context): def get_test_zip_file3(): pfunc = """ def lambda_handler(event, context): + print("Nr_of_records("+str(len(event['Records']))+")") print("get_test_zip_file3 success") return event """ @@ -1139,9 +1140,13 @@ def test_invoke_function_from_sqs(): expected_msg = "get_test_zip_file3 success" log_group = "/aws/lambda/testFunction" - msg_showed_up = wait_for_log_msg(expected_msg, log_group) + msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group) - assert msg_showed_up, "Message was not found in log_group, so sending an SQS message did not result in a successful Lambda execution" + assert msg_showed_up, ( + expected_msg + + " was not found after sending an SQS message. All logs: " + + all_logs + ) @mock_logs @@ -1185,9 +1190,11 @@ def test_invoke_function_from_dynamodb_put(): expected_msg = "get_test_zip_file3 success" log_group = "/aws/lambda/testFunction" - msg_showed_up = wait_for_log_msg(expected_msg, log_group) + msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group) - assert msg_showed_up, "Message was not found in log_group, so inserting DynamoDB did not result in a successful Lambda execution" + assert msg_showed_up, ( + expected_msg + " was not found after a DDB insert. All logs: " + all_logs + ) @mock_logs @@ -1205,7 +1212,6 @@ def test_invoke_function_from_dynamodb_update(): "StreamViewType": "NEW_AND_OLD_IMAGES", }, ) - dynamodb.put_item(TableName=table_name, Item={"id": {"S": "item 1"}}) conn = boto3.client("lambda", region_name="us-east-1") func = conn.create_function( @@ -1220,13 +1226,17 @@ def test_invoke_function_from_dynamodb_update(): Publish=True, ) - response = conn.create_event_source_mapping( + conn.create_event_source_mapping( EventSourceArn=table["TableDescription"]["LatestStreamArn"], FunctionName=func["FunctionArn"], ) - assert response["EventSourceArn"] == table["TableDescription"]["LatestStreamArn"] - assert response["State"] == "Enabled" + dynamodb.put_item(TableName=table_name, Item={"id": {"S": "item 1"}}) + log_group = "/aws/lambda/testFunction" + expected_msg = "get_test_zip_file3 success" + msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group) + assert "Nr_of_records(1)" in all_logs, "Only one item should be inserted" + dynamodb.update_item( TableName=table_name, Key={"id": {"S": "item 1"}}, @@ -1234,33 +1244,39 @@ def test_invoke_function_from_dynamodb_update(): ExpressionAttributeNames={"#attr": "new_attr"}, ExpressionAttributeValues={":val": {"S": "new_val"}}, ) - expected_msg = "get_test_zip_file3 success" - log_group = "/aws/lambda/testFunction" - msg_showed_up = wait_for_log_msg(expected_msg, log_group) + msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group) - assert msg_showed_up, "Message was not found in log_group, so updating DynamoDB did not result in a successful Lambda execution" + assert msg_showed_up, ( + expected_msg + " was not found after updating DDB. All logs: " + str(all_logs) + ) + assert "Nr_of_records(1)" in all_logs, "Only one item should be updated" + assert ( + "Nr_of_records(2)" not in all_logs + ), "The inserted item should not show up again" def wait_for_log_msg(expected_msg, log_group): logs_conn = boto3.client("logs", region_name="us-east-1") + received_messages = [] start = time.time() - while (time.time() - start) < 30: + while (time.time() - start) < 10: result = logs_conn.describe_log_streams(logGroupName=log_group) log_streams = result.get("logStreams") if not log_streams: time.sleep(1) continue - assert len(log_streams) == 1 - result = logs_conn.get_log_events( - logGroupName=log_group, - logStreamName=log_streams[0]["logStreamName"], - ) - for event in result.get("events"): - if event["message"] == expected_msg: - return True + for log_stream in log_streams: + result = logs_conn.get_log_events( + logGroupName=log_group, logStreamName=log_stream["logStreamName"], + ) + received_messages.extend( + [event["message"] for event in result.get("events")] + ) + if expected_msg in received_messages: + return True, received_messages time.sleep(1) - return False + return False, received_messages @mock_logs