Merge pull request #235 from joekiller/enhance/cloudformation_getatt_tags_and_resource_naming

Enhance/cloudformation getatt tags and resource naming
This commit is contained in:
Steve Pulec 2014-10-21 22:37:01 -04:00
commit c02ed667d5
16 changed files with 486 additions and 36 deletions

View File

@ -0,0 +1,6 @@
from __future__ import unicode_literals
class UnformattedGetAttTemplateException(Exception):
description = 'Template error: resource {0} does not support attribute type {1} in Fn::GetAtt'
status_code = 400

View File

@ -3,7 +3,7 @@ import json
from moto.core import BaseBackend from moto.core import BaseBackend
from .parsing import ResourceMap from .parsing import ResourceMap, OutputMap
from .utils import generate_stack_id from .utils import generate_stack_id
@ -19,10 +19,17 @@ class FakeStack(object):
self.resource_map = ResourceMap(stack_id, name, template_dict) self.resource_map = ResourceMap(stack_id, name, template_dict)
self.resource_map.create() self.resource_map.create()
self.output_map = OutputMap(self.resource_map, template_dict)
self.output_map.create()
@property @property
def stack_resources(self): def stack_resources(self):
return self.resource_map.values() return self.resource_map.values()
@property
def stack_outputs(self):
return self.output_map.values()
class CloudFormationBackend(BaseBackend): class CloudFormationBackend(BaseBackend):

View File

