CloudFormation - support AWS::CloudFormation::Stack-resources (#6086)

This commit is contained in:
Bert Blommers 2023-03-18 15:19:06 -01:00 committed by GitHub
parent 8bf55cbe0e
commit 75d1018c28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 302 additions and 93 deletions

View File

@ -25,6 +25,7 @@ from .utils import (
generate_stackset_id,
yaml_tag_constructor,
validate_template_cfn_lint,
get_stack_from_s3_url,
)
from .exceptions import ValidationError, StackSetNotEmpty, StackSetNotFoundException
@ -356,7 +357,7 @@ class FakeStackInstances(BaseModel):
return self.stack_instances[i]
class FakeStack(BaseModel):
class FakeStack(CloudFormationModel):
def __init__(
self,
stack_id: str,
@ -532,6 +533,68 @@ class FakeStack(BaseModel):
self._add_stack_event("DELETE_COMPLETE")
self.status = "DELETE_COMPLETE"
@staticmethod
def cloudformation_type() -> str:
return "AWS::CloudFormation::Stack"
@classmethod
def has_cfn_attr(cls, attr: str) -> bool: # pylint: disable=unused-argument
return True
@property
def physical_resource_id(self) -> str:
return self.name
@classmethod
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Dict[str, Any],
account_id: str,
region_name: str,
**kwargs: Any,
) -> "FakeStack":
cf_backend: CloudFormationBackend = cloudformation_backends[account_id][
region_name
]
properties = cloudformation_json["Properties"]
template_body = get_stack_from_s3_url(properties["TemplateURL"], account_id)
parameters = properties.get("Parameters", {})
return cf_backend.create_stack(
name=resource_name, template=template_body, parameters=parameters
)
@classmethod
def update_from_cloudformation_json( # type: ignore[misc]
cls,
original_resource: Any,
new_resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> "FakeStack":
cls.delete_from_cloudformation_json(
original_resource.name, cloudformation_json, account_id, region_name
)
return cls.create_from_cloudformation_json(
new_resource_name, cloudformation_json, account_id, region_name
)
@classmethod
def delete_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Dict[str, Any],
account_id: str,
region_name: str,
) -> None:
cf_backend: CloudFormationBackend = cloudformation_backends[account_id][
region_name
]
cf_backend.delete_stack(resource_name)
class FakeChange(BaseModel):
def __init__(self, action: str, logical_resource_id: str, resource_type: str):

View File

@ -6,6 +6,7 @@ import warnings
import re
import collections.abc as collections_abc
from functools import lru_cache
from typing import (
Any,
Dict,
@ -70,13 +71,6 @@ from .exceptions import (
UnsupportedAttribute,
)
# List of supported CloudFormation models
MODEL_LIST = CloudFormationModel.__subclasses__()
MODEL_MAP = {model.cloudformation_type(): model for model in MODEL_LIST}
NAME_TYPE_MAP = {
model.cloudformation_type(): model.cloudformation_name_type()
for model in MODEL_LIST
}
CF_MODEL = TypeVar("CF_MODEL", bound=CloudFormationModel)
# Just ignore these models types for now
@ -90,6 +84,25 @@ DEFAULT_REGION = "us-east-1"
logger = logging.getLogger("moto")
# List of supported CloudFormation models
@lru_cache()
def get_model_list() -> List[Type[CloudFormationModel]]:
return CloudFormationModel.__subclasses__()
@lru_cache()
def get_model_map() -> Dict[str, Type[CloudFormationModel]]:
return {model.cloudformation_type(): model for model in get_model_list()}
@lru_cache()
def get_name_type_map() -> Dict[str, str]:
return {
model.cloudformation_type(): model.cloudformation_name_type()
for model in get_model_list()
}
class Output(object):
def __init__(self, key: str, value: str, description: str):
self.description = description
@ -250,18 +263,19 @@ def resource_class_from_type(resource_type: str) -> Type[CloudFormationModel]:
return None # type: ignore[return-value]
if resource_type.startswith("Custom::"):
return CustomModel
if resource_type not in MODEL_MAP:
if resource_type not in get_model_map():
logger.warning("No Moto CloudFormation support for %s", resource_type)
return None # type: ignore[return-value]
return MODEL_MAP.get(resource_type) # type: ignore[return-value]
return get_model_map()[resource_type] # type: ignore[return-value]
def resource_name_property_from_type(resource_type: str) -> Optional[str]:
for model in MODEL_LIST:
for model in get_model_list():
if model.cloudformation_type() == resource_type:
return model.cloudformation_name_type()
return NAME_TYPE_MAP.get(resource_type)
return get_name_type_map().get(resource_type)
def generate_resource_name(resource_type: str, stack_name: str, logical_id: str) -> str:
@ -830,9 +844,9 @@ class ResourceMap(collections_abc.Mapping): # type: ignore[type-arg]
not isinstance(parsed_resource, str)
and parsed_resource is not None
):
if parsed_resource and hasattr(parsed_resource, "delete"):
try:
parsed_resource.delete(self._account_id, self._region_name)
else:
except (TypeError, AttributeError):
if hasattr(parsed_resource, "physical_resource_id"):
resource_name = parsed_resource.physical_resource_id
else:

View File

@ -2,17 +2,15 @@ import json
import re
import yaml
from typing import Any, Dict, Tuple, List, Optional, Union
from urllib.parse import urlparse
from yaml.parser import ParserError # pylint:disable=c-extension-no-member
from yaml.scanner import ScannerError # pylint:disable=c-extension-no-member
from moto.core.responses import BaseResponse
from moto.s3.models import s3_backends
from moto.s3.exceptions import S3ClientError
from moto.utilities.aws_headers import amzn_request_id
from .models import cloudformation_backends, CloudFormationBackend, FakeStack
from .exceptions import ValidationError, MissingParameterError
from .utils import yaml_tag_constructor
from .utils import yaml_tag_constructor, get_stack_from_s3_url
def get_template_summary_response_from_template(template_body: str) -> Dict[str, Any]:
@ -54,28 +52,7 @@ class CloudFormationResponse(BaseResponse):
return cls.dispatch(request=request, full_url=full_url, headers=headers)
def _get_stack_from_s3_url(self, template_url: str) -> str:
template_url_parts = urlparse(template_url)
if "localhost" in template_url:
bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1)
else:
if template_url_parts.netloc.endswith(
"amazonaws.com"
) and template_url_parts.netloc.startswith("s3"):
# Handle when S3 url uses amazon url with bucket in path
# Also handles getting region as technically s3 is region'd
# region = template_url.netloc.split('.')[1]
bucket_name, key_name = template_url_parts.path.lstrip("/").split(
"/", 1
)
else:
bucket_name = template_url_parts.netloc.split(".")[0]
key_name = template_url_parts.path.lstrip("/")
key = s3_backends[self.current_account]["global"].get_object(
bucket_name, key_name
)
return key.value.decode("utf-8")
return get_stack_from_s3_url(template_url, account_id=self.current_account)
def _get_params_from_list(
self, parameters_list: List[Dict[str, Any]]

View File

@ -3,6 +3,7 @@ import os
import string
from moto.moto_api._internal import mock_random as random
from typing import Any, List
from urllib.parse import urlparse
def generate_stack_id(stack_name: str, region: str, account: str) -> str:
@ -84,3 +85,26 @@ def validate_template_cfn_lint(template: str) -> List[Any]:
matches = core.run_checks(abs_filename, template, rules, regions)
return matches
def get_stack_from_s3_url(template_url: str, account_id: str) -> str:
from moto.s3.models import s3_backends
template_url_parts = urlparse(template_url)
if "localhost" in template_url:
bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1)
else:
if template_url_parts.netloc.endswith(
"amazonaws.com"
) and template_url_parts.netloc.startswith("s3"):
# Handle when S3 url uses amazon url with bucket in path
# Also handles getting region as technically s3 is region'd
# region = template_url.netloc.split('.')[1]
bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1)
else:
bucket_name = template_url_parts.netloc.split(".")[0]
key_name = template_url_parts.path.lstrip("/")
key = s3_backends[account_id]["global"].get_object(bucket_name, key_name)
return key.value.decode("utf-8")

