Feature: Custom cloudformation resources (#4512)

This commit is contained in:
Bert Blommers 2021-11-03 20:00:42 -01:00 committed by GitHub
parent ff6d7a13c0
commit f923d0d1e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 928 additions and 165 deletions

View File

@ -72,7 +72,7 @@ class Deployment(CloudFormationModel, dict):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
rest_api_id = properties["RestApiId"]
@ -189,7 +189,7 @@ class Method(CloudFormationModel, dict):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
rest_api_id = properties["RestApiId"]
@ -268,7 +268,7 @@ class Resource(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
api_id = properties["RestApiId"]
@ -810,6 +810,10 @@ class RestAPI(CloudFormationModel):
if to_path(self.PROP_DESCRIPTON) in path:
self.description = ""
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["RootResourceId"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -834,7 +838,7 @@ class RestAPI(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
name = properties["Name"]

View File

@ -170,7 +170,7 @@ class FakeLaunchConfiguration(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -392,7 +392,7 @@ class FakeAutoScalingGroup(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]

View File

@ -201,7 +201,7 @@ class Permission(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
backend = lambda_backends[region_name]
@ -270,7 +270,7 @@ class LayerVersion(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
optional_properties = (
@ -649,6 +649,8 @@ class LambdaFunction(CloudFormationModel, DockerModel):
if body:
body = json.loads(body)
else:
body = "{}"
# Get the invocation type:
res, errored, logs = self._invoke_lambda(code=self.code, event=body)
@ -675,7 +677,7 @@ class LambdaFunction(CloudFormationModel, DockerModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
optional_properties = (
@ -715,6 +717,10 @@ class LambdaFunction(CloudFormationModel, DockerModel):
fn = backend.create_function(spec)
return fn
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -735,7 +741,12 @@ class LambdaFunction(CloudFormationModel, DockerModel):
def _create_zipfile_from_plaintext_code(code):
zip_output = io.BytesIO()
zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED)
zip_file.writestr("lambda_function.zip", code)
zip_file.writestr("index.py", code)
# This should really be part of the 'lambci' docker image
from moto.packages.cfnresponse import cfnresponse
with open(cfnresponse.__file__) as cfn:
zip_file.writestr("cfnresponse.py", cfn.read())
zip_file.close()
zip_output.seek(0)
return zip_output.read()
@ -832,7 +843,7 @@ class EventSourceMapping(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
lambda_backend = lambda_backends[region_name]
@ -885,7 +896,7 @@ class LambdaVersion(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
function_name = properties["FunctionName"]

View File

@ -93,7 +93,7 @@ class ComputeEnvironment(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
backend = batch_backends[region_name]
properties = cloudformation_json["Properties"]
@ -165,7 +165,7 @@ class JobQueue(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
backend = batch_backends[region_name]
properties = cloudformation_json["Properties"]
@ -349,7 +349,7 @@ class JobDefinition(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
backend = batch_backends[region_name]
properties = cloudformation_json["Properties"]

View File

@ -0,0 +1,90 @@
import json
import threading
from moto import settings
from moto.core.models import CloudFormationModel
from moto.awslambda import lambda_backends
from uuid import uuid4
class CustomModel(CloudFormationModel):
def __init__(self, region_name, request_id, logical_id, resource_name):
self.region_name = region_name
self.request_id = request_id
self.logical_id = logical_id
self.resource_name = resource_name
self.data = dict()
self._finished = False
def set_data(self, data):
self.data = data
self._finished = True
def is_created(self):
return self._finished
@property
def physical_resource_id(self):
return self.resource_name
@staticmethod
def cloudformation_type():
return "?"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name, **kwargs
):
logical_id = kwargs["LogicalId"]
stack_id = kwargs["StackId"]
resource_type = kwargs["ResourceType"]
properties = cloudformation_json["Properties"]
service_token = properties["ServiceToken"]
backend = lambda_backends[region_name]
fn = backend.get_function(service_token)
request_id = str(uuid4())
custom_resource = CustomModel(
region_name, request_id, logical_id, resource_name
)
from moto.cloudformation import cloudformation_backends
stack = cloudformation_backends[region_name].get_stack(stack_id)
stack.add_custom_resource(custom_resource)
event = {
"RequestType": "Create",
"ServiceToken": service_token,
# A request will be send to this URL to indicate success/failure
# This request will be coming from inside a Docker container
# Note that, in order to reach the Moto host, the Moto-server should be listening on 0.0.0.0
#
# Alternative: Maybe we should let the user pass in a container-name where Moto is running?
# Similar to how we know for sure that the container in our CI is called 'motoserver'
"ResponseURL": f"{settings.moto_server_host()}/cloudformation_{region_name}/cfnresponse?stack={stack_id}",
"StackId": stack_id,
"RequestId": request_id,
"LogicalResourceId": logical_id,
"ResourceType": resource_type,
"ResourceProperties": properties,
}
invoke_thread = threading.Thread(
target=fn.invoke, args=(json.dumps(event), {}, {})
)
invoke_thread.start()
return custom_resource
@classmethod
def has_cfn_attr(cls, attribute):
# We don't know which attributes are supported for third-party resources
return True
def get_cfn_attribute(self, attribute_name):
if attribute_name in self.data:
return self.data[attribute_name]
return None

View File

@ -41,6 +41,16 @@ class ExportNotFound(BadRequest):
)
class UnsupportedAttribute(ValidationError):
def __init__(self, resource, attr):
template = Template(ERROR_RESPONSE)
super(UnsupportedAttribute, self).__init__()
self.description = template.render(
code="ValidationError",
message=f"Template error: resource {resource} does not support attribute type {attr} in Fn::GetAtt",
)
ERROR_RESPONSE = """<ErrorResponse xmlns="http://cloudformation.amazonaws.com/doc/2010-05-15/">
<Error>
<Type>Sender</Type>

View File

@ -237,8 +237,12 @@ class FakeStack(BaseModel):
self.cross_stack_resources = cross_stack_resources or {}
self.resource_map = self._create_resource_map()
self.custom_resources = dict()
self.output_map = self._create_output_map()
self.creation_time = datetime.utcnow()
self.status = "CREATE_PENDING"
def _create_resource_map(self):
resource_map = ResourceMap(
@ -254,9 +258,7 @@ class FakeStack(BaseModel):
return resource_map
def _create_output_map(self):
output_map = OutputMap(self.resource_map, self.template_dict, self.stack_id)
output_map.create()
return output_map
return OutputMap(self.resource_map, self.template_dict, self.stack_id)
@property
def creation_time_iso_8601(self):
@ -319,17 +321,33 @@ class FakeStack(BaseModel):
@property
def stack_outputs(self):
return self.output_map.values()
return [v for v in self.output_map.values() if v]
@property
def exports(self):
return self.output_map.exports
def add_custom_resource(self, custom_resource):
self.custom_resources[custom_resource.logical_id] = custom_resource
def get_custom_resource(self, custom_resource):
return self.custom_resources[custom_resource]
def create_resources(self):
self.resource_map.create(self.template_dict)
self.status = "CREATE_IN_PROGRESS"
all_resources_ready = self.resource_map.create(self.template_dict)
# Set the description of the stack
self.description = self.template_dict.get("Description")
if all_resources_ready:
self.mark_creation_complete()
def verify_readiness(self):
if self.resource_map.creation_complete():
self.mark_creation_complete()
def mark_creation_complete(self):
self.status = "CREATE_COMPLETE"
self._add_stack_event("CREATE_COMPLETE")
def update(self, template, role_arn=None, parameters=None, tags=None):
self._add_stack_event(
@ -651,7 +669,6 @@ class CloudFormationBackend(BaseBackend):
"CREATE_IN_PROGRESS", resource_status_reason="User Initiated"
)
new_stack.create_resources()
new_stack._add_stack_event("CREATE_COMPLETE")
return new_stack
def create_change_set(

View File

@ -18,6 +18,7 @@ from moto.apigateway import models as apigateway_models # noqa
from moto.autoscaling import models as autoscaling_models # noqa
from moto.awslambda import models as awslambda_models # noqa
from moto.batch import models as batch_models # noqa
from moto.cloudformation.custom_model import CustomModel
from moto.cloudwatch import models as cloudwatch_models # noqa
from moto.datapipeline import models as datapipeline_models # noqa
from moto.dynamodb2 import models as dynamodb2_models # noqa
@ -52,6 +53,7 @@ from .exceptions import (
MissingParameterError,
UnformattedGetAttTemplateException,
ValidationError,
UnsupportedAttribute,
)
from moto.packages.boto.cloudformation.stack import Output
@ -215,6 +217,8 @@ def clean_json(resource_json, resources_map):
def resource_class_from_type(resource_type):
if resource_type in NULL_MODELS:
return None
if resource_type.startswith("Custom::"):
return CustomModel
if resource_type not in MODEL_MAP:
logger.warning("No Moto CloudFormation support for %s", resource_type)
return None
@ -317,8 +321,13 @@ def parse_and_create_resource(logical_id, resource_json, resources_map, region_n
if not resource_tuple:
return None
resource_class, resource_json, resource_physical_name = resource_tuple
kwargs = {
"LogicalId": logical_id,
"StackId": resources_map.stack_id,
"ResourceType": resource_type,
}
resource = resource_class.create_from_cloudformation_json(
resource_physical_name, resource_json, region_name
resource_physical_name, resource_json, region_name, **kwargs
)
resource.type = resource_type
resource.logical_resource_id = logical_id
@ -393,6 +402,8 @@ def parse_condition(condition, resources_map, condition_map):
def parse_output(output_logical_id, output_json, resources_map):
output_json = clean_json(output_json, resources_map)
if "Value" not in output_json:
return None
output = Output()
output.key = output_logical_id
output.value = clean_json(output_json["Value"], resources_map)
@ -424,6 +435,7 @@ class ResourceMap(collections_abc.Mapping):
self.tags = copy.deepcopy(tags)
self.resolved_parameters = {}
self.cross_stack_resources = cross_stack_resources
self.stack_id = stack_id
# Create the default resources
self._parsed_resources = {
@ -581,11 +593,27 @@ class ResourceMap(collections_abc.Mapping):
for condition_name in self.lazy_condition_map:
self.lazy_condition_map[condition_name]
def validate_outputs(self):
outputs = self._template.get("Outputs") or {}
for key, value in outputs.items():
value = value.get("Value", {})
if "Fn::GetAtt" in value:
resource_type = self._resource_json_map.get(value["Fn::GetAtt"][0])[
"Type"
]
attr = value["Fn::GetAtt"][1]
resource_class = resource_class_from_type(resource_type)
if not resource_class.has_cfn_attr(attr):
# AWS::SQS::Queue --> Queue
short_type = resource_type[resource_type.rindex(":") + 1 :]
raise UnsupportedAttribute(resource=short_type, attr=attr)
def load(self):
self.load_mapping()
self.transform_mapping()
self.load_parameters()
self.load_conditions()
self.validate_outputs()
def create(self, template):
# Since this is a lazy map, to create every object we just need to
@ -599,20 +627,31 @@ class ResourceMap(collections_abc.Mapping):
"aws:cloudformation:stack-id": self.get("AWS::StackId"),
}
)
all_resources_ready = True
for resource in self.__get_resources_in_dependency_order():
if isinstance(self[resource], ec2_models.TaggedEC2Resource):
instance = self[resource]
if isinstance(instance, ec2_models.TaggedEC2Resource):
self.tags["aws:cloudformation:logical-id"] = resource
ec2_models.ec2_backends[self._region_name].create_tags(
[self[resource].physical_resource_id], self.tags
[instance.physical_resource_id], self.tags
)
if instance and not instance.is_created():
all_resources_ready = False
return all_resources_ready
def creation_complete(self):
all_resources_ready = True
for resource in self.__get_resources_in_dependency_order():
instance = self[resource]
if instance and not instance.is_created():
all_resources_ready = False
return all_resources_ready
def build_resource_diff(self, other_template):
old = self._resource_json_map
new = other_template["Resources"]
resource_names_by_action = {"Add": {}, "Modify": {}, "Remove": {}}
resource_names_by_action = {
"Add": set(new) - set(old),
"Modify": set(
@ -766,7 +805,8 @@ class OutputMap(collections_abc.Mapping):
new_output = parse_output(
output_logical_id, output_json, self._resource_map
)
self._parsed_outputs[output_logical_id] = new_output
if new_output:
self._parsed_outputs[output_logical_id] = new_output
return new_output
def __iter__(self):
@ -792,10 +832,6 @@ class OutputMap(collections_abc.Mapping):
exports.append(Export(self._stack_id, cleaned_name, cleaned_value))
return exports
def create(self):
for output in self.outputs:
self[output]
class Export(object):
def __init__(self, exporting_stack_id, name, value):

View File

@ -40,6 +40,12 @@ class CloudFormationResponse(BaseResponse):
def cloudformation_backend(self):
return cloudformation_backends[self.region]
@classmethod
def cfnresponse(cls, *args, **kwargs):
request, full_url, headers = args
full_url += "&Action=ProcessCfnResponse"
return cls.dispatch(request=request, full_url=full_url, headers=headers)
def _get_stack_from_s3_url(self, template_url):
template_url_parts = urlparse(template_url)
if "localhost" in template_url:
@ -87,6 +93,19 @@ class CloudFormationResponse(BaseResponse):
raise MissingParameterError(parameter["parameter_key"])
return result
def process_cfn_response(self):
status = self._get_param("Status")
if status == "SUCCESS":
stack_id = self._get_param("StackId")
logical_resource_id = self._get_param("LogicalResourceId")
outputs = self._get_param("Data")
stack = self.cloudformation_backend.get_stack(stack_id)
custom_resource = stack.get_custom_resource(logical_resource_id)
custom_resource.set_data(outputs)
stack.verify_readiness()
return 200, {"status": 200}, json.dumps("{}")
def create_stack(self):
stack_name = self._get_param("StackName")
stack_body = self._get_param("TemplateBody")

View File

@ -2,4 +2,7 @@ from .responses import CloudFormationResponse
url_bases = [r"https?://cloudformation\.(.+)\.amazonaws\.com"]
url_paths = {"{0}/$": CloudFormationResponse.dispatch}
url_paths = {
"{0}/$": CloudFormationResponse.dispatch,
"{0}/cloudformation_(?P<region>[^/]+)/cfnresponse$": CloudFormationResponse.cfnresponse,
}

View File

@ -6,8 +6,7 @@ from moto.core.utils import (
iso_8601_datetime_without_milliseconds,
iso_8601_datetime_with_nanoseconds,
)
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.logs import logs_backends
from moto.core import BaseBackend, BaseModel
from datetime import datetime, timedelta
from dateutil.tz import tzutc
from uuid import uuid4
@ -677,33 +676,6 @@ class CloudWatchBackend(BaseBackend):
return None, metrics
class LogGroup(CloudFormationModel):
def __init__(self, spec):
# required
self.name = spec["LogGroupName"]
# optional
self.tags = spec.get("Tags", [])
@staticmethod
def cloudformation_name_type():
return "LogGroupName"
@staticmethod
def cloudformation_type():
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-loggroup.html
return "AWS::Logs::LogGroup"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
properties = cloudformation_json["Properties"]
tags = properties.get("Tags", {})
return logs_backends[region_name].create_log_group(
resource_name, tags, **properties
)
cloudwatch_backends = {}
for region in Session().get_available_regions("cloudwatch"):
cloudwatch_backends[region] = CloudWatchBackend(region)

View File

@ -590,10 +590,17 @@ class CloudFormationModel(BaseModel):
# See for example https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html
return "AWS::SERVICE::RESOURCE"
@classmethod
@abstractmethod
def has_cfn_attr(cls, attr):
# Used for validation
# If a template creates an Output for an attribute that does not exist, an error should be thrown
return True
@classmethod
@abstractmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
# This must be implemented as a classmethod with parameters:
# cls, resource_name, cloudformation_json, region_name
@ -624,6 +631,13 @@ class CloudFormationModel(BaseModel):
# and delete the resource. Do not include a return statement.
pass
@abstractmethod
def is_created(self):
# Verify whether the resource was created successfully
# Assume True after initialization
# Custom resources may need time after init before they are created successfully
return True
class BaseBackend:
def _reset_model_refs(self):

View File

@ -83,7 +83,7 @@ class Pipeline(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
datapipeline_backend = datapipeline_backends[region_name]
properties = cloudformation_json["Properties"]

View File

@ -151,7 +151,7 @@ class Table(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
key_attr = [
@ -300,6 +300,10 @@ class Table(CloudFormationModel):
item.attrs[attr].add(DynamoType(update["Value"]))
return item
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["StreamArn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

View File

@ -450,6 +450,10 @@ class Table(CloudFormationModel):
},
}
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn", "StreamArn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -487,7 +491,7 @@ class Table(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
params = {}

View File

@ -376,7 +376,7 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -421,6 +421,10 @@ class NetworkInterface(TaggedEC2Resource, CloudFormationModel):
else:
return self._group_set
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["PrimaryPrivateIpAddress", "SecondaryPrivateIpAddresses"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -759,7 +763,7 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -993,6 +997,16 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
eni.attachment_id = None
eni.device_index = None
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in [
"AvailabilityZone",
"PrivateDnsName",
"PublicDnsName",
"PrivateIp",
"PublicIp",
]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -2251,7 +2265,7 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -2486,6 +2500,10 @@ class SecurityGroup(TaggedEC2Resource, CloudFormationModel):
return False
return True
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["GroupId"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -3118,7 +3136,7 @@ class SecurityGroupIngress(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -3195,7 +3213,7 @@ class VolumeAttachment(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -3245,7 +3263,7 @@ class Volume(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -3590,7 +3608,7 @@ class VPC(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -4263,7 +4281,7 @@ class VPCPeeringConnection(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -4412,7 +4430,7 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -4487,6 +4505,10 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
else:
return super().get_filter_value(filter_name, "DescribeSubnets")
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["AvailabilityZone"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -4759,7 +4781,7 @@ class FlowLogs(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -5021,7 +5043,7 @@ class SubnetRouteTableAssociation(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -5072,7 +5094,7 @@ class RouteTable(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -5275,7 +5297,7 @@ class Route(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -5695,7 +5717,7 @@ class InternetGateway(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
ec2_backend = ec2_backends[region_name]
return ec2_backend.create_internet_gateway()
@ -5880,9 +5902,11 @@ class EgressOnlyInternetGatewayBackend(object):
class VPCGatewayAttachment(CloudFormationModel):
def __init__(self, gateway_id, vpc_id):
self.gateway_id = gateway_id
# Represents both VPNGatewayAttachment and VPCGatewayAttachment
def __init__(self, vpc_id, gateway_id=None, state=None):
self.vpc_id = vpc_id
self.gateway_id = gateway_id
self.state = state
@staticmethod
def cloudformation_name_type():
@ -5895,7 +5919,7 @@ class VPCGatewayAttachment(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -5923,7 +5947,7 @@ class VPCGatewayAttachmentBackend(object):
super().__init__()
def create_vpc_gateway_attachment(self, vpc_id, gateway_id):
attachment = VPCGatewayAttachment(vpc_id, gateway_id)
attachment = VPCGatewayAttachment(vpc_id, gateway_id=gateway_id)
self.gateway_attachments[gateway_id] = attachment
return attachment
@ -6178,7 +6202,7 @@ class SpotFleetRequest(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]["SpotFleetRequestConfigData"]
ec2_backend = ec2_backends[region_name]
@ -6449,7 +6473,7 @@ class ElasticAddress(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
ec2_backend = ec2_backends[region_name]
@ -6473,6 +6497,10 @@ class ElasticAddress(TaggedEC2Resource, CloudFormationModel):
def physical_resource_id(self):
return self.public_ip
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["AllocationId"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -7143,7 +7171,7 @@ class VpnGateway(CloudFormationModel, TaggedEC2Resource):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
_type = properties["Type"]
@ -7168,13 +7196,6 @@ class VpnGateway(CloudFormationModel, TaggedEC2Resource):
return super().get_filter_value(filter_name, "DescribeVpnGateways")
class VpnGatewayAttachment(object):
def __init__(self, vpc_id, state):
self.vpc_id = vpc_id
self.state = state
super().__init__()
class VpnGatewayBackend(object):
def __init__(self):
self.vpn_gateways = {}
@ -7205,7 +7226,7 @@ class VpnGatewayBackend(object):
def attach_vpn_gateway(self, vpn_gateway_id, vpc_id):
vpn_gateway = self.get_vpn_gateway(vpn_gateway_id)
self.get_vpc(vpc_id)
attachment = VpnGatewayAttachment(vpc_id, state="attached")
attachment = VPCGatewayAttachment(vpc_id, state="attached")
for key in vpn_gateway.attachments.copy():
if key.startswith("vpc-"):
vpn_gateway.attachments.pop(key)
@ -7358,7 +7379,7 @@ class TransitGateway(TaggedEC2Resource, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
ec2_backend = ec2_backends[region_name]
properties = cloudformation_json["Properties"]
@ -8159,7 +8180,7 @@ class NatGateway(CloudFormationModel, TaggedEC2Resource):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
ec2_backend = ec2_backends[region_name]
nat_gateway = ec2_backend.create_nat_gateway(

View File

@ -152,6 +152,10 @@ class Repository(BaseObject, CloudFormationModel):
ecr_backend = ecr_backends[region_name]
ecr_backend.delete_repository(self.name)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn", "RepositoryUri"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -173,7 +177,7 @@ class Repository(BaseObject, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
ecr_backend = ecr_backends[region_name]
properties = cloudformation_json["Properties"]

View File

@ -91,7 +91,7 @@ class Cluster(BaseObject, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
ecs_backend = ecs_backends[region_name]
return ecs_backend.create_cluster(
@ -116,6 +116,10 @@ class Cluster(BaseObject, CloudFormationModel):
# no-op when nothing changed between old and new resources
return original_resource
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -226,7 +230,7 @@ class TaskDefinition(BaseObject, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -420,7 +424,7 @@ class Service(BaseObject, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
if isinstance(properties["Cluster"], Cluster):
@ -472,6 +476,10 @@ class Service(BaseObject, CloudFormationModel):
cluster_name, service_name, task_definition, desired_count
)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Name"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

View File

@ -173,7 +173,7 @@ class FileSystem(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-filesystem.html
props = deepcopy(cloudformation_json["Properties"])
@ -285,7 +285,7 @@ class MountTarget(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-mounttarget.html
props = deepcopy(cloudformation_json["Properties"])

View File

@ -141,7 +141,7 @@ class FakeLoadBalancer(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -212,6 +212,16 @@ class FakeLoadBalancer(CloudFormationModel):
def physical_resource_id(self):
return self.name
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in [
"CanonicalHostedZoneName",
"CanonicalHostedZoneNameID",
"DNSName",
"SourceSecurityGroup.GroupName",
"SourceSecurityGroup.OwnerAlias",
]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

View File

@ -158,7 +158,7 @@ class FakeTargetGroup(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -253,7 +253,7 @@ class FakeListener(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -317,7 +317,7 @@ class FakeListenerRule(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
elbv2_backend = elbv2_backends[region_name]
@ -496,7 +496,7 @@ class FakeLoadBalancer(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -511,6 +511,16 @@ class FakeLoadBalancer(CloudFormationModel):
)
return load_balancer
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in [
"DNSName",
"LoadBalancerName",
"CanonicalHostedZoneID",
"LoadBalancerFullName",
"SecurityGroups",
]
def get_cfn_attribute(self, attribute_name):
"""
Implemented attributes:

View File

@ -217,6 +217,10 @@ class Rule(CloudFormationModel):
group_id=group_id,
)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -236,7 +240,7 @@ class Rule(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
properties.setdefault("EventBusName", "default")
@ -332,6 +336,10 @@ class EventBus(CloudFormationModel):
event_backend = events_backends[region_name]
event_backend.delete_event_bus(name=self.name)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn", "Name", "Policy"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -355,7 +363,7 @@ class EventBus(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
event_backend = events_backends[region_name]
@ -530,6 +538,10 @@ class Archive(CloudFormationModel):
event_backend = events_backends[region_name]
event_backend.archives.pop(self.name)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn", "ArchiveName"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -551,7 +563,7 @@ class Archive(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
event_backend = events_backends[region_name]

View File

@ -331,7 +331,7 @@ class ManagedPolicy(Policy, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_physical_name, cloudformation_json, region_name
cls, resource_physical_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json.get("Properties", {})
policy_document = json.dumps(properties.get("PolicyDocument"))
@ -440,7 +440,7 @@ class InlinePolicy(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_physical_name, cloudformation_json, region_name
cls, resource_physical_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json.get("Properties", {})
policy_document = properties.get("PolicyDocument")
@ -582,7 +582,7 @@ class Role(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_physical_name, cloudformation_json, region_name
cls, resource_physical_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
role_name = (
@ -704,6 +704,10 @@ class Role(CloudFormationModel):
def physical_resource_id(self):
return self.name
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -745,7 +749,7 @@ class InstanceProfile(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_physical_name, cloudformation_json, region_name
cls, resource_physical_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -775,6 +779,10 @@ class InstanceProfile(CloudFormationModel):
def physical_resource_id(self):
return self.name
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -869,6 +877,10 @@ class AccessKey(CloudFormationModel):
def last_used_iso_8601(self):
return iso_8601_datetime_without_milliseconds(self.last_used)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["SecretAccessKey"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -886,7 +898,7 @@ class AccessKey(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_physical_name, cloudformation_json, region_name
cls, resource_physical_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json.get("Properties", {})
user_name = properties.get("UserName")
@ -966,6 +978,10 @@ class Group(BaseModel):
def created_iso_8601(self):
return iso_8601_datetime_with_milliseconds(self.create_date)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -1126,6 +1142,10 @@ class User(CloudFormationModel):
key = self.get_ssh_public_key(ssh_public_key_id)
self.ssh_public_keys.remove(key)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -1215,7 +1235,7 @@ class User(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_physical_name, cloudformation_json, region_name
cls, resource_physical_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json.get("Properties", {})
path = properties.get("Path")

View File

@ -243,7 +243,7 @@ class Stream(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json.get("Properties", {})
shard_count = properties.get("ShardCount", 1)
@ -311,6 +311,10 @@ class Stream(CloudFormationModel):
]
)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

View File

@ -131,7 +131,7 @@ class Key(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
self, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
kms_backend = kms_backends[region_name]
properties = cloudformation_json["Properties"]
@ -149,6 +149,10 @@ class Key(CloudFormationModel):
return key
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Arn"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

View File

@ -276,7 +276,7 @@ class LogStream(BaseModel):
return events
class LogGroup(BaseModel):
class LogGroup(CloudFormationModel):
def __init__(self, region, name, tags, **kwargs):
self.name = name
self.region = region
@ -294,6 +294,25 @@ class LogGroup(BaseModel):
# https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html
self.kms_key_id = kwargs.get("kmsKeyId")
@staticmethod
def cloudformation_name_type():
return "LogGroupName"
@staticmethod
def cloudformation_type():
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-loggroup.html
return "AWS::Logs::LogGroup"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
tags = properties.get("Tags", {})
return logs_backends[region_name].create_log_group(
resource_name, tags, **properties
)
def create_log_stream(self, log_stream_name):
if log_stream_name in self.streams:
raise ResourceAlreadyExistsException()
@ -573,7 +592,7 @@ class LogResourcePolicy(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
policy_name = properties["PolicyName"]

View File

View File

@ -0,0 +1,59 @@
# Sourced from https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-lambda-function-code-cfnresponsemodule.html
# 01/Nov/2021
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from __future__ import print_function
import urllib3
import json
SUCCESS = "SUCCESS"
FAILED = "FAILED"
http = urllib3.PoolManager()
def send(
event,
context,
responseStatus,
responseData,
physicalResourceId=None,
noEcho=False,
reason=None,
):
responseUrl = event["ResponseURL"]
print(responseUrl)
responseBody = {
"Status": responseStatus,
"Reason": reason
or "See the details in CloudWatch Log Stream: {}".format(
context.log_stream_name
),
"PhysicalResourceId": physicalResourceId or context.log_stream_name,
"StackId": event["StackId"],
"RequestId": event["RequestId"],
"LogicalResourceId": event["LogicalResourceId"],
"NoEcho": noEcho,
"Data": responseData,
}
json_responseBody = json.dumps(responseBody)
print("Response body:")
print(json_responseBody)
headers = {"content-type": "", "content-length": str(len(json_responseBody))}
try:
response = http.request(
"PUT", responseUrl, headers=headers, body=json_responseBody
)
print("Status code:", response.status)
except Exception as e:
print("send(..) failed executing http.request(..):", e)

View File

@ -8,6 +8,10 @@ from moto.rds2.models import rds2_backends
class Database(CloudFormationModel):
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Endpoint.Address", "Endpoint.Port"]
def get_cfn_attribute(self, attribute_name):
if attribute_name == "Endpoint.Address":
return self.address
@ -26,7 +30,7 @@ class Database(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -220,7 +224,7 @@ class SecurityGroup(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
group_name = resource_name.lower()
@ -294,7 +298,7 @@ class SubnetGroup(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
subnet_name = resource_name.lower()

View File

@ -450,6 +450,10 @@ class Database(CloudFormationModel):
if value is not None:
setattr(self, key, value)
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Endpoint.Address", "Endpoint.Port"]
def get_cfn_attribute(self, attribute_name):
# Local import to avoid circular dependency with cloudformation.parsing
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -511,7 +515,7 @@ class Database(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -801,7 +805,7 @@ class SecurityGroup(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
group_name = resource_name.lower()
@ -909,7 +913,7 @@ class SubnetGroup(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -1774,7 +1778,7 @@ class DBParameterGroup(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]

View File

@ -172,7 +172,7 @@ class Cluster(TaggableResourceMixin, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
redshift_backend = redshift_backends[region_name]
properties = cloudformation_json["Properties"]
@ -212,6 +212,10 @@ class Cluster(TaggableResourceMixin, CloudFormationModel):
)
return cluster
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["Endpoint.Address", "Endpoint.Port"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -369,7 +373,7 @@ class SubnetGroup(TaggableResourceMixin, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
redshift_backend = redshift_backends[region_name]
properties = cloudformation_json["Properties"]
@ -466,7 +470,7 @@ class ParameterGroup(TaggableResourceMixin, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
redshift_backend = redshift_backends[region_name]
properties = cloudformation_json["Properties"]

View File

@ -61,7 +61,7 @@ class HealthCheck(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]["HealthCheckConfig"]
health_check_args = {
@ -151,7 +151,7 @@ class RecordSet(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
@ -337,7 +337,7 @@ class FakeZone(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
hosted_zone = route53_backend.create_hosted_zone(
resource_name, private_zone=False
@ -365,7 +365,7 @@ class RecordSetGroup(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]

View File

@ -1114,6 +1114,16 @@ class FakeBucket(CloudFormationModel):
self.accelerate_configuration = accelerate_config
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in [
"Arn",
"DomainName",
"DualStackDomainName",
"RegionalDomainName",
"WebsiteURL",
]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -1169,7 +1179,7 @@ class FakeBucket(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
bucket = s3_backend.create_bucket(resource_name, region_name)

View File

@ -196,6 +196,10 @@ class FakeEndpoint(BaseObject, CloudFormationModel):
def physical_resource_id(self):
return self.endpoint_arn
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["EndpointName"]
def get_cfn_attribute(self, attribute_name):
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-endpoint.html#aws-resource-sagemaker-endpoint-return-values
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -215,7 +219,7 @@ class FakeEndpoint(BaseObject, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
sagemaker_backend = sagemaker_backends[region_name]
@ -382,6 +386,10 @@ class FakeEndpointConfig(BaseObject, CloudFormationModel):
def physical_resource_id(self):
return self.endpoint_config_arn
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["EndpointConfigName"]
def get_cfn_attribute(self, attribute_name):
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-endpointconfig.html#aws-resource-sagemaker-endpointconfig-return-values
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -401,7 +409,7 @@ class FakeEndpointConfig(BaseObject, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
sagemaker_backend = sagemaker_backends[region_name]
@ -490,6 +498,10 @@ class Model(BaseObject, CloudFormationModel):
def physical_resource_id(self):
return self.model_arn
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["ModelName"]
def get_cfn_attribute(self, attribute_name):
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-model.html#aws-resource-sagemaker-model-return-values
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -509,7 +521,7 @@ class Model(BaseObject, CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
sagemaker_backend = sagemaker_backends[region_name]
@ -706,6 +718,10 @@ class FakeSagemakerNotebookInstance(CloudFormationModel):
def physical_resource_id(self):
return self.arn
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["NotebookInstanceName"]
def get_cfn_attribute(self, attribute_name):
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-notebookinstance.html#aws-resource-sagemaker-notebookinstance-return-values
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -725,7 +741,7 @@ class FakeSagemakerNotebookInstance(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
# Get required properties from provided CloudFormation template
properties = cloudformation_json["Properties"]
@ -809,6 +825,10 @@ class FakeSageMakerNotebookInstanceLifecycleConfig(BaseObject, CloudFormationMod
def physical_resource_id(self):
return self.notebook_instance_lifecycle_config_arn
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["NotebookInstanceLifecycleConfigName"]
def get_cfn_attribute(self, attribute_name):
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-notebookinstancelifecycleconfig.html#aws-resource-sagemaker-notebookinstancelifecycleconfig-return-values
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -828,7 +848,7 @@ class FakeSageMakerNotebookInstanceLifecycleConfig(BaseObject, CloudFormationMod
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]

View File

@ -4,6 +4,7 @@ import json
import os
import signal
import sys
from functools import partial
from threading import Lock
from flask import Flask
@ -106,8 +107,10 @@ class DomainDispatcherApplication(object):
# See if we can match the Action to a known service
service, region = UNSIGNED_ACTIONS.get(action)
else:
# S3 is the last resort when the target is also unknown
service, region = DEFAULT_SERVICE_REGION
service, region = self.get_service_from_path(environ)
if not service:
# S3 is the last resort when the target is also unknown
service, region = DEFAULT_SERVICE_REGION
if service == "mediastore" and not target:
# All MediaStore API calls have a target header
@ -187,6 +190,16 @@ class DomainDispatcherApplication(object):
environ["wsgi.input"] = io.StringIO(body)
return None
def get_service_from_path(self, environ):
# Moto sometimes needs to send a HTTP request to itself
# In which case it will send a request to 'http://localhost/service_region/whatever'
try:
path_info = environ.get("PATH_INFO", "/")
service, region = path_info[1 : path_info.index("/", 1)].split("_")
return service, region
except (KeyError, ValueError):
return None, None
def __call__(self, environ, start_response):
backend_app = self.get_application(environ)
return backend_app(environ, start_response)
@ -268,8 +281,9 @@ def create_backend_app(service):
return backend_app
def signal_handler(signum, frame):
print("Received signal %d" % signum)
def signal_handler(reset_server_port, signum, frame):
if reset_server_port:
del os.environ["MOTO_SERVER_PORT"]
sys.exit(0)
@ -312,9 +326,14 @@ def main(argv=sys.argv[1:]):
args = parser.parse_args(argv)
reset_server_port = False
if "MOTO_SERVER_PORT" not in os.environ:
reset_server_port = True
os.environ["MOTO_SERVER_PORT"] = f"{args.port}"
try:
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, partial(signal_handler, reset_server_port))
signal.signal(signal.SIGTERM, partial(signal_handler, reset_server_port))
except Exception:
pass # ignore "ValueError: signal only works in main thread"

View File

@ -1,3 +1,4 @@
import json
import os
TEST_SERVER_MODE = os.environ.get("TEST_SERVER_MODE", "0").lower() == "true"
@ -36,3 +37,35 @@ def get_s3_default_key_buffer_size():
def ecs_new_arn_format():
return os.environ.get("MOTO_ECS_NEW_ARN", "false").lower() == "true"
def moto_server_port():
return os.environ.get("MOTO_SERVER_PORT") or "5000"
def moto_server_host():
_port = moto_server_port()
if is_docker():
host = get_docker_host()
else:
host = "http://host.docker.internal"
return f"{host}:{_port}"
def is_docker():
path = "/proc/self/cgroup"
return (
os.path.exists("/.dockerenv")
or os.path.isfile(path)
and any("docker" in line for line in open(path))
)
def get_docker_host():
try:
cmd = "curl -s --unix-socket /run/docker.sock http://docker/containers/$HOSTNAME/json"
container_info = os.popen(cmd).read()
_ip = json.loads(container_info)["NetworkSettings"]["IPAddress"]
return f"http://{_ip}"
except: # noqa
return "http://host.docker.internal"

View File

@ -68,6 +68,10 @@ class Topic(CloudFormationModel):
)
return message_id
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in ["TopicName"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -98,7 +102,7 @@ class Topic(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
sns_backend = sns_backends[region_name]
properties = cloudformation_json["Properties"]

View File

@ -393,7 +393,7 @@ class Queue(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = deepcopy(cloudformation_json["Properties"])
# remove Tags from properties and convert tags list to dict
@ -536,6 +536,10 @@ class Queue(CloudFormationModel):
# Make messages visible again
[m.change_visibility(visibility_timeout=0) for m in messages]
@classmethod
def has_cfn_attr(cls, attribute_name):
return attribute_name in ["Arn", "QueueName"]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

View File

@ -125,6 +125,16 @@ class StateMachine(CloudFormationModel):
properties["Tags"] = original_tags_to_include + prop_overrides.get("Tags", [])
return properties
@classmethod
def has_cfn_attr(cls, attribute):
return attribute in [
"Name",
"DefinitionString",
"RoleArn",
"StateMachineName",
"Tags",
]
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -151,7 +161,7 @@ class StateMachine(CloudFormationModel):
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
cls, resource_name, cloudformation_json, region_name, **kwargs
):
properties = cloudformation_json["Properties"]
name = properties.get("StateMachineName", resource_name)

View File

@ -126,8 +126,11 @@ def wait_for_log_msg(expected_msg, log_group):
received_messages = []
start = time.time()
while (time.time() - start) < 30:
result = logs_conn.describe_log_streams(logGroupName=log_group)
log_streams = result.get("logStreams")
try:
result = logs_conn.describe_log_streams(logGroupName=log_group)
log_streams = result.get("logStreams")
except ClientError:
log_streams = None # LogGroupName does not yet exist
if not log_streams:
time.sleep(1)
continue
@ -139,7 +142,8 @@ def wait_for_log_msg(expected_msg, log_group):
received_messages.extend(
[event["message"] for event in result.get("events")]
)
if expected_msg in received_messages:
return True, received_messages
for line in received_messages:
if expected_msg in line:
return True, set(received_messages)
time.sleep(1)
return False, received_messages
return False, set(received_messages)

View File

@ -0,0 +1,71 @@
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html
def get_template(lambda_code):
return {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Sample template using Custom Resource",
"Resources": {
"CustomInfo": {
"Type": "Custom::Info",
"Properties": {
"ServiceToken": {"Fn::GetAtt": ["InfoFunction", "Arn"]},
"Region": {"Ref": "AWS::Region"},
"MyProperty": "stuff",
},
},
"InfoFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": {"Fn::Join": ["\n", lambda_code.splitlines()]},
},
"Handler": "index.lambda_handler",
"Role": {"Fn::GetAtt": ["LambdaExecutionRole", "Arn"]},
"Runtime": "python3.8",
"Timeout": "30",
},
},
"LambdaExecutionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": ["lambda.amazonaws.com"]},
"Action": ["sts:AssumeRole"],
}
],
},
"Path": "/",
"Policies": [
{
"PolicyName": "root",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
],
"Resource": "arn:aws:logs:*:*:*",
}
],
},
}
],
},
},
},
"Outputs": {
"infokey": {
"Description": "A very important value",
"Value": {"Fn::GetAtt": ["CustomInfo", "info_value"]},
}
},
}

View File

@ -0,0 +1,208 @@
import boto3
import json
import requests
import sure # noqa # pylint: disable=unused-import
import time
from moto import mock_lambda, mock_cloudformation, mock_logs, mock_s3, settings
from unittest import SkipTest
from uuid import uuid4
from tests.test_awslambda.utilities import wait_for_log_msg
from .fixtures.custom_lambda import get_template
def get_lambda_code():
pfunc = """
def lambda_handler(event, context):
# Need to print this, one of the tests verifies the correct input
print(event)
response = dict()
response["Status"] = "SUCCESS"
response["StackId"] = event["StackId"]
response["RequestId"] = event["RequestId"]
response["LogicalResourceId"] = event["LogicalResourceId"]
response["PhysicalResourceId"] = "{resource_id}"
response_data = dict()
response_data["info_value"] = "special value"
if event["RequestType"] == "Create":
response["Data"] = response_data
import cfnresponse
cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
""".format(
resource_id=f"CustomResource{str(uuid4())[0:6]}"
)
return pfunc
@mock_cloudformation
@mock_lambda
@mock_logs
@mock_s3
def test_create_custom_lambda_resource():
#########
# Integration test using a Custom Resource
# Create a Lambda
# CF will call the Lambda
# The Lambda should call CF, to indicate success (using the cfnresponse-module)
# This HTTP request will include any outputs that are now stored against the stack
# TEST: verify that this output is persisted
##########
if not settings.TEST_SERVER_MODE:
raise SkipTest(
"Needs a standalone MotoServer, as cfnresponse needs to connect to something"
)
# Create cloudformation stack
stack_name = f"stack{str(uuid4())[0:6]}"
template_body = get_template(get_lambda_code())
cf = boto3.client("cloudformation", region_name="us-east-1")
cf.create_stack(
StackName=stack_name,
TemplateBody=json.dumps(template_body),
Capabilities=["CAPABILITY_IAM"],
)
# Verify CloudWatch contains the correct logs
log_group_name = get_log_group_name(cf, stack_name)
success, logs = wait_for_log_msg(
expected_msg="Status code: 200", log_group=log_group_name
)
with sure.ensure(f"Logs should indicate success: \n{logs}"):
success.should.equal(True)
# Verify the correct Output was returned
outputs = get_outputs(cf, stack_name)
outputs.should.have.length_of(1)
outputs[0].should.have.key("OutputKey").equals("infokey")
outputs[0].should.have.key("OutputValue").equals("special value")
@mock_cloudformation
@mock_lambda
@mock_logs
@mock_s3
def test_create_custom_lambda_resource__verify_cfnresponse_failed():
#########
# Integration test using a Custom Resource
# Create a Lambda
# CF will call the Lambda
# The Lambda should call CF --- this will fail, as we cannot make a HTTP request to the in-memory moto decorators
# TEST: verify that the original event was send to the Lambda correctly
# TEST: verify that a failure message appears in the CloudwatchLogs
##########
if settings.TEST_SERVER_MODE:
raise SkipTest("Verify this fails if MotoServer is not running")
# Create cloudformation stack
stack_name = f"stack{str(uuid4())[0:6]}"
template_body = get_template(get_lambda_code())
cf = boto3.client("cloudformation", region_name="us-east-1")
cf.create_stack(
StackName=stack_name,
TemplateBody=json.dumps(template_body),
Capabilities=["CAPABILITY_IAM"],
)
# Verify CloudWatch contains the correct logs
log_group_name = get_log_group_name(cf, stack_name)
execution_failed, logs = wait_for_log_msg(
expected_msg="failed executing http.request", log_group=log_group_name
)
execution_failed.should.equal(True)
printed_events = [l for l in logs if l.startswith("{'RequestType': 'Create'")]
printed_events.should.have.length_of(1)
original_event = json.loads(printed_events[0].replace("'", '"'))
original_event.should.have.key("RequestType").equals("Create")
original_event.should.have.key("ServiceToken") # Should equal Lambda ARN
original_event.should.have.key("ResponseURL")
original_event.should.have.key("StackId")
original_event.should.have.key("RequestId") # type UUID
original_event.should.have.key("LogicalResourceId").equals("CustomInfo")
original_event.should.have.key("ResourceType").equals("Custom::Info")
original_event.should.have.key("ResourceProperties")
original_event["ResourceProperties"].should.have.key(
"ServiceToken"
) # Should equal Lambda ARN
original_event["ResourceProperties"].should.have.key("MyProperty").equals("stuff")
@mock_cloudformation
@mock_lambda
@mock_logs
@mock_s3
def test_create_custom_lambda_resource__verify_manual_request():
#########
# Integration test using a Custom Resource
# Create a Lambda
# CF will call the Lambda
# The Lambda should call CF --- this will fail, as we cannot make a HTTP request to the in-memory moto decorators
# So we'll make this HTTP request manually
# TEST: verify that the stack has a CREATE_IN_PROGRESS status before making the HTTP request
# TEST: verify that the stack has a CREATE_COMPLETE status afterwards
##########
if settings.TEST_SERVER_MODE:
raise SkipTest(
"Verify HTTP request can be made manually if MotoServer is not running"
)
# Create cloudformation stack
stack_name = f"stack{str(uuid4())[0:6]}"
template_body = get_template(get_lambda_code())
region_name = "eu-north-1"
cf = boto3.client("cloudformation", region_name=region_name)
stack = cf.create_stack(
StackName=stack_name,
TemplateBody=json.dumps(template_body),
Capabilities=["CAPABILITY_IAM"],
)
stack_id = stack["StackId"]
stack = cf.describe_stacks(StackName=stack_id)["Stacks"][0]
stack["Outputs"].should.equal([])
stack["StackStatus"].should.equal("CREATE_IN_PROGRESS")
callback_url = f"http://cloudformation.{region_name}.amazonaws.com/cloudformation_{region_name}/cfnresponse?stack={stack_id}"
data = {
"Status": "SUCCESS",
"StackId": stack_id,
"LogicalResourceId": "CustomInfo",
"Data": {"info_value": "resultfromthirdpartysystem"},
}
requests.post(callback_url, json=data)
stack = cf.describe_stacks(StackName=stack_id)["Stacks"][0]
stack["StackStatus"].should.equal("CREATE_COMPLETE")
stack["Outputs"].should.equal(
[{"OutputKey": "infokey", "OutputValue": "resultfromthirdpartysystem"}]
)
def get_log_group_name(cf, stack_name):
resources = cf.describe_stack_resources(StackName=stack_name)["StackResources"]
start = time.time()
while (time.time() - start) < 5:
fns = [
r
for r in resources
if r["ResourceType"] == "AWS::Lambda::Function"
and "PhysicalResourceId" in r
]
if not fns:
time.sleep(1)
resources = cf.describe_stack_resources(StackName=stack_name)[
"StackResources"
]
continue
fn = fns[0]
resource_id = fn["PhysicalResourceId"]
return f"/aws/lambda/{resource_id}"
raise Exception("Could not find log group name in time")
def get_outputs(cf, stack_name):
stack = cf.describe_stacks(StackName=stack_name)["Stacks"][0]
start = time.time()
while (time.time() - start) < 5:
status = stack["StackStatus"]
if status != "CREATE_COMPLETE":
time.sleep(1)
stack = cf.describe_stacks(StackName=stack_name)["Stacks"][0]
continue
outputs = stack["Outputs"]
return outputs

View File

@ -1908,7 +1908,7 @@ def test_lambda_function():
# switch this to python as backend lambda only supports python execution.
lambda_code = """
def lambda_handler(event, context):
return (event, context)
return {"event": event}
"""
template = {
"AWSTemplateFormatVersion": "2010-09-09",
@ -1920,7 +1920,7 @@ def lambda_handler(event, context):
# CloudFormation expects a string as ZipFile, not a ZIP file base64-encoded
"ZipFile": {"Fn::Join": ["\n", lambda_code.splitlines()]}
},
"Handler": "lambda_function.handler",
"Handler": "index.lambda_handler",
"Description": "Test function",
"MemorySize": 128,
"Role": {"Fn::GetAtt": ["MyRole", "Arn"]},
@ -1954,7 +1954,7 @@ def lambda_handler(event, context):
result = conn.list_functions()
result["Functions"].should.have.length_of(1)
result["Functions"][0]["Description"].should.equal("Test function")
result["Functions"][0]["Handler"].should.equal("lambda_function.handler")
result["Functions"][0]["Handler"].should.equal("index.lambda_handler")
result["Functions"][0]["MemorySize"].should.equal(128)
result["Functions"][0]["Runtime"].should.equal("python2.7")
result["Functions"][0]["Environment"].should.equal(
@ -1966,6 +1966,10 @@ def lambda_handler(event, context):
result["Concurrency"]["ReservedConcurrentExecutions"].should.equal(10)
response = conn.invoke(FunctionName=function_name)
result = json.loads(response["Payload"].read())
result.should.equal({"event": "{}"})
def _make_zipfile(func_str):
zip_output = io.BytesIO()

View File

@ -2,7 +2,9 @@ import boto3
import json
import yaml
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from unittest.mock import patch
from moto.cloudformation.exceptions import ValidationError
@ -11,7 +13,7 @@ from moto.cloudformation.parsing import (
resource_class_from_type,
parse_condition,
)
from moto import mock_ssm, settings
from moto import mock_cloudformation, mock_sqs, mock_ssm, settings
from moto.sqs.models import Queue
from moto.s3.models import FakeBucket
from moto.cloudformation.utils import yaml_tag_constructor
@ -20,7 +22,7 @@ from moto.packages.boto.cloudformation.stack import Output
dummy_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Create a multi-az, load balanced, Auto Scaled sample web site. The Auto Scaling trigger is based on the CPU utilization of the web servers. The AMI is chosen based on the region in which the stack is run. This example creates a web service running across all availability zones in a region. The instances are load balanced with a simple health check. The web site is available on port 80, however, the instances can be configured to listen on any port (8888 by default). **WARNING** This template creates one or more Amazon EC2 instances. You will be billed for the AWS resources used if you create a stack from this template.",
"Description": "sample template",
"Resources": {
"Queue": {
"Type": "AWS::SQS::Queue",
@ -32,7 +34,7 @@ dummy_template = {
name_type_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Create a multi-az, load balanced, Auto Scaled sample web site. The Auto Scaling trigger is based on the CPU utilization of the web servers. The AMI is chosen based on the region in which the stack is run. This example creates a web service running across all availability zones in a region. The instances are load balanced with a simple health check. The web site is available on port 80, however, the instances can be configured to listen on any port (8888 by default). **WARNING** This template creates one or more Amazon EC2 instances. You will be billed for the AWS resources used if you create a stack from this template.",
"Description": "sample template",
"Resources": {
"Queue": {"Type": "AWS::SQS::Queue", "Properties": {"VisibilityTimeout": 60}}
},
@ -41,7 +43,7 @@ name_type_template = {
name_type_template_with_tabs_json = """
\t{
\t\t"AWSTemplateFormatVersion": "2010-09-09",
\t\t"Description": "Create a multi-az, load balanced, Auto Scaled sample web site. The Auto Scaling trigger is based on the CPU utilization of the web servers. The AMI is chosen based on the region in which the stack is run. This example creates a web service running across all availability zones in a region. The instances are load balanced with a simple health check. The web site is available on port 80, however, the instances can be configured to listen on any port (8888 by default). **WARNING** This template creates one or more Amazon EC2 instances. You will be billed for the AWS resources used if you create a stack from this template.",
\t\t"Description": "sample template",
\t\t"Resources": {
\t\t\t"Queue": {"Type": "AWS::SQS::Queue", "Properties": {"VisibilityTimeout": 60}}
\t\t}
@ -312,10 +314,17 @@ def test_parse_stack_with_get_availability_zones():
output.value.should.equal(["us-east-1a", "us-east-1b", "us-east-1c", "us-east-1d"])
def test_parse_stack_with_bad_get_attribute_outputs():
FakeStack.when.called_with(
"test_id", "test_stack", bad_output_template_json, {}, "us-west-1"
).should.throw(ValidationError)
@mock_sqs
@mock_cloudformation
def test_parse_stack_with_bad_get_attribute_outputs_using_boto3():
conn = boto3.client("cloudformation", region_name="us-west-1")
with pytest.raises(ClientError) as exc:
conn.create_stack(StackName="teststack", TemplateBody=bad_output_template_json)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationError")
err["Message"].should.equal(
"Template error: resource Queue does not support attribute type InvalidAttribute in Fn::GetAtt"
)
def test_parse_stack_with_null_outputs_section():