From 0dd41d4c32e5ae5f5a8a1c2dccc0a271d883b139 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Sun, 14 Jun 2020 12:03:00 -0300 Subject: [PATCH] Cloudformation support for EventSourceMapping (#3045) * change line position for uuid and last_modified because they're not input parameters * add event_source_arn validator and setter * refactor batch_size as setter * add helper function to parse arn and return source service * fix for EventSource's create_from_cfn, there was no reference in the lambda object for the esm if created by cfn * add esm deletion by cloudformation * remove unused variable in test * add cfn's update * add complete implementation of delete_from_cfn * blacked changed files * fix test with invalid batchsize for sqs * Dynamodb2 Table - Bugfix for localindex and implemented get_cfn_attributes * Dynamodb2 eventsource - fix test to use StreamArn attribute * Lambda Test - fix test_update_event_source_mapping --- moto/awslambda/models.py | 142 +++++++----- moto/dynamodb2/models/__init__.py | 10 + tests/test_awslambda/test_lambda.py | 3 +- .../test_lambda_cloudformation.py | 207 +++++++++++++++++- .../test_cloudformation_stack_crud.py | 3 +- 5 files changed, 313 insertions(+), 52 deletions(-) diff --git a/moto/awslambda/models.py b/moto/awslambda/models.py index 967944b91..91ecc4287 100644 --- a/moto/awslambda/models.py +++ b/moto/awslambda/models.py @@ -555,40 +555,63 @@ class LambdaFunction(BaseModel): class EventSourceMapping(BaseModel): def __init__(self, spec): # required - self.function_arn = spec["FunctionArn"] + self.function_name = spec["FunctionName"] self.event_source_arn = spec["EventSourceArn"] + + # optional + self.batch_size = spec.get("BatchSize") + self.starting_position = spec.get("StartingPosition", "TRIM_HORIZON") + self.enabled = spec.get("Enabled", True) + self.starting_position_timestamp = spec.get("StartingPositionTimestamp", None) + + self.function_arn = spec["FunctionArn"] self.uuid = str(uuid.uuid4()) self.last_modified = time.mktime(datetime.datetime.utcnow().timetuple()) - # BatchSize service default/max mapping - batch_size_map = { + def _get_service_source_from_arn(self, event_source_arn): + return event_source_arn.split(":")[2].lower() + + def _validate_event_source(self, event_source_arn): + valid_services = ("dynamodb", "kinesis", "sqs") + service = self._get_service_source_from_arn(event_source_arn) + return True if service in valid_services else False + + @property + def event_source_arn(self): + return self._event_source_arn + + @event_source_arn.setter + def event_source_arn(self, event_source_arn): + if not self._validate_event_source(event_source_arn): + raise ValueError( + "InvalidParameterValueException", "Unsupported event source type" + ) + self._event_source_arn = event_source_arn + + @property + def batch_size(self): + return self._batch_size + + @batch_size.setter + def batch_size(self, batch_size): + batch_size_service_map = { "kinesis": (100, 10000), "dynamodb": (100, 1000), "sqs": (10, 10), } - source_type = self.event_source_arn.split(":")[2].lower() - batch_size_entry = batch_size_map.get(source_type) - if batch_size_entry: - # Use service default if not provided - batch_size = int(spec.get("BatchSize", batch_size_entry[0])) - if batch_size > batch_size_entry[1]: - raise ValueError( - "InvalidParameterValueException", - "BatchSize {} exceeds the max of {}".format( - batch_size, batch_size_entry[1] - ), - ) - else: - self.batch_size = batch_size - else: - raise ValueError( - "InvalidParameterValueException", "Unsupported event source type" - ) - # optional - self.starting_position = spec.get("StartingPosition", "TRIM_HORIZON") - self.enabled = spec.get("Enabled", True) - self.starting_position_timestamp = spec.get("StartingPositionTimestamp", None) + source_type = self._get_service_source_from_arn(self.event_source_arn) + batch_size_for_source = batch_size_service_map[source_type] + + if batch_size is None: + self._batch_size = batch_size_for_source[0] + elif batch_size > batch_size_for_source[1]: + error_message = "BatchSize {} exceeds the max of {}".format( + batch_size, batch_size_for_source[1] + ) + raise ValueError("InvalidParameterValueException", error_message) + else: + self._batch_size = int(batch_size) def get_configuration(self): return { @@ -602,23 +625,42 @@ class EventSourceMapping(BaseModel): "StateTransitionReason": "User initiated", } + def delete(self, region_name): + lambda_backend = lambda_backends[region_name] + lambda_backend.delete_event_source_mapping(self.uuid) + @classmethod def create_from_cloudformation_json( cls, resource_name, cloudformation_json, region_name ): properties = cloudformation_json["Properties"] - func = lambda_backends[region_name].get_function(properties["FunctionName"]) - spec = { - "FunctionArn": func.function_arn, - "EventSourceArn": properties["EventSourceArn"], - "StartingPosition": properties["StartingPosition"], - "BatchSize": properties.get("BatchSize", 100), - } - optional_properties = "BatchSize Enabled StartingPositionTimestamp".split() - for prop in optional_properties: - if prop in properties: - spec[prop] = properties[prop] - return EventSourceMapping(spec) + lambda_backend = lambda_backends[region_name] + return lambda_backend.create_event_source_mapping(properties) + + @classmethod + def update_from_cloudformation_json( + cls, new_resource_name, cloudformation_json, original_resource, region_name + ): + properties = cloudformation_json["Properties"] + event_source_uuid = original_resource.uuid + lambda_backend = lambda_backends[region_name] + return lambda_backend.update_event_source_mapping(event_source_uuid, properties) + + @classmethod + def delete_from_cloudformation_json( + cls, resource_name, cloudformation_json, region_name + ): + properties = cloudformation_json["Properties"] + lambda_backend = lambda_backends[region_name] + esms = lambda_backend.list_event_source_mappings( + event_source_arn=properties["EventSourceArn"], + function_name=properties["FunctionName"], + ) + + for esm in esms: + if esm.logical_resource_id in resource_name: + lambda_backend.delete_event_source_mapping + esm.delete(region_name) class LambdaVersion(BaseModel): @@ -819,7 +861,7 @@ class LambdaBackend(BaseBackend): ) # Validate function name - func = self._lambdas.get_function_by_name_or_arn(spec.pop("FunctionName", "")) + func = self._lambdas.get_function_by_name_or_arn(spec.get("FunctionName", "")) if not func: raise RESTError("ResourceNotFoundException", "Invalid FunctionName") @@ -877,18 +919,20 @@ class LambdaBackend(BaseBackend): def update_event_source_mapping(self, uuid, spec): esm = self.get_event_source_mapping(uuid) - if esm: - if spec.get("FunctionName"): - func = self._lambdas.get_function_by_name_or_arn( - spec.get("FunctionName") - ) + if not esm: + return False + + for key, value in spec.items(): + if key == "FunctionName": + func = self._lambdas.get_function_by_name_or_arn(spec[key]) esm.function_arn = func.function_arn - if "BatchSize" in spec: - esm.batch_size = spec["BatchSize"] - if "Enabled" in spec: - esm.enabled = spec["Enabled"] - return esm - return False + elif key == "BatchSize": + esm.batch_size = spec[key] + elif key == "Enabled": + esm.enabled = spec[key] + + esm.last_modified = time.mktime(datetime.datetime.utcnow().timetuple()) + return esm def list_event_source_mappings(self, event_source_arn, function_name): esms = list(self._event_source_mappings.values()) diff --git a/moto/dynamodb2/models/__init__.py b/moto/dynamodb2/models/__init__.py index 48b4bbbfd..13ee94948 100644 --- a/moto/dynamodb2/models/__init__.py +++ b/moto/dynamodb2/models/__init__.py @@ -386,6 +386,16 @@ class Table(BaseModel): }, } + def get_cfn_attribute(self, attribute_name): + from moto.cloudformation.exceptions import UnformattedGetAttTemplateException + + if attribute_name == "Arn": + return self.table_arn + elif attribute_name == "StreamArn" and self.stream_specification: + return self.describe()["TableDescription"]["LatestStreamArn"] + + raise UnformattedGetAttTemplateException() + @classmethod def create_from_cloudformation_json( cls, resource_name, cloudformation_json, region_name diff --git a/tests/test_awslambda/test_lambda.py b/tests/test_awslambda/test_lambda.py index 8879ad7e3..1cd943f04 100644 --- a/tests/test_awslambda/test_lambda.py +++ b/tests/test_awslambda/test_lambda.py @@ -1446,11 +1446,12 @@ def test_update_event_source_mapping(): assert response["State"] == "Enabled" mapping = conn.update_event_source_mapping( - UUID=response["UUID"], Enabled=False, BatchSize=15, FunctionName="testFunction2" + UUID=response["UUID"], Enabled=False, BatchSize=2, FunctionName="testFunction2" ) assert mapping["UUID"] == response["UUID"] assert mapping["FunctionArn"] == func2["FunctionArn"] assert mapping["State"] == "Disabled" + assert mapping["BatchSize"] == 2 @mock_lambda diff --git a/tests/test_awslambda/test_lambda_cloudformation.py b/tests/test_awslambda/test_lambda_cloudformation.py index f57354d69..c3061ff3a 100644 --- a/tests/test_awslambda/test_lambda_cloudformation.py +++ b/tests/test_awslambda/test_lambda_cloudformation.py @@ -3,7 +3,7 @@ import io import sure # noqa import zipfile from botocore.exceptions import ClientError -from moto import mock_cloudformation, mock_iam, mock_lambda, mock_s3 +from moto import mock_cloudformation, mock_iam, mock_lambda, mock_s3, mock_sqs from nose.tools import assert_raises from string import Template from uuid import uuid4 @@ -48,6 +48,23 @@ template = Template( }""" ) +event_source_mapping_template = Template( + """{ + "AWSTemplateFormatVersion": "2010-09-09", + "Resources": { + "$resource_name": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "BatchSize": $batch_size, + "EventSourceArn": $event_source_arn, + "FunctionName": $function_name, + "Enabled": $enabled + } + } + } +}""" +) + @mock_cloudformation @mock_lambda @@ -97,6 +114,194 @@ def test_lambda_can_be_deleted_by_cloudformation(): e.exception.response["Error"]["Code"].should.equal("ResourceNotFoundException") +@mock_cloudformation +@mock_lambda +@mock_s3 +@mock_sqs +def test_event_source_mapping_create_from_cloudformation_json(): + sqs = boto3.resource("sqs", region_name="us-east-1") + s3 = boto3.client("s3", "us-east-1") + cf = boto3.client("cloudformation", region_name="us-east-1") + lmbda = boto3.client("lambda", region_name="us-east-1") + + queue = sqs.create_queue(QueueName="test-sqs-queue1") + + # Creates lambda + _, lambda_stack = create_stack(cf, s3) + created_fn_name = get_created_function_name(cf, lambda_stack) + created_fn_arn = lmbda.get_function(FunctionName=created_fn_name)["Configuration"][ + "FunctionArn" + ] + + template = event_source_mapping_template.substitute( + { + "resource_name": "Foo", + "batch_size": 1, + "event_source_arn": queue.attributes["QueueArn"], + "function_name": created_fn_name, + "enabled": True, + } + ) + + cf.create_stack(StackName="test-event-source", TemplateBody=template) + event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) + + event_sources["EventSourceMappings"].should.have.length_of(1) + event_source = event_sources["EventSourceMappings"][0] + event_source["EventSourceArn"].should.be.equal(queue.attributes["QueueArn"]) + event_source["FunctionArn"].should.be.equal(created_fn_arn) + + +@mock_cloudformation +@mock_lambda +@mock_s3 +@mock_sqs +def test_event_source_mapping_delete_stack(): + sqs = boto3.resource("sqs", region_name="us-east-1") + s3 = boto3.client("s3", "us-east-1") + cf = boto3.client("cloudformation", region_name="us-east-1") + lmbda = boto3.client("lambda", region_name="us-east-1") + + queue = sqs.create_queue(QueueName="test-sqs-queue1") + + # Creates lambda + _, lambda_stack = create_stack(cf, s3) + created_fn_name = get_created_function_name(cf, lambda_stack) + + template = event_source_mapping_template.substitute( + { + "resource_name": "Foo", + "batch_size": 1, + "event_source_arn": queue.attributes["QueueArn"], + "function_name": created_fn_name, + "enabled": True, + } + ) + + esm_stack = cf.create_stack(StackName="test-event-source", TemplateBody=template) + event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) + + event_sources["EventSourceMappings"].should.have.length_of(1) + + cf.delete_stack(StackName=esm_stack["StackId"]) + event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) + + event_sources["EventSourceMappings"].should.have.length_of(0) + + +@mock_cloudformation +@mock_lambda +@mock_s3 +@mock_sqs +def test_event_source_mapping_update_from_cloudformation_json(): + sqs = boto3.resource("sqs", region_name="us-east-1") + s3 = boto3.client("s3", "us-east-1") + cf = boto3.client("cloudformation", region_name="us-east-1") + lmbda = boto3.client("lambda", region_name="us-east-1") + + queue = sqs.create_queue(QueueName="test-sqs-queue1") + + # Creates lambda + _, lambda_stack = create_stack(cf, s3) + created_fn_name = get_created_function_name(cf, lambda_stack) + created_fn_arn = lmbda.get_function(FunctionName=created_fn_name)["Configuration"][ + "FunctionArn" + ] + + original_template = event_source_mapping_template.substitute( + { + "resource_name": "Foo", + "batch_size": 1, + "event_source_arn": queue.attributes["QueueArn"], + "function_name": created_fn_name, + "enabled": True, + } + ) + + cf.create_stack(StackName="test-event-source", TemplateBody=original_template) + event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) + original_esm = event_sources["EventSourceMappings"][0] + + original_esm["State"].should.equal("Enabled") + original_esm["BatchSize"].should.equal(1) + + # Update + new_template = event_source_mapping_template.substitute( + { + "resource_name": "Foo", + "batch_size": 10, + "event_source_arn": queue.attributes["QueueArn"], + "function_name": created_fn_name, + "enabled": False, + } + ) + + cf.update_stack(StackName="test-event-source", TemplateBody=new_template) + event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) + updated_esm = event_sources["EventSourceMappings"][0] + + updated_esm["State"].should.equal("Disabled") + updated_esm["BatchSize"].should.equal(10) + + +@mock_cloudformation +@mock_lambda +@mock_s3 +@mock_sqs +def test_event_source_mapping_delete_from_cloudformation_json(): + sqs = boto3.resource("sqs", region_name="us-east-1") + s3 = boto3.client("s3", "us-east-1") + cf = boto3.client("cloudformation", region_name="us-east-1") + lmbda = boto3.client("lambda", region_name="us-east-1") + + queue = sqs.create_queue(QueueName="test-sqs-queue1") + + # Creates lambda + _, lambda_stack = create_stack(cf, s3) + created_fn_name = get_created_function_name(cf, lambda_stack) + created_fn_arn = lmbda.get_function(FunctionName=created_fn_name)["Configuration"][ + "FunctionArn" + ] + + original_template = event_source_mapping_template.substitute( + { + "resource_name": "Foo", + "batch_size": 1, + "event_source_arn": queue.attributes["QueueArn"], + "function_name": created_fn_name, + "enabled": True, + } + ) + + cf.create_stack(StackName="test-event-source", TemplateBody=original_template) + event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) + original_esm = event_sources["EventSourceMappings"][0] + + original_esm["State"].should.equal("Enabled") + original_esm["BatchSize"].should.equal(1) + + # Update with deletion of old resources + new_template = event_source_mapping_template.substitute( + { + "resource_name": "Bar", # changed name + "batch_size": 10, + "event_source_arn": queue.attributes["QueueArn"], + "function_name": created_fn_name, + "enabled": False, + } + ) + + cf.update_stack(StackName="test-event-source", TemplateBody=new_template) + event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) + + event_sources["EventSourceMappings"].should.have.length_of(1) + updated_esm = event_sources["EventSourceMappings"][0] + + updated_esm["State"].should.equal("Disabled") + updated_esm["BatchSize"].should.equal(10) + updated_esm["UUID"].shouldnt.equal(original_esm["UUID"]) + + def create_stack(cf, s3): bucket_name = str(uuid4()) s3.create_bucket(Bucket=bucket_name) diff --git a/tests/test_cloudformation/test_cloudformation_stack_crud.py b/tests/test_cloudformation/test_cloudformation_stack_crud.py index 3d1b2ab8c..8a0a0b11c 100644 --- a/tests/test_cloudformation/test_cloudformation_stack_crud.py +++ b/tests/test_cloudformation/test_cloudformation_stack_crud.py @@ -541,13 +541,14 @@ def test_create_stack_lambda_and_dynamodb(): "ReadCapacityUnits": 10, "WriteCapacityUnits": 10, }, + "StreamSpecification": {"StreamViewType": "KEYS_ONLY"}, }, }, "func1mapping": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { "FunctionName": {"Ref": "func1"}, - "EventSourceArn": "arn:aws:dynamodb:region:XXXXXX:table/tab1/stream/2000T00:00:00.000", + "EventSourceArn": {"Fn::GetAtt": ["tab1", "StreamArn"]}, "StartingPosition": "0", "BatchSize": 100, "Enabled": True,