View File

@ -4,7 +4,7 @@ import datetime
import json
from collections import OrderedDict
from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import unix_time
from .comparisons import get_comparison_func
@ -94,7 +94,7 @@ class Item(BaseModel):
return {"Item": included}
class Table(CloudFormationModel):
class Table(BaseModel):
def __init__(
self,
account_id: str,
@ -151,45 +151,6 @@ class Table(CloudFormationModel):
}
return results
@staticmethod
def cloudformation_name_type() -> str:
return "TableName"
@staticmethod
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html
return "AWS::DynamoDB::Table"
@classmethod
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Dict[str, Any],
account_id: str,
region_name: str,
**kwargs: Any,
) -> "Table":
properties = cloudformation_json["Properties"]
key_attr = [
i["AttributeName"]
for i in properties["KeySchema"]
if i["KeyType"] == "HASH"
][0]
key_type = [
i["AttributeType"]
for i in properties["AttributeDefinitions"]
if i["AttributeName"] == key_attr
][0]
spec = {
"account_id": account_id,
"name": properties["TableName"],
"hash_key_attr": key_attr,
"hash_key_type": key_type,
}
# TODO: optional properties still missing:
# range_key_attr, range_key_type, read_capacity, write_capacity
return Table(**spec)
def __len__(self) -> int:
return sum(
[(len(value) if self.has_range_key else 1) for value in self.items.values()] # type: ignore
@ -324,19 +285,6 @@ class Table(CloudFormationModel):
item.attrs[attr].add(DynamoType(update["Value"]))
return item
@classmethod
def has_cfn_attr(cls, attr: str) -> bool:
return attr in ["StreamArn"]
def get_cfn_attribute(self, attribute_name: str) -> str:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "StreamArn":
region = "us-east-1"
time = "2000-01-01T00:00:00.000"
return f"arn:aws:dynamodb:{region}:{self.account_id}:table/{self.name}/stream/{time}"
raise UnformattedGetAttTemplateException()
class DynamoDBBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):

