From c5a91e6cc6f6c9c0ab207f2e9f36fcec85febfaa Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Wed, 1 Mar 2023 14:03:20 -0100 Subject: [PATCH] Lambda+SQS: Allow FIFO queues to be used as event sources (#5998) --- moto/awslambda/models.py | 26 ++++---- tests/test_sqs/test_integration.py | 66 -------------------- tests/test_sqs/test_sqs_integration.py | 86 +++++++++++++++++++++++--- 3 files changed, 94 insertions(+), 84 deletions(-) delete mode 100644 tests/test_sqs/test_integration.py diff --git a/moto/awslambda/models.py b/moto/awslambda/models.py index 096b983c7..168cf0894 100644 --- a/moto/awslambda/models.py +++ b/moto/awslambda/models.py @@ -1643,19 +1643,15 @@ class LambdaBackend(BaseBackend): raise RESTError( "ResourceConflictException", "The resource already exists." ) - if queue.fifo_queue: - raise RESTError( - "InvalidParameterValueException", f"{queue.queue_arn} is FIFO" - ) - else: - spec.update({"FunctionArn": func.function_arn}) - esm = EventSourceMapping(spec) - self._event_source_mappings[esm.uuid] = esm + spec.update({"FunctionArn": func.function_arn}) + esm = EventSourceMapping(spec) + self._event_source_mappings[esm.uuid] = esm - # Set backend function on queue - queue.lambda_event_source_mappings[esm.function_arn] = esm + # Set backend function on queue + queue.lambda_event_source_mappings[esm.function_arn] = esm + + return esm - return esm ddbstream_backend = dynamodbstreams_backends[self.account_id][self.region_name] ddb_backend = dynamodb_backends[self.account_id][self.region_name] for stream in json.loads(ddbstream_backend.list_streams())["Streams"]: @@ -1792,6 +1788,14 @@ class LambdaBackend(BaseBackend): } ] } + if queue_arn.endswith(".fifo"): + # Messages from FIFO queue have additional attributes + event["Records"][0]["attributes"].update( + { + "MessageGroupId": message.group_id, + "MessageDeduplicationId": message.deduplication_id, + } + ) request_headers: Dict[str, Any] = {} response_headers: Dict[str, Any] = {} diff --git a/tests/test_sqs/test_integration.py b/tests/test_sqs/test_integration.py deleted file mode 100644 index f9e6da573..000000000 --- a/tests/test_sqs/test_integration.py +++ /dev/null @@ -1,66 +0,0 @@ -import json -import time -import uuid -import boto3 -import sure # noqa # pylint: disable=unused-import -from moto import mock_sqs, mock_lambda, mock_logs - -from tests.test_awslambda.test_lambda import get_test_zip_file1, get_role_name - - -@mock_logs -@mock_lambda -@mock_sqs -def test_invoke_function_from_sqs_exception(): - logs_conn = boto3.client("logs", region_name="us-east-1") - sqs = boto3.resource("sqs", region_name="us-east-1") - queue = sqs.create_queue(QueueName="test-sqs-queue1") - - conn = boto3.client("lambda", region_name="us-east-1") - func = conn.create_function( - FunctionName="testFunction", - Runtime="python2.7", - Role=get_role_name(), - Handler="lambda_function.lambda_handler", - Code={"ZipFile": get_test_zip_file1()}, - Description="test lambda function", - Timeout=3, - MemorySize=128, - Publish=True, - ) - - response = conn.create_event_source_mapping( - EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"] - ) - - assert response["EventSourceArn"] == queue.attributes["QueueArn"] - assert response["State"] == "Enabled" - - entries = [ - { - "Id": "1", - "MessageBody": json.dumps({"uuid": str(uuid.uuid4()), "test": "test"}), - } - ] - - queue.send_messages(Entries=entries) - - start = time.time() - while (time.time() - start) < 30: - result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction") - log_streams = result.get("logStreams") - if not log_streams: - time.sleep(1) - continue - assert len(log_streams) >= 1 - - result = logs_conn.get_log_events( - logGroupName="/aws/lambda/testFunction", - logStreamName=log_streams[0]["logStreamName"], - ) - for event in result.get("events"): - if "custom log event" in event["message"]: - return - time.sleep(1) - - assert False, "Test Failed" diff --git a/tests/test_sqs/test_sqs_integration.py b/tests/test_sqs/test_sqs_integration.py index 7a7e5c14b..3aded4216 100644 --- a/tests/test_sqs/test_sqs_integration.py +++ b/tests/test_sqs/test_sqs_integration.py @@ -5,19 +5,22 @@ import uuid from moto import mock_lambda, mock_sqs, mock_logs from tests.test_awslambda.test_lambda import get_test_zip_file1, get_role_name +from tests.test_awslambda.utilities import get_test_zip_file_print_event @mock_logs @mock_lambda @mock_sqs -def test_invoke_function_from_sqs_exception(): +def test_invoke_function_from_sqs_queue(): logs_conn = boto3.client("logs", region_name="us-east-1") sqs = boto3.resource("sqs", region_name="us-east-1") - queue = sqs.create_queue(QueueName="test-sqs-queue1") + queue_name = str(uuid.uuid4())[0:6] + queue = sqs.create_queue(QueueName=queue_name) + fn_name = str(uuid.uuid4())[0:6] conn = boto3.client("lambda", region_name="us-east-1") func = conn.create_function( - FunctionName="testFunction", + FunctionName=fn_name, Runtime="python2.7", Role=get_role_name(), Handler="lambda_function.lambda_handler", @@ -46,20 +49,89 @@ def test_invoke_function_from_sqs_exception(): start = time.time() while (time.time() - start) < 30: - result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction") + result = logs_conn.describe_log_streams(logGroupName=f"/aws/lambda/{fn_name}") log_streams = result.get("logStreams") if not log_streams: - time.sleep(1) + time.sleep(0.5) continue assert len(log_streams) >= 1 result = logs_conn.get_log_events( - logGroupName="/aws/lambda/testFunction", + logGroupName=f"/aws/lambda/{fn_name}", logStreamName=log_streams[0]["logStreamName"], ) for event in result.get("events"): if "custom log event" in event["message"]: return - time.sleep(1) + time.sleep(0.5) + + assert False, "Test Failed" + + +@mock_logs +@mock_lambda +@mock_sqs +def test_invoke_function_from_sqs_fifo_queue(): + """ + Create a FIFO Queue + Create a Lambda Function + Send a message to the queue + Verify the Lambda has been invoked with the correct message body + (By checking the resulting log files) + """ + logs_conn = boto3.client("logs", region_name="us-east-1") + sqs = boto3.resource("sqs", region_name="us-east-1") + queue_name = str(uuid.uuid4())[0:6] + ".fifo" + queue = sqs.create_queue(QueueName=queue_name, Attributes={"FifoQueue": "true"}) + + fn_name = str(uuid.uuid4())[0:6] + conn = boto3.client("lambda", region_name="us-east-1") + func = conn.create_function( + FunctionName=fn_name, + Runtime="python3.7", + Role=get_role_name(), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": get_test_zip_file_print_event()}, + Timeout=3, + MemorySize=128, + Publish=True, + ) + + response = conn.create_event_source_mapping( + EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"] + ) + + assert response["EventSourceArn"] == queue.attributes["QueueArn"] + assert response["State"] == "Enabled" + + entries = [{"Id": "1", "MessageBody": "some body", "MessageGroupId": "mg1"}] + + queue.send_messages(Entries=entries) + + start = time.time() + while (time.time() - start) < 30: + result = logs_conn.describe_log_streams(logGroupName=f"/aws/lambda/{fn_name}") + log_streams = result.get("logStreams") + if not log_streams: + time.sleep(0.5) + continue + assert len(log_streams) >= 1 + + result = logs_conn.get_log_events( + logGroupName=f"/aws/lambda/{fn_name}", + logStreamName=log_streams[0]["logStreamName"], + ) + + for event in result.get("events"): + try: + body = json.loads(event.get("message")) + atts = body["Records"][0]["attributes"] + atts.should.have.key("MessageGroupId").equals("mg1") + atts.should.have.key("MessageDeduplicationId") + return + except: # noqa: E722 Do not use bare except + pass + + time.sleep(0.5) assert False, "Test Failed"