From cab030f4a09b8266de1d6148ad76969b3707dc13 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Tue, 30 Jan 2024 20:51:15 +0000 Subject: [PATCH] Lambda: Support Kinesis as EventSourceMapping target (#7275) --- .../workflows/tests_terraform_examples.yml | 2 +- .gitignore | 1 + moto/awslambda/models.py | 55 ++++++++++++++- moto/awslambda/responses.py | 7 +- moto/kinesis/models.py | 27 ++++++-- .../awslambda/event_source_mapping.tf | 67 +++++++++++++++++++ other_langs/terraform/awslambda/lambda.js | 0 other_langs/terraform/awslambda/provider.tf | 44 ++++++++++++ .../test_lambda_eventsourcemapping.py | 58 +++++++++++++++- 9 files changed, 251 insertions(+), 10 deletions(-) create mode 100644 other_langs/terraform/awslambda/event_source_mapping.tf create mode 100644 other_langs/terraform/awslambda/lambda.js create mode 100644 other_langs/terraform/awslambda/provider.tf diff --git a/.github/workflows/tests_terraform_examples.yml b/.github/workflows/tests_terraform_examples.yml index 56e8f4daf..751f98c06 100644 --- a/.github/workflows/tests_terraform_examples.yml +++ b/.github/workflows/tests_terraform_examples.yml @@ -11,7 +11,7 @@ jobs: strategy: fail-fast: false matrix: - service: ["acm", "cloudfront", "elb", "route53"] + service: ["acm", "awslambda", "cloudfront", "elb", "route53", "sqs"] steps: - uses: actions/checkout@v4 diff --git a/.gitignore b/.gitignore index e34bcfd32..956ec003f 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ other_langs/tests_dotnet/ExampleTestProject/obj other_langs/tests_ruby/Gemfile.lock other_langs/terraform/*/.terraform* other_langs/terraform/*/terraform* +other_langs/terraform/awslambda/lambda_function_payload.zip diff --git a/moto/awslambda/models.py b/moto/awslambda/models.py index fe19adced..aef9c49b4 100644 --- a/moto/awslambda/models.py +++ b/moto/awslambda/models.py @@ -33,11 +33,12 @@ from moto.ecr.exceptions import ImageNotFoundException from moto.ecr.models import ecr_backends from moto.iam.exceptions import IAMNotFoundException from moto.iam.models import iam_backends +from moto.kinesis.models import KinesisBackend, kinesis_backends from moto.logs.models import logs_backends from moto.moto_api._internal import mock_random as random from moto.s3.exceptions import MissingBucket, MissingKey from moto.s3.models import FakeKey, s3_backends -from moto.sqs import sqs_backends +from moto.sqs.models import sqs_backends from moto.utilities.docker_utilities import DockerModel from moto.utilities.utils import load_resource_as_bytes @@ -1258,7 +1259,7 @@ class EventSourceMapping(CloudFormationModel): self.enabled = spec.get("Enabled", True) self.starting_position_timestamp = spec.get("StartingPositionTimestamp", None) - self.function_arn = spec["FunctionArn"] + self.function_arn: str = spec["FunctionArn"] self.uuid = str(random.uuid4()) self.last_modified = time.mktime(utcnow().timetuple()) @@ -1266,6 +1267,7 @@ class EventSourceMapping(CloudFormationModel): return event_source_arn.split(":")[2].lower() def _validate_event_source(self, event_source_arn: str) -> bool: + valid_services = ("dynamodb", "kinesis", "sqs") service = self._get_service_source_from_arn(event_source_arn) return service in valid_services @@ -1314,9 +1316,10 @@ class EventSourceMapping(CloudFormationModel): "EventSourceArn": self.event_source_arn, "FunctionArn": self.function_arn, "LastModified": self.last_modified, - "LastProcessingResult": "", + "LastProcessingResult": None, "State": "Enabled" if self.enabled else "Disabled", "StateTransitionReason": "User initiated", + "StartingPosition": self.starting_position, } def delete(self, account_id: str, region_name: str) -> None: @@ -2037,6 +2040,18 @@ class LambdaBackend(BaseBackend): table = ddb_backend.get_table(table_name) table.lambda_event_source_mappings[esm.function_arn] = esm return esm + + kinesis_backend: KinesisBackend = kinesis_backends[self.account_id][ + self.region_name + ] + for stream in kinesis_backend.streams.values(): + if stream.arn == spec["EventSourceArn"]: + spec.update({"FunctionArn": func.function_arn}) + esm = EventSourceMapping(spec) + self._event_source_mappings[esm.uuid] = esm + stream.lambda_event_source_mappings[esm.event_source_arn] = esm + return esm + raise RESTError("ResourceNotFoundException", "Invalid EventSourceArn") def publish_layer_version(self, spec: Dict[str, Any]) -> LayerVersion: @@ -2186,6 +2201,40 @@ class LambdaBackend(BaseBackend): ) return "x-amz-function-error" not in response_headers + def send_kinesis_message( + self, + function_name: str, + kinesis_stream: str, + kinesis_partition_key: str, + kinesis_sequence_number: str, + kinesis_data: str, + kinesis_shard_id: str, + ) -> None: + func = self._lambdas.get_function_by_name_or_arn_with_qualifier( + function_name, qualifier=None + ) + event = { + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": kinesis_partition_key, + "sequenceNumber": kinesis_sequence_number, + "data": kinesis_data, + "approximateArrivalTimestamp": round(time.time(), 3), + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": f"{kinesis_shard_id}:{kinesis_sequence_number}", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": func.role, + "awsRegion": self.region_name, + "eventSourceARN": kinesis_stream, + } + ] + } + func.invoke(json.dumps(event), {}, {}) + def send_sns_message( self, function_name: str, diff --git a/moto/awslambda/responses.py b/moto/awslambda/responses.py index ac5544bb3..3951c0c76 100644 --- a/moto/awslambda/responses.py +++ b/moto/awslambda/responses.py @@ -370,7 +370,12 @@ class LambdaResponse(BaseResponse): if result: return 200, {}, json.dumps(result.get_configuration()) else: - return 404, {}, "{}" + err = { + "Type": "User", + "Message": "The resource you requested does not exist.", + } + headers = {"x-amzn-errortype": "ResourceNotFoundException"} + return 404, headers, json.dumps(err) def _update_event_source_mapping(self, uuid: str) -> TYPE_RESPONSE: result = self.backend.update_event_source_mapping(uuid, self.json_body) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 2590ca39f..bab8acda0 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -7,7 +7,7 @@ from base64 import b64decode, b64encode from collections import OrderedDict from gzip import GzipFile from operator import attrgetter -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel, CloudFormationModel @@ -39,6 +39,9 @@ from .utils import ( decompose_shard_iterator, ) +if TYPE_CHECKING: + from moto.awslambda.models import EventSourceMapping + class Consumer(BaseModel): def __init__( @@ -218,6 +221,7 @@ class Stream(CloudFormationModel): self.encryption_type = "NONE" self.key_id: Optional[str] = None self.consumers: List[Consumer] = [] + self.lambda_event_source_mappings: Dict[str, "EventSourceMapping"] = {} def delete_consumer(self, consumer_arn: str) -> None: self.consumers = [c for c in self.consumers if c.consumer_arn != consumer_arn] @@ -406,10 +410,25 @@ class Stream(CloudFormationModel): def put_record( self, partition_key: str, explicit_hash_key: str, data: str ) -> Tuple[str, str]: - shard = self.get_shard_for_key(partition_key, explicit_hash_key) + shard: Shard = self.get_shard_for_key(partition_key, explicit_hash_key) # type: ignore - sequence_number = shard.put_record(partition_key, data, explicit_hash_key) # type: ignore - return sequence_number, shard.shard_id # type: ignore + sequence_number = shard.put_record(partition_key, data, explicit_hash_key) + + from moto.awslambda.utils import get_backend + + for arn, esm in self.lambda_event_source_mappings.items(): + region = arn.split(":")[3] + + get_backend(self.account_id, region).send_kinesis_message( + function_name=esm.function_arn, + kinesis_stream=self.arn, + kinesis_data=data, + kinesis_shard_id=shard.shard_id, + kinesis_partition_key=partition_key, + kinesis_sequence_number=sequence_number, + ) + + return sequence_number, shard.shard_id def to_json(self, shard_limit: Optional[int] = None) -> Dict[str, Any]: all_shards = list(self.shards.values()) diff --git a/other_langs/terraform/awslambda/event_source_mapping.tf b/other_langs/terraform/awslambda/event_source_mapping.tf new file mode 100644 index 000000000..6e43b9419 --- /dev/null +++ b/other_langs/terraform/awslambda/event_source_mapping.tf @@ -0,0 +1,67 @@ +resource "aws_kinesis_stream" "test_stream" { + name = "terraform-kinesis-test" + shard_count = 1 + retention_period = 48 + + shard_level_metrics = [ + "IncomingBytes", + "OutgoingBytes", + ] + + stream_mode_details { + stream_mode = "PROVISIONED" + } + + tags = { + Environment = "test" + } +} + +data "aws_iam_policy_document" "assume_role" { + statement { + effect = "Allow" + + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "iam_for_lambda" { + name = "iam_for_lambda" + assume_role_policy = data.aws_iam_policy_document.assume_role.json +} + +data "archive_file" "lambda" { + type = "zip" + source_file = "lambda.js" + output_path = "lambda_function_payload.zip" +} + +resource "aws_lambda_function" "test_lambda" { + # If the file is not in the current working directory you will need to include a + # path.module in the filename. + filename = "lambda_function_payload.zip" + function_name = "lambda_function_name" + role = aws_iam_role.iam_for_lambda.arn + handler = "index.test" + + source_code_hash = data.archive_file.lambda.output_base64sha256 + + runtime = "nodejs18.x" + + environment { + variables = { + foo = "bar" + } + } +} + +resource "aws_lambda_event_source_mapping" "kinesis_to_sqs" { + event_source_arn = aws_kinesis_stream.test_stream.arn + function_name = aws_lambda_function.test_lambda.arn + starting_position = "LATEST" +} \ No newline at end of file diff --git a/other_langs/terraform/awslambda/lambda.js b/other_langs/terraform/awslambda/lambda.js new file mode 100644 index 000000000..e69de29bb diff --git a/other_langs/terraform/awslambda/provider.tf b/other_langs/terraform/awslambda/provider.tf new file mode 100644 index 000000000..8b87a1c0e --- /dev/null +++ b/other_langs/terraform/awslambda/provider.tf @@ -0,0 +1,44 @@ +provider "aws" { + region = "us-east-1" + s3_use_path_style = true + skip_credentials_validation = true + skip_metadata_api_check = true + skip_requesting_account_id = true + + endpoints { + acm = "http://localhost:5000" + apigateway = "http://localhost:5000" + cloudformation = "http://localhost:5000" + cloudwatch = "http://localhost:5000" + dynamodb = "http://localhost:5000" + es = "http://localhost:5000" + firehose = "http://localhost:5000" + iam = "http://localhost:5000" + kinesis = "http://localhost:5000" + lambda = "http://localhost:5000" + route53 = "http://localhost:5000" + redshift = "http://localhost:5000" + s3 = "http://localhost:5000" + secretsmanager = "http://localhost:5000" + ses = "http://localhost:5000" + sns = "http://localhost:5000" + sqs = "http://localhost:5000" + ssm = "http://localhost:5000" + stepfunctions = "http://localhost:5000" + sts = "http://localhost:5000" + ec2 = "http://localhost:5000" + } + + access_key = "my-access-key" + secret_key = "my-secret-key" +} + +terraform { + required_providers { + + aws = { + source = "hashicorp/aws" + version = "4.67.0" + } + } +} \ No newline at end of file diff --git a/tests/test_awslambda/test_lambda_eventsourcemapping.py b/tests/test_awslambda/test_lambda_eventsourcemapping.py index c441af71d..e5cefaf98 100644 --- a/tests/test_awslambda/test_lambda_eventsourcemapping.py +++ b/tests/test_awslambda/test_lambda_eventsourcemapping.py @@ -298,7 +298,7 @@ def test_invoke_function_from_sns(): TopicArn=topic_arn, Protocol="lambda", Endpoint=result["FunctionArn"] ) - result = sns_conn.publish(TopicArn=topic_arn, Message=json.dumps({})) + sns_conn.publish(TopicArn=topic_arn, Message=json.dumps({})) start = time.time() events = [] @@ -326,6 +326,62 @@ def test_invoke_function_from_sns(): assert False, "Expected message not found in logs:" + str(events) +@pytest.mark.network +@mock_aws +@requires_docker +def test_invoke_function_from_kinesis(): + logs_conn = boto3.client("logs", region_name=_lambda_region) + kinesis = boto3.client("kinesis", region_name=_lambda_region) + stream_name = "my_stream" + + kinesis.create_stream(StreamName=stream_name, ShardCount=2) + resp = kinesis.describe_stream(StreamName=stream_name) + kinesis_arn = resp["StreamDescription"]["StreamARN"] + + conn = boto3.client("lambda", _lambda_region) + function_name = str(uuid.uuid4())[0:6] + func = conn.create_function( + FunctionName=function_name, + Runtime=PYTHON_VERSION, + Role=get_role_name(), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": get_test_zip_file3()}, + ) + + conn.create_event_source_mapping( + EventSourceArn=kinesis_arn, + FunctionName=func["FunctionArn"], + ) + + # Send Data + kinesis.put_record(StreamName=stream_name, Data="data", PartitionKey="1") + + start = time.time() + events = [] + while (time.time() - start) < 10: + result = logs_conn.describe_log_streams( + logGroupName=f"/aws/lambda/{function_name}" + ) + log_streams = result.get("logStreams") + if not log_streams: + time.sleep(1) + continue + + assert len(log_streams) == 1 + result = logs_conn.get_log_events( + logGroupName=f"/aws/lambda/{function_name}", + logStreamName=log_streams[0]["logStreamName"], + ) + events = result.get("events") + for event in events: + if event["message"] == "get_test_zip_file3 success": + return + + time.sleep(0.5) + + assert False, "Expected message not found in logs:" + str(events) + + @mock_aws def test_list_event_source_mappings(): function_name = str(uuid.uuid4())[0:6]