View File

@ -0,0 +1,183 @@
import boto3
import json
from uuid import uuid4
from moto import mock_cloudformation, mock_s3
@mock_cloudformation
@mock_s3
def test_create_basic_stack():
# Create inner template
cf = boto3.client("cloudformation", "us-east-1")
bucket_created_by_cf = str(uuid4())
template = get_inner_template(bucket_created_by_cf)
# Upload inner template to S3
s3 = boto3.client("s3", "us-east-1")
cf_storage_bucket = str(uuid4())
s3.create_bucket(Bucket=cf_storage_bucket)
s3.put_object(Bucket=cf_storage_bucket, Key="stack.json", Body=json.dumps(template))
# Create template that includes the inner template
stack_name = "a" + str(uuid4())[0:6]
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"NestedStack": {
"Type": "AWS::CloudFormation::Stack",
"Properties": {
"TemplateURL": f"https://s3.amazonaws.com/{cf_storage_bucket}/stack.json",
},
},
},
}
cf.create_stack(StackName=stack_name, TemplateBody=str(template))
# Verify the inner S3 bucket has been created
bucket_names = sorted([b["Name"] for b in s3.list_buckets()["Buckets"]])
assert bucket_names == sorted([cf_storage_bucket, bucket_created_by_cf])
# Verify both stacks are created
stacks = cf.list_stacks()["StackSummaries"]
assert len(stacks) == 2
@mock_cloudformation
@mock_s3
def test_create_stack_with_params():
# Create inner template
cf = boto3.client("cloudformation", "us-east-1")
bucket_created_by_cf = str(uuid4())
inner_template = json.dumps(get_inner_template_with_params())
# Upload inner template to S3
s3 = boto3.client("s3", "us-east-1")
cf_storage_bucket = str(uuid4())
s3.create_bucket(Bucket=cf_storage_bucket)
s3.put_object(Bucket=cf_storage_bucket, Key="stack.json", Body=inner_template)
# Create template that includes the inner template
stack_name = "a" + str(uuid4())[0:6]
template = get_outer_template_with_params(cf_storage_bucket, bucket_created_by_cf)
cf.create_stack(StackName=stack_name, TemplateBody=str(template))
# Verify the inner S3 bucket has been created
bucket_names = sorted([b["Name"] for b in s3.list_buckets()["Buckets"]])
assert bucket_names == sorted([cf_storage_bucket, bucket_created_by_cf])
@mock_cloudformation
@mock_s3
def test_update_stack_with_params():
# Create inner template
cf = boto3.client("cloudformation", "us-east-1")
first_bucket = str(uuid4())
second_bucket = str(uuid4())
inner_template = json.dumps(get_inner_template_with_params())
# Upload inner template to S3
s3 = boto3.client("s3", "us-east-1")
cf_storage_bucket = str(uuid4())
s3.create_bucket(Bucket=cf_storage_bucket)
s3.put_object(Bucket=cf_storage_bucket, Key="stack.json", Body=inner_template)
# Create template that includes the inner template
stack_name = "a" + str(uuid4())[0:6]
template = get_outer_template_with_params(cf_storage_bucket, first_bucket)
cf.create_stack(StackName=stack_name, TemplateBody=str(template))
# Verify the inner S3 bucket has been created
bucket_names = sorted([b["Name"] for b in s3.list_buckets()["Buckets"]])
assert bucket_names == sorted([cf_storage_bucket, first_bucket])
# Update stack
template = get_outer_template_with_params(cf_storage_bucket, second_bucket)
cf.update_stack(StackName=stack_name, TemplateBody=str(template))
# Verify the inner S3 bucket has been created
bucket_names = sorted([b["Name"] for b in s3.list_buckets()["Buckets"]])
assert bucket_names == sorted([cf_storage_bucket, second_bucket])
@mock_cloudformation
@mock_s3
def test_delete_basic_stack():
# Create inner template
cf = boto3.client("cloudformation", "us-east-1")
bucket_created_by_cf = str(uuid4())
template = get_inner_template(bucket_created_by_cf)
# Upload inner template to S3
s3 = boto3.client("s3", "us-east-1")
cf_storage_bucket = str(uuid4())
s3.create_bucket(Bucket=cf_storage_bucket)
s3.put_object(Bucket=cf_storage_bucket, Key="stack.json", Body=json.dumps(template))
# Create template that includes the inner template
stack_name = "a" + str(uuid4())[0:6]
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"NestedStack": {
"Type": "AWS::CloudFormation::Stack",
"Properties": {
"TemplateURL": f"https://s3.amazonaws.com/{cf_storage_bucket}/stack.json",
},
},
},
}
cf.create_stack(StackName=stack_name, TemplateBody=str(template))
cf.delete_stack(StackName=stack_name)
# Verify the stack-controlled S3 bucket has been deleted
bucket_names = sorted([b["Name"] for b in s3.list_buckets()["Buckets"]])
assert bucket_names == [cf_storage_bucket]
# Verify both stacks are deleted
stacks = cf.list_stacks()["StackSummaries"]
assert len(stacks) == 2
for stack in stacks:
assert stack["StackStatus"] == "DELETE_COMPLETE"
def get_inner_template(bucket_created_by_cf):
return {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"bcbcf": {
"Type": "AWS::S3::Bucket",
"Properties": {"BucketName": bucket_created_by_cf},
}
},
"Outputs": {"Bucket": {"Value": {"Ref": "bcbcf"}}},
}
def get_inner_template_with_params():
return {
"AWSTemplateFormatVersion": "2010-09-09",
"Parameters": {
"BName": {"Description": "bucket name", "Type": "String"},
},
"Resources": {
"bcbcf": {
"Type": "AWS::S3::Bucket",
"Properties": {"BucketName": {"Ref": "BName"}},
}
},
}
def get_outer_template_with_params(cf_storage_bucket, first_bucket):
return {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"NestedStack": {
"Type": "AWS::CloudFormation::Stack",
"Properties": {
"TemplateURL": f"https://s3.amazonaws.com/{cf_storage_bucket}/stack.json",
"Parameters": {"BName": first_bucket},
},
},
},
}