Cloudformation support for EventSourceMapping (#3045)
* change line position for uuid and last_modified because they're not input parameters * add event_source_arn validator and setter * refactor batch_size as setter * add helper function to parse arn and return source service * fix for EventSource's create_from_cfn, there was no reference in the lambda object for the esm if created by cfn * add esm deletion by cloudformation * remove unused variable in test * add cfn's update * add complete implementation of delete_from_cfn * blacked changed files * fix test with invalid batchsize for sqs * Dynamodb2 Table - Bugfix for localindex and implemented get_cfn_attributes * Dynamodb2 eventsource - fix test to use StreamArn attribute * Lambda Test - fix test_update_event_source_mapping
This commit is contained in:
parent
849f16ff2d
commit
0dd41d4c32
@ -555,40 +555,63 @@ class LambdaFunction(BaseModel):
|
||||
class EventSourceMapping(BaseModel):
|
||||
def __init__(self, spec):
|
||||
# required
|
||||
self.function_arn = spec["FunctionArn"]
|
||||
self.function_name = spec["FunctionName"]
|
||||
self.event_source_arn = spec["EventSourceArn"]
|
||||
|
||||
# optional
|
||||
self.batch_size = spec.get("BatchSize")
|
||||
self.starting_position = spec.get("StartingPosition", "TRIM_HORIZON")
|
||||
self.enabled = spec.get("Enabled", True)
|
||||
self.starting_position_timestamp = spec.get("StartingPositionTimestamp", None)
|
||||
|
||||
self.function_arn = spec["FunctionArn"]
|
||||
self.uuid = str(uuid.uuid4())
|
||||
self.last_modified = time.mktime(datetime.datetime.utcnow().timetuple())
|
||||
|
||||
# BatchSize service default/max mapping
|
||||
batch_size_map = {
|
||||
def _get_service_source_from_arn(self, event_source_arn):
|
||||
return event_source_arn.split(":")[2].lower()
|
||||
|
||||
def _validate_event_source(self, event_source_arn):
|
||||
valid_services = ("dynamodb", "kinesis", "sqs")
|
||||
service = self._get_service_source_from_arn(event_source_arn)
|
||||
return True if service in valid_services else False
|
||||
|
||||
@property
|
||||
def event_source_arn(self):
|
||||
return self._event_source_arn
|
||||
|
||||
@event_source_arn.setter
|
||||
def event_source_arn(self, event_source_arn):
|
||||
if not self._validate_event_source(event_source_arn):
|
||||
raise ValueError(
|
||||
"InvalidParameterValueException", "Unsupported event source type"
|
||||
)
|
||||
self._event_source_arn = event_source_arn
|
||||
|
||||
@property
|
||||
def batch_size(self):
|
||||
return self._batch_size
|
||||
|
||||
@batch_size.setter
|
||||
def batch_size(self, batch_size):
|
||||
batch_size_service_map = {
|
||||
"kinesis": (100, 10000),
|
||||
"dynamodb": (100, 1000),
|
||||
"sqs": (10, 10),
|
||||
}
|
||||
source_type = self.event_source_arn.split(":")[2].lower()
|
||||
batch_size_entry = batch_size_map.get(source_type)
|
||||
if batch_size_entry:
|
||||
# Use service default if not provided
|
||||
batch_size = int(spec.get("BatchSize", batch_size_entry[0]))
|
||||
if batch_size > batch_size_entry[1]:
|
||||
raise ValueError(
|
||||
"InvalidParameterValueException",
|
||||
"BatchSize {} exceeds the max of {}".format(
|
||||
batch_size, batch_size_entry[1]
|
||||
),
|
||||
)
|
||||
else:
|
||||
self.batch_size = batch_size
|
||||
else:
|
||||
raise ValueError(
|
||||
"InvalidParameterValueException", "Unsupported event source type"
|
||||
)
|
||||
|
||||
# optional
|
||||
self.starting_position = spec.get("StartingPosition", "TRIM_HORIZON")
|
||||
self.enabled = spec.get("Enabled", True)
|
||||
self.starting_position_timestamp = spec.get("StartingPositionTimestamp", None)
|
||||
source_type = self._get_service_source_from_arn(self.event_source_arn)
|
||||
batch_size_for_source = batch_size_service_map[source_type]
|
||||
|
||||
if batch_size is None:
|
||||
self._batch_size = batch_size_for_source[0]
|
||||
elif batch_size > batch_size_for_source[1]:
|
||||
error_message = "BatchSize {} exceeds the max of {}".format(
|
||||
batch_size, batch_size_for_source[1]
|
||||
)
|
||||
raise ValueError("InvalidParameterValueException", error_message)
|
||||
else:
|
||||
self._batch_size = int(batch_size)
|
||||
|
||||
def get_configuration(self):
|
||||
return {
|
||||
@ -602,23 +625,42 @@ class EventSourceMapping(BaseModel):
|
||||
"StateTransitionReason": "User initiated",
|
||||
}
|
||||
|
||||
def delete(self, region_name):
|
||||
lambda_backend = lambda_backends[region_name]
|
||||
lambda_backend.delete_event_source_mapping(self.uuid)
|
||||
|
||||
@classmethod
|
||||
def create_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, region_name
|
||||
):
|
||||
properties = cloudformation_json["Properties"]
|
||||
func = lambda_backends[region_name].get_function(properties["FunctionName"])
|
||||
spec = {
|
||||
"FunctionArn": func.function_arn,
|
||||
"EventSourceArn": properties["EventSourceArn"],
|
||||
"StartingPosition": properties["StartingPosition"],
|
||||
"BatchSize": properties.get("BatchSize", 100),
|
||||
}
|
||||
optional_properties = "BatchSize Enabled StartingPositionTimestamp".split()
|
||||
for prop in optional_properties:
|
||||
if prop in properties:
|
||||
spec[prop] = properties[prop]
|
||||
return EventSourceMapping(spec)
|
||||
lambda_backend = lambda_backends[region_name]
|
||||
return lambda_backend.create_event_source_mapping(properties)
|
||||
|
||||
@classmethod
|
||||
def update_from_cloudformation_json(
|
||||
cls, new_resource_name, cloudformation_json, original_resource, region_name
|
||||
):
|
||||
properties = cloudformation_json["Properties"]
|
||||
event_source_uuid = original_resource.uuid
|
||||
lambda_backend = lambda_backends[region_name]
|
||||
return lambda_backend.update_event_source_mapping(event_source_uuid, properties)
|
||||
|
||||
@classmethod
|
||||
def delete_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, region_name
|
||||
):
|
||||
properties = cloudformation_json["Properties"]
|
||||
lambda_backend = lambda_backends[region_name]
|
||||
esms = lambda_backend.list_event_source_mappings(
|
||||
event_source_arn=properties["EventSourceArn"],
|
||||
function_name=properties["FunctionName"],
|
||||
)
|
||||
|
||||
for esm in esms:
|
||||
if esm.logical_resource_id in resource_name:
|
||||
lambda_backend.delete_event_source_mapping
|
||||
esm.delete(region_name)
|
||||
|
||||
|
||||
class LambdaVersion(BaseModel):
|
||||
@ -819,7 +861,7 @@ class LambdaBackend(BaseBackend):
|
||||
)
|
||||
|
||||
# Validate function name
|
||||
func = self._lambdas.get_function_by_name_or_arn(spec.pop("FunctionName", ""))
|
||||
func = self._lambdas.get_function_by_name_or_arn(spec.get("FunctionName", ""))
|
||||
if not func:
|
||||
raise RESTError("ResourceNotFoundException", "Invalid FunctionName")
|
||||
|
||||
@ -877,18 +919,20 @@ class LambdaBackend(BaseBackend):
|
||||
|
||||
def update_event_source_mapping(self, uuid, spec):
|
||||
esm = self.get_event_source_mapping(uuid)
|
||||
if esm:
|
||||
if spec.get("FunctionName"):
|
||||
func = self._lambdas.get_function_by_name_or_arn(
|
||||
spec.get("FunctionName")
|
||||
)
|
||||
if not esm:
|
||||
return False
|
||||
|
||||
for key, value in spec.items():
|
||||
if key == "FunctionName":
|
||||
func = self._lambdas.get_function_by_name_or_arn(spec[key])
|
||||
esm.function_arn = func.function_arn
|
||||
if "BatchSize" in spec:
|
||||
esm.batch_size = spec["BatchSize"]
|
||||
if "Enabled" in spec:
|
||||
esm.enabled = spec["Enabled"]
|
||||
return esm
|
||||
return False
|
||||
elif key == "BatchSize":
|
||||
esm.batch_size = spec[key]
|
||||
elif key == "Enabled":
|
||||
esm.enabled = spec[key]
|
||||
|
||||
esm.last_modified = time.mktime(datetime.datetime.utcnow().timetuple())
|
||||
return esm
|
||||
|
||||
def list_event_source_mappings(self, event_source_arn, function_name):
|
||||
esms = list(self._event_source_mappings.values())
|
||||
|
@ -386,6 +386,16 @@ class Table(BaseModel):
|
||||
},
|
||||
}
|
||||
|
||||
def get_cfn_attribute(self, attribute_name):
|
||||
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
|
||||
|
||||
if attribute_name == "Arn":
|
||||
return self.table_arn
|
||||
elif attribute_name == "StreamArn" and self.stream_specification:
|
||||
return self.describe()["TableDescription"]["LatestStreamArn"]
|
||||
|
||||
raise UnformattedGetAttTemplateException()
|
||||
|
||||
@classmethod
|
||||
def create_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, region_name
|
||||
|
@ -1446,11 +1446,12 @@ def test_update_event_source_mapping():
|
||||
assert response["State"] == "Enabled"
|
||||
|
||||
mapping = conn.update_event_source_mapping(
|
||||
UUID=response["UUID"], Enabled=False, BatchSize=15, FunctionName="testFunction2"
|
||||
UUID=response["UUID"], Enabled=False, BatchSize=2, FunctionName="testFunction2"
|
||||
)
|
||||
assert mapping["UUID"] == response["UUID"]
|
||||
assert mapping["FunctionArn"] == func2["FunctionArn"]
|
||||
assert mapping["State"] == "Disabled"
|
||||
assert mapping["BatchSize"] == 2
|
||||
|
||||
|
||||
@mock_lambda
|
||||
|
@ -3,7 +3,7 @@ import io
|
||||
import sure # noqa
|
||||
import zipfile
|
||||
from botocore.exceptions import ClientError
|
||||
from moto import mock_cloudformation, mock_iam, mock_lambda, mock_s3
|
||||
from moto import mock_cloudformation, mock_iam, mock_lambda, mock_s3, mock_sqs
|
||||
from nose.tools import assert_raises
|
||||
from string import Template
|
||||
from uuid import uuid4
|
||||
@ -48,6 +48,23 @@ template = Template(
|
||||
}"""
|
||||
)
|
||||
|
||||
event_source_mapping_template = Template(
|
||||
"""{
|
||||
"AWSTemplateFormatVersion": "2010-09-09",
|
||||
"Resources": {
|
||||
"$resource_name": {
|
||||
"Type": "AWS::Lambda::EventSourceMapping",
|
||||
"Properties": {
|
||||
"BatchSize": $batch_size,
|
||||
"EventSourceArn": $event_source_arn,
|
||||
"FunctionName": $function_name,
|
||||
"Enabled": $enabled
|
||||
}
|
||||
}
|
||||
}
|
||||
}"""
|
||||
)
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
@mock_lambda
|
||||
@ -97,6 +114,194 @@ def test_lambda_can_be_deleted_by_cloudformation():
|
||||
e.exception.response["Error"]["Code"].should.equal("ResourceNotFoundException")
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
@mock_lambda
|
||||
@mock_s3
|
||||
@mock_sqs
|
||||
def test_event_source_mapping_create_from_cloudformation_json():
|
||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||
s3 = boto3.client("s3", "us-east-1")
|
||||
cf = boto3.client("cloudformation", region_name="us-east-1")
|
||||
lmbda = boto3.client("lambda", region_name="us-east-1")
|
||||
|
||||
queue = sqs.create_queue(QueueName="test-sqs-queue1")
|
||||
|
||||
# Creates lambda
|
||||
_, lambda_stack = create_stack(cf, s3)
|
||||
created_fn_name = get_created_function_name(cf, lambda_stack)
|
||||
created_fn_arn = lmbda.get_function(FunctionName=created_fn_name)["Configuration"][
|
||||
"FunctionArn"
|
||||
]
|
||||
|
||||
template = event_source_mapping_template.substitute(
|
||||
{
|
||||
"resource_name": "Foo",
|
||||
"batch_size": 1,
|
||||
"event_source_arn": queue.attributes["QueueArn"],
|
||||
"function_name": created_fn_name,
|
||||
"enabled": True,
|
||||
}
|
||||
)
|
||||
|
||||
cf.create_stack(StackName="test-event-source", TemplateBody=template)
|
||||
event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name)
|
||||
|
||||
event_sources["EventSourceMappings"].should.have.length_of(1)
|
||||
event_source = event_sources["EventSourceMappings"][0]
|
||||
event_source["EventSourceArn"].should.be.equal(queue.attributes["QueueArn"])
|
||||
event_source["FunctionArn"].should.be.equal(created_fn_arn)
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
@mock_lambda
|
||||
@mock_s3
|
||||
@mock_sqs
|
||||
def test_event_source_mapping_delete_stack():
|
||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||
s3 = boto3.client("s3", "us-east-1")
|
||||
cf = boto3.client("cloudformation", region_name="us-east-1")
|
||||
lmbda = boto3.client("lambda", region_name="us-east-1")
|
||||
|
||||
queue = sqs.create_queue(QueueName="test-sqs-queue1")
|
||||
|
||||
# Creates lambda
|
||||
_, lambda_stack = create_stack(cf, s3)
|
||||
created_fn_name = get_created_function_name(cf, lambda_stack)
|
||||
|
||||
template = event_source_mapping_template.substitute(
|
||||
{
|
||||
"resource_name": "Foo",
|
||||
"batch_size": 1,
|
||||
"event_source_arn": queue.attributes["QueueArn"],
|
||||
"function_name": created_fn_name,
|
||||
"enabled": True,
|
||||
}
|
||||
)
|
||||
|
||||
esm_stack = cf.create_stack(StackName="test-event-source", TemplateBody=template)
|
||||
event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name)
|
||||
|
||||
event_sources["EventSourceMappings"].should.have.length_of(1)
|
||||
|
||||
cf.delete_stack(StackName=esm_stack["StackId"])
|
||||
event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name)
|
||||
|
||||
event_sources["EventSourceMappings"].should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
@mock_lambda
|
||||
@mock_s3
|
||||
@mock_sqs
|
||||
def test_event_source_mapping_update_from_cloudformation_json():
|
||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||
s3 = boto3.client("s3", "us-east-1")
|
||||
cf = boto3.client("cloudformation", region_name="us-east-1")
|
||||
lmbda = boto3.client("lambda", region_name="us-east-1")
|
||||
|
||||
queue = sqs.create_queue(QueueName="test-sqs-queue1")
|
||||
|
||||
# Creates lambda
|
||||
_, lambda_stack = create_stack(cf, s3)
|
||||
created_fn_name = get_created_function_name(cf, lambda_stack)
|
||||
created_fn_arn = lmbda.get_function(FunctionName=created_fn_name)["Configuration"][
|
||||
"FunctionArn"
|
||||
]
|
||||
|
||||
original_template = event_source_mapping_template.substitute(
|
||||
{
|
||||
"resource_name": "Foo",
|
||||
"batch_size": 1,
|
||||
"event_source_arn": queue.attributes["QueueArn"],
|
||||
"function_name": created_fn_name,
|
||||
"enabled": True,
|
||||
}
|
||||
)
|
||||
|
||||
cf.create_stack(StackName="test-event-source", TemplateBody=original_template)
|
||||
event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name)
|
||||
original_esm = event_sources["EventSourceMappings"][0]
|
||||
|
||||
original_esm["State"].should.equal("Enabled")
|
||||
original_esm["BatchSize"].should.equal(1)
|
||||
|
||||
# Update
|
||||
new_template = event_source_mapping_template.substitute(
|
||||
{
|
||||
"resource_name": "Foo",
|
||||
"batch_size": 10,
|
||||
"event_source_arn": queue.attributes["QueueArn"],
|
||||
"function_name": created_fn_name,
|
||||
"enabled": False,
|
||||
}
|
||||
)
|
||||
|
||||
cf.update_stack(StackName="test-event-source", TemplateBody=new_template)
|
||||
event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name)
|
||||
updated_esm = event_sources["EventSourceMappings"][0]
|
||||
|
||||
updated_esm["State"].should.equal("Disabled")
|
||||
updated_esm["BatchSize"].should.equal(10)
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
@mock_lambda
|
||||
@mock_s3
|
||||
@mock_sqs
|
||||
def test_event_source_mapping_delete_from_cloudformation_json():
|
||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||
s3 = boto3.client("s3", "us-east-1")
|
||||
cf = boto3.client("cloudformation", region_name="us-east-1")
|
||||
lmbda = boto3.client("lambda", region_name="us-east-1")
|
||||
|
||||
queue = sqs.create_queue(QueueName="test-sqs-queue1")
|
||||
|
||||
# Creates lambda
|
||||
_, lambda_stack = create_stack(cf, s3)
|
||||
created_fn_name = get_created_function_name(cf, lambda_stack)
|
||||
created_fn_arn = lmbda.get_function(FunctionName=created_fn_name)["Configuration"][
|
||||
"FunctionArn"
|
||||
]
|
||||
|
||||
original_template = event_source_mapping_template.substitute(
|
||||
{
|
||||
"resource_name": "Foo",
|
||||
"batch_size": 1,
|
||||
"event_source_arn": queue.attributes["QueueArn"],
|
||||
"function_name": created_fn_name,
|
||||
"enabled": True,
|
||||
}
|
||||
)
|
||||
|
||||
cf.create_stack(StackName="test-event-source", TemplateBody=original_template)
|
||||
event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name)
|
||||
original_esm = event_sources["EventSourceMappings"][0]
|
||||
|
||||
original_esm["State"].should.equal("Enabled")
|
||||
original_esm["BatchSize"].should.equal(1)
|
||||
|
||||
# Update with deletion of old resources
|
||||
new_template = event_source_mapping_template.substitute(
|
||||
{
|
||||
"resource_name": "Bar", # changed name
|
||||
"batch_size": 10,
|
||||
"event_source_arn": queue.attributes["QueueArn"],
|
||||
"function_name": created_fn_name,
|
||||
"enabled": False,
|
||||
}
|
||||
)
|
||||
|
||||
cf.update_stack(StackName="test-event-source", TemplateBody=new_template)
|
||||
event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name)
|
||||
|
||||
event_sources["EventSourceMappings"].should.have.length_of(1)
|
||||
updated_esm = event_sources["EventSourceMappings"][0]
|
||||
|
||||
updated_esm["State"].should.equal("Disabled")
|
||||
updated_esm["BatchSize"].should.equal(10)
|
||||
updated_esm["UUID"].shouldnt.equal(original_esm["UUID"])
|
||||
|
||||
|
||||
def create_stack(cf, s3):
|
||||
bucket_name = str(uuid4())
|
||||
s3.create_bucket(Bucket=bucket_name)
|
||||
|
@ -541,13 +541,14 @@ def test_create_stack_lambda_and_dynamodb():
|
||||
"ReadCapacityUnits": 10,
|
||||
"WriteCapacityUnits": 10,
|
||||
},
|
||||
"StreamSpecification": {"StreamViewType": "KEYS_ONLY"},
|
||||
},
|
||||
},
|
||||
"func1mapping": {
|
||||
"Type": "AWS::Lambda::EventSourceMapping",
|
||||
"Properties": {
|
||||
"FunctionName": {"Ref": "func1"},
|
||||
"EventSourceArn": "arn:aws:dynamodb:region:XXXXXX:table/tab1/stream/2000T00:00:00.000",
|
||||
"EventSourceArn": {"Fn::GetAtt": ["tab1", "StreamArn"]},
|
||||
"StartingPosition": "0",
|
||||
"BatchSize": 100,
|
||||
"Enabled": True,
|
||||
|
Loading…
Reference in New Issue
Block a user