@ -7,6 +7,10 @@ from moto.ec2 import models as ec2_models
from moto.elb import models as elb_models from moto.elb import models as elb_models
from moto.iam import models as iam_models from moto.iam import models as iam_models
from moto.sqs import models as sqs_models from moto.sqs import models as sqs_models
from .utils import random_suffix
from .exceptions import UnformattedGetAttTemplateException
from boto.cloudformation.stack import Output
from boto.exception import BotoServerError
MODEL_MAP = { MODEL_MAP = {
"AWS::AutoScaling::AutoScalingGroup": autoscaling_models.FakeAutoScalingGroup, "AWS::AutoScaling::AutoScalingGroup": autoscaling_models.FakeAutoScalingGroup,
@ -29,6 +33,20 @@ MODEL_MAP = {
"AWS::SQS::Queue": sqs_models.Queue, "AWS::SQS::Queue": sqs_models.Queue,
} }
# http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-name.html
NAME_TYPE_MAP = {
"AWS::CloudWatch::Alarm": "Alarm",
"AWS::DynamoDB::Table": "TableName",
"AWS::ElastiCache::CacheCluster": "ClusterName",
"AWS::ElasticBeanstalk::Application": "ApplicationName",
"AWS::ElasticBeanstalk::Environment": "EnvironmentName",
"AWS::ElasticLoadBalancing::LoadBalancer": "LoadBalancerName",
"AWS::RDS::DBInstance": "DBInstanceIdentifier",
"AWS::S3::Bucket": "BucketName",
"AWS::SNS::Topic": "TopicName",
"AWS::SQS::Queue": "QueueName"
}
# Just ignore these models types for now # Just ignore these models types for now
NULL_MODELS = [ NULL_MODELS = [
"AWS::CloudFormation::WaitCondition", "AWS::CloudFormation::WaitCondition",
@ -54,6 +72,28 @@ def clean_json(resource_json, resources_map):
else: else:
return resource return resource
if 'Fn::GetAtt' in resource_json:
resource = resources_map[resource_json['Fn::GetAtt'][0]]
if resource is None:
return resource_json
try:
return resource.get_cfn_attribute(resource_json['Fn::GetAtt'][1])
except NotImplementedError as n:
logger.warning(n.message.format(resource_json['Fn::GetAtt'][0]))
except UnformattedGetAttTemplateException:
raise BotoServerError(
UnformattedGetAttTemplateException.status_code,
'Bad Request',
UnformattedGetAttTemplateException.description.format(
resource_json['Fn::GetAtt'][0], resource_json['Fn::GetAtt'][1]))
if 'Fn::Join' in resource_json:
join_list = []
for val in resource_json['Fn::Join'][1]:
cleaned_val = clean_json(val, resources_map)
join_list.append(cleaned_val if cleaned_val else '{0}'.format(val))
return resource_json['Fn::Join'][0].join(join_list)
cleaned_json = {} cleaned_json = {}
for key, value in resource_json.items(): for key, value in resource_json.items():
cleaned_json[key] = clean_json(value, resources_map) cleaned_json[key] = clean_json(value, resources_map)
@ -73,19 +113,49 @@ def resource_class_from_type(resource_type):
return MODEL_MAP.get(resource_type) return MODEL_MAP.get(resource_type)
def parse_resource(resource_name, resource_json, resources_map): def resource_name_property_from_type(resource_type):
if resource_type not in NAME_TYPE_MAP:
return None
return NAME_TYPE_MAP.get(resource_type)
def parse_resource(logical_id, resource_json, resources_map):
resource_type = resource_json['Type'] resource_type = resource_json['Type']
resource_class = resource_class_from_type(resource_type) resource_class = resource_class_from_type(resource_type)
if not resource_class: if not resource_class:
return None return None
resource_json = clean_json(resource_json, resources_map) resource_json = clean_json(resource_json, resources_map)
resource_name_property = resource_name_property_from_type(resource_type)
if resource_name_property:
if not 'Properties' in resource_json:
resource_json['Properties'] = dict()
if not resource_name_property in resource_json['Properties']:
resource_json['Properties'][resource_name_property] = '{0}-{1}-{2}'.format(
resources_map.get('AWS::StackName'),
logical_id,
random_suffix())
resource_name = resource_json['Properties'][resource_name_property]
else:
resource_name = '{0}-{1}-{2}'.format(resources_map.get('AWS::StackName'),
logical_id,
random_suffix())
resource = resource_class.create_from_cloudformation_json(resource_name, resource_json) resource = resource_class.create_from_cloudformation_json(resource_name, resource_json)
resource.type = resource_type resource.type = resource_type
resource.logical_resource_id = resource_name resource.logical_resource_id = logical_id
return resource return resource
def parse_output(output_logical_id, output_json, resources_map):
output_json = clean_json(output_json, resources_map)
output = Output()
output.key = output_logical_id
output.value = output_json['Value']
output.description = output_json.get('Description')
return output
class ResourceMap(collections.Mapping): class ResourceMap(collections.Mapping):
""" """
This is a lazy loading map for resources. This allows us to create resources This is a lazy loading map for resources. This allows us to create resources
@ -106,24 +176,24 @@ class ResourceMap(collections.Mapping):
} }
def __getitem__(self, key): def __getitem__(self, key):
resource_name = key resource_logical_id = key
if resource_name in self._parsed_resources: if resource_logical_id in self._parsed_resources:
return self._parsed_resources[resource_name] return self._parsed_resources[resource_logical_id]
else: else:
resource_json = self._resource_json_map.get(resource_name) resource_json = self._resource_json_map.get(resource_logical_id)
new_resource = parse_resource(resource_name, resource_json, self) new_resource = parse_resource(resource_logical_id, resource_json, self)
self._parsed_resources[resource_name] = new_resource self._parsed_resources[resource_logical_id] = new_resource
return new_resource return new_resource
def __iter__(self): def __iter__(self):
return iter(self.resource_names) return iter(self.resources)
def __len__(self): def __len__(self):
return len(self._resource_json_map) return len(self._resource_json_map)
@property @property
def resource_names(self): def resources(self):
return self._resource_json_map.keys() return self._resource_json_map.keys()
def load_parameters(self): def load_parameters(self):
@ -137,5 +207,45 @@ class ResourceMap(collections.Mapping):
# Since this is a lazy map, to create every object we just need to # Since this is a lazy map, to create every object we just need to
# iterate through self. # iterate through self.
for resource_name in self.resource_names: tags = {'aws:cloudformation:stack-name': self.get('AWS::StackName'),
self[resource_name] 'aws:cloudformation:stack-id': self.get('AWS::StackId')}
for resource in self.resources:
self[resource]
if isinstance(self[resource], ec2_models.TaggedEC2Resource):
tags['aws:cloudformation:logical-id'] = resource
ec2_models.ec2_backend.create_tags([self[resource].physical_resource_id],tags)
class OutputMap(collections.Mapping):
def __init__(self, resources, template):
self._template = template
self._output_json_map = template.get('Outputs')
# Create the default resources
self._resource_map = resources
self._parsed_outputs = dict()
def __getitem__(self, key):
output_logical_id = key
if output_logical_id in self._parsed_outputs:
return self._parsed_outputs[output_logical_id]
else:
output_json = self._output_json_map.get(output_logical_id)
new_output = parse_output(output_logical_id, output_json, self._resource_map)
self._parsed_outputs[output_logical_id] = new_output
return new_output
def __iter__(self):
return iter(self.outputs)
def __len__(self):
return len(self._output_json_map)
@property
def outputs(self):
return self._output_json_map.keys() if self._output_json_map else []
def create(self):
for output in self.outputs:
self[output]

View File

@ -88,7 +88,14 @@ DESCRIBE_STACKS_TEMPLATE = """<DescribeStacksResult>
<CreationTime>2010-07-27T22:28:28Z</CreationTime> <CreationTime>2010-07-27T22:28:28Z</CreationTime>
<StackStatus>CREATE_COMPLETE</StackStatus> <StackStatus>CREATE_COMPLETE</StackStatus>
<DisableRollback>false</DisableRollback> <DisableRollback>false</DisableRollback>
<Outputs></Outputs> <Outputs>
{% for output in stack.stack_outputs %}
<member>
<OutputKey>{{ output.key }}</OutputKey>
<OutputValue>{{ output.value }}</OutputValue>
</member>
{% endfor %}
</Outputs>
</member> </member>
{% endfor %} {% endfor %}
</Stacks> </Stacks>

View File

@ -1,7 +1,15 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import uuid import uuid
import six
import random
def generate_stack_id(stack_name): def generate_stack_id(stack_name):
random_id = uuid.uuid4() random_id = uuid.uuid4()
return "arn:aws:cloudformation:us-east-1:123456789:stack/{0}/{1}".format(stack_name, random_id) return "arn:aws:cloudformation:us-east-1:123456789:stack/{0}/{1}".format(stack_name, random_id)
def random_suffix():
size = 12
chars = list(range(10)) + ['A-Z']
return ''.join(six.text_type(random.choice(chars)) for x in range(size))

View File

@ -192,6 +192,14 @@ class NetworkInterface(object):
else: else:
return self._group_set return self._group_set
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'PrimaryPrivateIpAddress':
return self.private_ip_address
elif attribute_name == 'SecondaryPrivateIpAddresses':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "SecondaryPrivateIpAddresses" ]"')
raise UnformattedGetAttTemplateException()
class NetworkInterfaceBackend(object): class NetworkInterfaceBackend(object):
def __init__(self): def __init__(self):
@ -435,6 +443,20 @@ class Instance(BotoInstance, TaggedEC2Resource):
eni.attachment_id = None eni.attachment_id = None
eni.device_index = None eni.device_index = None
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'AvailabilityZone':
return self.placement
elif attribute_name == 'PrivateDnsName':
return self.private_dns_name
elif attribute_name == 'PublicDnsName':
return self.public_dns_name
elif attribute_name == 'PrivateIp':
return self.private_ip_address
elif attribute_name == 'PublicIp':
return self.ip_address
raise UnformattedGetAttTemplateException()
class InstanceBackend(object): class InstanceBackend(object):
@ -994,6 +1016,12 @@ class SecurityGroup(object):
return False return False
return True return True
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'GroupId':
return self.id
raise UnformattedGetAttTemplateException()
class SecurityGroupBackend(object): class SecurityGroupBackend(object):
@ -1517,6 +1545,12 @@ class Subnet(TaggedEC2Resource):
return filter_value return filter_value
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'AvailabilityZone':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "AvailabilityZone" ]"')
raise UnformattedGetAttTemplateException()
class SubnetBackend(object): class SubnetBackend(object):
def __init__(self): def __init__(self):
@ -1981,13 +2015,16 @@ class ElasticAddress(object):
@classmethod @classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json): def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
properties = cloudformation_json['Properties'] properties = cloudformation_json.get('Properties')
instance_id = None
if properties:
domain=properties.get('Domain')
eip = ec2_backend.allocate_address(
domain=domain if domain else 'standard')
instance_id = properties.get('InstanceId')
else:
eip = ec2_backend.allocate_address(domain='standard')
eip = ec2_backend.allocate_address(
domain=properties['Domain']
)
instance_id = properties.get('InstanceId')
if instance_id: if instance_id:
instance = ec2_backend.get_instance_by_id(instance_id) instance = ec2_backend.get_instance_by_id(instance_id)
ec2_backend.associate_address(instance, address=eip.public_ip) ec2_backend.associate_address(instance, address=eip.public_ip)
@ -1996,7 +2033,13 @@ class ElasticAddress(object):
@property @property
def physical_resource_id(self): def physical_resource_id(self):
return self.allocation_id return self.allocation_id if self.allocation_id else self.public_ip
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'AllocationId':
return self.allocation_id
raise UnformattedGetAttTemplateException()
class ElasticAddressBackend(object): class ElasticAddressBackend(object):

View File

@ -56,6 +56,20 @@ class FakeLoadBalancer(object):
def physical_resource_id(self): def physical_resource_id(self):
return self.name return self.name
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'CanonicalHostedZoneName':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "CanonicalHostedZoneName" ]"')
elif attribute_name == 'CanonicalHostedZoneNameID':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "CanonicalHostedZoneNameID" ]"')
elif attribute_name == 'DNSName':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "DNSName" ]"')
elif attribute_name == 'SourceSecurityGroup.GroupName':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "SourceSecurityGroup.GroupName" ]"')
elif attribute_name == 'SourceSecurityGroup.OwnerAlias':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "SourceSecurityGroup.OwnerAlias" ]"')
raise UnformattedGetAttTemplateException()
class ELBBackend(BaseBackend): class ELBBackend(BaseBackend):

View File

@ -30,6 +30,12 @@ class Role(object):
def physical_resource_id(self): def physical_resource_id(self):
return self.id return self.id
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'Arn':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "Arn" ]"')
raise UnformattedGetAttTemplateException()
class InstanceProfile(object): class InstanceProfile(object):
def __init__(self, instance_profile_id, name, path, roles): def __init__(self, instance_profile_id, name, path, roles):
@ -53,6 +59,12 @@ class InstanceProfile(object):
def physical_resource_id(self): def physical_resource_id(self):
return self.name return self.name
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'Arn':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "Arn" ]"')
raise UnformattedGetAttTemplateException()
class Certificate(object): class Certificate(object):
def __init__(self, cert_name, cert_body, private_key, cert_chain=None, path=None): def __init__(self, cert_name, cert_body, private_key, cert_chain=None, path=None):
@ -78,6 +90,12 @@ class AccessKey(object):
"%Y-%m-%d-%H-%M-%S" "%Y-%m-%d-%H-%M-%S"
) )
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'SecretAccessKey':
return self.secret_access_key
raise UnformattedGetAttTemplateException()
class Group(object): class Group(object):
def __init__(self, name, path='/'): def __init__(self, name, path='/'):
@ -91,6 +109,12 @@ class Group(object):
self.users = [] self.users = []
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'Arn':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "Arn" ]"')
raise UnformattedGetAttTemplateException()
class User(object): class User(object):
def __init__(self, name, path='/'): def __init__(self, name, path='/'):
@ -143,6 +167,12 @@ class User(object):
else: else:
raise BotoServerError(404, 'Not Found') raise BotoServerError(404, 'Not Found')
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'Arn':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "Arn" ]"')
raise UnformattedGetAttTemplateException()
class IAMBackend(BaseBackend): class IAMBackend(BaseBackend):

View File

@ -170,6 +170,14 @@ class FakeBucket(object):
def is_versioned(self): def is_versioned(self):
return self.versioning_status == 'Enabled' return self.versioning_status == 'Enabled'
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'DomainName':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "DomainName" ]"')
elif attribute_name == 'WebsiteURL':
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "WebsiteURL" ]"')
raise UnformattedGetAttTemplateException()
class S3Backend(BaseBackend): class S3Backend(BaseBackend):

View File

@ -33,6 +33,12 @@ class Topic(object):
subscription.publish(message, message_id) subscription.publish(message, message_id)
return message_id return message_id
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'TopicName':
return self.name
raise UnformattedGetAttTemplateException()
class Subscription(object): class Subscription(object):
def __init__(self, topic, endpoint, protocol): def __init__(self, topic, endpoint, protocol):

View File

@ -4,6 +4,7 @@ import hashlib
import time import time
import re import re
from moto.core import BaseBackend from moto.core import BaseBackend
from moto.core.utils import camelcase_to_underscores, get_random_message_id from moto.core.utils import camelcase_to_underscores, get_random_message_id
from .utils import generate_receipt_handle, unix_time_millis from .utils import generate_receipt_handle, unix_time_millis
@ -152,6 +153,14 @@ class Queue(object):
def add_message(self, message): def add_message(self, message):
self._messages.append(message) self._messages.append(message)
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'Arn':
return self.queue_arn
elif attribute_name == 'QueueName':
return self.name
raise UnformattedGetAttTemplateException()
class SQSBackend(BaseBackend): class SQSBackend(BaseBackend):
def __init__(self): def __init__(self):

View File

@ -0,0 +1,9 @@
from __future__ import unicode_literals
template = {
"Resources": {
"EC2EIP": {
"Type": "AWS::EC2::EIP"
}
}
}

View File

@ -0,0 +1,23 @@
from __future__ import unicode_literals
template = {
"Resources": {
"EC2EIP": {
"Type": "AWS::EC2::EIP"
}
},
"Outputs": {
"EIP": {
"Description": "EIP for joining",
"Value": {
"Fn::Join": [
":",
[
"test eip",
{"Ref": "EC2EIP"}
]
]
}
}
}
}

View File

@ -0,0 +1,12 @@
from __future__ import unicode_literals
template = {
"Resources": {
"VPCEIP": {
"Type": "AWS::EC2::EIP",
"Properties": {
"Domain": "vpc"
}
}
}
}

View File

@ -12,7 +12,13 @@ from moto import (
mock_iam, mock_iam,
) )
from .fixtures import single_instance_with_ebs_volume, vpc_single_instance_in_subnet from .fixtures import (
single_instance_with_ebs_volume,
vpc_single_instance_in_subnet,
ec2_classic_eip,
vpc_eip,
fn_join
)
@mock_cloudformation() @mock_cloudformation()
@ -75,7 +81,7 @@ def test_stack_ec2_integration():
stack = conn.describe_stacks()[0] stack = conn.describe_stacks()[0]
instance = stack.describe_resources()[0] instance = stack.describe_resources()[0]
instance.resource_type.should.equal('AWS::EC2::Instance') instance.resource_type.should.equal('AWS::EC2::Instance')
instance.logical_resource_id.should.equal("WebServerGroup") instance.logical_resource_id.should.contain("WebServerGroup")
instance.physical_resource_id.should.equal(ec2_instance.id) instance.physical_resource_id.should.equal(ec2_instance.id)
@ -186,7 +192,7 @@ def test_stack_security_groups():
ec2_conn = boto.connect_ec2() ec2_conn = boto.connect_ec2()
security_groups = ec2_conn.get_all_security_groups() security_groups = ec2_conn.get_all_security_groups()
for group in security_groups: for group in security_groups:
if group.name == "InstanceSecurityGroup": if "InstanceSecurityGroup" in group.name:
instance_group = group instance_group = group
else: else:
other_group = group other_group = group
@ -245,6 +251,7 @@ def test_autoscaling_group_with_elb():
"InstancePort": "80", "InstancePort": "80",
"Protocol": "HTTP" "Protocol": "HTTP"
}], }],
"LoadBalancerName": "my-elb",
"HealthCheck": { "HealthCheck": {
"Target": "80", "Target": "80",
"HealthyThreshold": "3", "HealthyThreshold": "3",
@ -267,7 +274,7 @@ def test_autoscaling_group_with_elb():
autoscale_conn = boto.connect_autoscale() autoscale_conn = boto.connect_autoscale()
autoscale_group = autoscale_conn.get_all_groups()[0] autoscale_group = autoscale_conn.get_all_groups()[0]
autoscale_group.launch_config_name.should.equal("my-launch-config") autoscale_group.launch_config_name.should.contain("my-launch-config")
autoscale_group.load_balancers[0].should.equal('my-elb') autoscale_group.load_balancers[0].should.equal('my-elb')
# Confirm the Launch config was actually created # Confirm the Launch config was actually created
@ -280,13 +287,13 @@ def test_autoscaling_group_with_elb():
stack = conn.describe_stacks()[0] stack = conn.describe_stacks()[0]
resources = stack.describe_resources() resources = stack.describe_resources()
as_group_resource = [resource for resource in resources if resource.resource_type == 'AWS::AutoScaling::AutoScalingGroup'][0] as_group_resource = [resource for resource in resources if resource.resource_type == 'AWS::AutoScaling::AutoScalingGroup'][0]
as_group_resource.physical_resource_id.should.equal("my-as-group") as_group_resource.physical_resource_id.should.contain("my-as-group")
launch_config_resource = [resource for resource in resources if resource.resource_type == 'AWS::AutoScaling::LaunchConfiguration'][0] launch_config_resource = [resource for resource in resources if resource.resource_type == 'AWS::AutoScaling::LaunchConfiguration'][0]
launch_config_resource.physical_resource_id.should.equal("my-launch-config") launch_config_resource.physical_resource_id.should.contain("my-launch-config")
elb_resource = [resource for resource in resources if resource.resource_type == 'AWS::ElasticLoadBalancing::LoadBalancer'][0] elb_resource = [resource for resource in resources if resource.resource_type == 'AWS::ElasticLoadBalancing::LoadBalancer'][0]
elb_resource.physical_resource_id.should.equal("my-elb") elb_resource.physical_resource_id.should.contain("my-elb")
@mock_ec2() @mock_ec2()
@ -427,18 +434,21 @@ def test_iam_roles():
iam_conn = boto.connect_iam() iam_conn = boto.connect_iam()
role = iam_conn.get_role("my-role") role_result = iam_conn.list_roles()['list_roles_response']['list_roles_result']['roles'][0]
role.role_name.should.equal("my-role") role = iam_conn.get_role(role_result.role_name)
role.role_name.should.contain("my-role")
role.path.should.equal("my-path") role.path.should.equal("my-path")
instance_profile = iam_conn.get_instance_profile("my-instance-profile") instance_profile_response = iam_conn.list_instance_profiles()['list_instance_profiles_response']
instance_profile.instance_profile_name.should.equal("my-instance-profile") cfn_instance_profile = instance_profile_response['list_instance_profiles_result']['instance_profiles'][0]
instance_profile = iam_conn.get_instance_profile(cfn_instance_profile.instance_profile_name)
instance_profile.instance_profile_name.should.contain("my-instance-profile")
instance_profile.path.should.equal("my-path") instance_profile.path.should.equal("my-path")
instance_profile.role_id.should.equal(role.role_id) instance_profile.role_id.should.equal(role.role_id)
autoscale_conn = boto.connect_autoscale() autoscale_conn = boto.connect_autoscale()
launch_config = autoscale_conn.get_all_launch_configurations()[0] launch_config = autoscale_conn.get_all_launch_configurations()[0]
launch_config.instance_profile_name.should.equal("my-instance-profile") launch_config.instance_profile_name.should.contain("my-instance-profile")
stack = conn.describe_stacks()[0] stack = conn.describe_stacks()[0]
resources = stack.describe_resources() resources = stack.describe_resources()
@ -472,3 +482,59 @@ def test_single_instance_with_ebs_volume():
resources = stack.describe_resources() resources = stack.describe_resources()
ebs_volume = [resource for resource in resources if resource.resource_type == 'AWS::EC2::Volume'][0] ebs_volume = [resource for resource in resources if resource.resource_type == 'AWS::EC2::Volume'][0]
ebs_volume.physical_resource_id.should.equal(volume.id) ebs_volume.physical_resource_id.should.equal(volume.id)
@mock_ec2()
@mock_cloudformation()
def test_classic_eip():
template_json = json.dumps(ec2_classic_eip.template)
conn = boto.connect_cloudformation()
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.connect_ec2()
eip = ec2_conn.get_all_addresses()[0]
stack = conn.describe_stacks()[0]
resources = stack.describe_resources()
cfn_eip = [resource for resource in resources if resource.resource_type == 'AWS::EC2::EIP'][0]
cfn_eip.physical_resource_id.should.equal(eip.public_ip)
@mock_ec2()
@mock_cloudformation()
def test_vpc_eip():
template_json = json.dumps(vpc_eip.template)
conn = boto.connect_cloudformation()
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.connect_ec2()
eip = ec2_conn.get_all_addresses()[0]
stack = conn.describe_stacks()[0]
resources = stack.describe_resources()
cfn_eip = [resource for resource in resources if resource.resource_type == 'AWS::EC2::EIP'][0]
cfn_eip.physical_resource_id.should.equal(eip.allocation_id)
@mock_ec2()
@mock_cloudformation()
def test_fn_join():
template_json = json.dumps(fn_join.template)
conn = boto.connect_cloudformation()
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.connect_ec2()
eip = ec2_conn.get_all_addresses()[0]
stack = conn.describe_stacks()[0]
fn_join_output = stack.outputs[0]
fn_join_output.value.should.equal('test eip:{0}'.format(eip.public_ip))

View File

@ -7,6 +7,8 @@ import sure # noqa
from moto.cloudformation.models import FakeStack from moto.cloudformation.models import FakeStack
from moto.cloudformation.parsing import resource_class_from_type from moto.cloudformation.parsing import resource_class_from_type
from moto.sqs.models import Queue from moto.sqs.models import Queue
from boto.cloudformation.stack import Output
from boto.exception import BotoServerError
dummy_template = { dummy_template = {
"AWSTemplateFormatVersion": "2010-09-09", "AWSTemplateFormatVersion": "2010-09-09",
@ -14,8 +16,7 @@ dummy_template = {
"Description": "Create a multi-az, load balanced, Auto Scaled sample web site. The Auto Scaling trigger is based on the CPU utilization of the web servers. The AMI is chosen based on the region in which the stack is run. This example creates a web service running across all availability zones in a region. The instances are load balanced with a simple health check. The web site is available on port 80, however, the instances can be configured to listen on any port (8888 by default). **WARNING** This template creates one or more Amazon EC2 instances. You will be billed for the AWS resources used if you create a stack from this template.", "Description": "Create a multi-az, load balanced, Auto Scaled sample web site. The Auto Scaling trigger is based on the CPU utilization of the web servers. The AMI is chosen based on the region in which the stack is run. This example creates a web service running across all availability zones in a region. The instances are load balanced with a simple health check. The web site is available on port 80, however, the instances can be configured to listen on any port (8888 by default). **WARNING** This template creates one or more Amazon EC2 instances. You will be billed for the AWS resources used if you create a stack from this template.",
"Resources": { "Resources": {
"WebServerGroup": { "Queue": {
"Type": "AWS::SQS::Queue", "Type": "AWS::SQS::Queue",
"Properties": { "Properties": {
"QueueName": "my-queue", "QueueName": "my-queue",
@ -25,7 +26,55 @@ dummy_template = {
}, },
} }
name_type_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Create a multi-az, load balanced, Auto Scaled sample web site. The Auto Scaling trigger is based on the CPU utilization of the web servers. The AMI is chosen based on the region in which the stack is run. This example creates a web service running across all availability zones in a region. The instances are load balanced with a simple health check. The web site is available on port 80, however, the instances can be configured to listen on any port (8888 by default). **WARNING** This template creates one or more Amazon EC2 instances. You will be billed for the AWS resources used if you create a stack from this template.",
"Resources": {
"Queue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"VisibilityTimeout": 60,
}
},
},
}
output_dict = {
"Outputs": {
"Output1": {
"Value": {"Ref": "Queue"},
"Description": "This is a description."
}
}
}
bad_output = {
"Outputs": {
"Output1": {
"Value": {"Fn::GetAtt": ["Queue", "InvalidAttribute"]}
}
}
}
get_attribute_output = {
"Outputs": {
"Output1": {
"Value": {"Fn::GetAtt": ["Queue", "QueueName"]}
}
}
}
outputs_template = dict(list(dummy_template.items()) + list(output_dict.items()))
bad_outputs_template = dict(list(dummy_template.items()) + list(bad_output.items()))
get_attribute_outputs_template = dict(list(dummy_template.items()) + list(get_attribute_output.items()))
dummy_template_json = json.dumps(dummy_template) dummy_template_json = json.dumps(dummy_template)
name_type_template_json = json.dumps(name_type_template)
output_type_template_json = json.dumps(outputs_template)
bad_output_template_json = json.dumps(bad_outputs_template)
get_attribute_outputs_template_json = json.dumps(get_attribute_outputs_template)
def test_parse_stack_resources(): def test_parse_stack_resources():
@ -36,7 +85,7 @@ def test_parse_stack_resources():
) )
stack.resource_map.should.have.length_of(1) stack.resource_map.should.have.length_of(1)
list(stack.resource_map.keys())[0].should.equal('WebServerGroup') list(stack.resource_map.keys())[0].should.equal('Queue')
queue = list(stack.resource_map.values())[0] queue = list(stack.resource_map.values())[0]
queue.should.be.a(Queue) queue.should.be.a(Queue)
queue.name.should.equal("my-queue") queue.name.should.equal("my-queue")
@ -46,3 +95,46 @@ def test_parse_stack_resources():
def test_missing_resource_logs(logger): def test_missing_resource_logs(logger):
resource_class_from_type("foobar") resource_class_from_type("foobar")
logger.warning.assert_called_with('No Moto CloudFormation support for %s', 'foobar') logger.warning.assert_called_with('No Moto CloudFormation support for %s', 'foobar')
def test_parse_stack_with_name_type_resource():
stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=name_type_template_json)
stack.resource_map.should.have.length_of(1)
list(stack.resource_map.keys())[0].should.equal('Queue')
queue = list(stack.resource_map.values())[0]
queue.should.be.a(Queue)
def test_parse_stack_with_outputs():
stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=output_type_template_json)
stack.output_map.should.have.length_of(1)
list(stack.output_map.keys())[0].should.equal('Output1')
output = list(stack.output_map.values())[0]
output.should.be.a(Output)
output.description.should.equal("This is a description.")
def test_parse_stack_with_get_attribute_outputs():
stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=get_attribute_outputs_template_json)
stack.output_map.should.have.length_of(1)
list(stack.output_map.keys())[0].should.equal('Output1')
output = list(stack.output_map.values())[0]
output.should.be.a(Output)
output.value.should.equal("my-queue")
def test_parse_stack_with_bad_get_attribute_outputs():
FakeStack.when.called_with(
"test_id", "test_stack", bad_output_template_json).should.throw(BotoServerError)