TechDebt: MyPy CloudFormation (#5603)

This commit is contained in:
Bert Blommers 2022-10-26 21:36:02 +00:00 committed by GitHub
parent dc368fbe79
commit 03f5518703
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 559 additions and 455 deletions

View File

@ -1075,10 +1075,6 @@ class EventSourceMapping(CloudFormationModel):
lambda_backend = lambda_backends[account_id][region_name]
lambda_backend.delete_event_source_mapping(self.uuid)
@staticmethod
def cloudformation_name_type() -> None:
return None
@staticmethod
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html
@ -1142,10 +1138,6 @@ class LambdaVersion(CloudFormationModel):
def __repr__(self) -> str:
return str(self.logical_resource_id) # type: ignore[attr-defined]
@staticmethod
def cloudformation_name_type() -> None:
return None
@staticmethod
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-version.html

View File

@ -1,5 +1,6 @@
import json
import threading
from typing import Any, Dict
from moto import settings
from moto.core import CloudFormationModel
@ -8,33 +9,40 @@ from moto.moto_api._internal import mock_random
class CustomModel(CloudFormationModel):
def __init__(self, region_name, request_id, logical_id, resource_name):
def __init__(
self, region_name: str, request_id: str, logical_id: str, resource_name: str
):
self.region_name = region_name
self.request_id = request_id
self.logical_id = logical_id
self.resource_name = resource_name
self.data = dict()
self.data: Dict[str, Any] = dict()
self._finished = False
def set_data(self, data):
def set_data(self, data: Dict[str, Any]) -> None:
self.data = data
self._finished = True
def is_created(self):
def is_created(self) -> bool:
return self._finished
@property
def physical_resource_id(self):
def physical_resource_id(self) -> str:
return self.resource_name
@staticmethod
def cloudformation_type():
def cloudformation_type() -> str:
return "?"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Dict[str, Any],
account_id: str,
region_name: str,
**kwargs: Any,
) -> "CustomModel":
logical_id = kwargs["LogicalId"]
stack_id = kwargs["StackId"]
resource_type = kwargs["ResourceType"]
@ -85,11 +93,11 @@ class CustomModel(CloudFormationModel):
return custom_resource
@classmethod
def has_cfn_attr(cls, attr): # pylint: disable=unused-argument
def has_cfn_attr(cls, attr: str) -> bool: # pylint: disable=unused-argument
# We don't know which attributes are supported for third-party resources
return True
def get_cfn_attribute(self, attribute_name):
def get_cfn_attribute(self, attribute_name: str) -> Any:
if attribute_name in self.data:
return self.data[attribute_name]
return None

View File

@ -1,5 +1,6 @@
from moto.core.exceptions import RESTError
from jinja2 import Template
from typing import Optional
class UnformattedGetAttTemplateException(Exception):
@ -10,7 +11,7 @@ class UnformattedGetAttTemplateException(Exception):
class ValidationError(RESTError):
def __init__(self, name_or_id=None, message=None):
def __init__(self, name_or_id: Optional[str] = None, message: Optional[str] = None):
if message is None:
message = "Stack with id {0} does not exist".format(name_or_id)
@ -20,7 +21,7 @@ class ValidationError(RESTError):
class MissingParameterError(RESTError):
def __init__(self, parameter_name):
def __init__(self, parameter_name: str):
template = Template(ERROR_RESPONSE)
message = "Missing parameter {0}".format(parameter_name)
super().__init__(error_type="ValidationError", message=message)
@ -30,7 +31,7 @@ class MissingParameterError(RESTError):
class ExportNotFound(RESTError):
"""Exception to raise if a template tries to import a non-existent export"""
def __init__(self, export_name):
def __init__(self, export_name: str):
template = Template(ERROR_RESPONSE)
message = "No export named {0} found.".format(export_name)
super().__init__(error_type="ExportNotFound", message=message)
@ -38,7 +39,7 @@ class ExportNotFound(RESTError):
class UnsupportedAttribute(ValidationError):
def __init__(self, resource, attr):
def __init__(self, resource: str, attr: str):
template = Template(ERROR_RESPONSE)
super().__init__()
self.description = template.render(

View File

@ -3,10 +3,11 @@ import json
import yaml
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Iterable, Tuple, Union, Type
from yaml.parser import ParserError # pylint:disable=c-extension-no-member
from yaml.scanner import ScannerError # pylint:disable=c-extension-no-member
from moto.core import BaseBackend, BaseModel
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import (
iso_8601_datetime_with_milliseconds,
iso_8601_datetime_without_milliseconds,
@ -15,7 +16,8 @@ from moto.core.utils import (
from moto.moto_api._internal import mock_random
from moto.sns.models import sns_backends
from .parsing import ResourceMap, OutputMap
from .custom_model import CustomModel
from .parsing import ResourceMap, Output, OutputMap, Export
from .utils import (
generate_changeset_id,
generate_stack_id,
@ -30,17 +32,16 @@ from .exceptions import ValidationError
class FakeStackSet(BaseModel):
def __init__(
self,
stackset_id,
account_id,
name,
template,
region="us-east-1",
status="ACTIVE",
description=None,
parameters=None,
tags=None,
admin_role="AWSCloudFormationStackSetAdministrationRole",
execution_role="AWSCloudFormationStackSetExecutionRole",
stackset_id: str,
account_id: str,
name: str,
template: str,
region: str,
description: Optional[str],
parameters: Dict[str, str],
tags: Optional[Dict[str, str]] = None,
admin_role: str = "AWSCloudFormationStackSetAdministrationRole",
execution_role: str = "AWSCloudFormationStackSetExecutionRole",
):
self.id = stackset_id
self.arn = generate_stackset_arn(stackset_id, region, account_id)
@ -52,18 +53,23 @@ class FakeStackSet(BaseModel):
self.admin_role = admin_role
self.admin_role_arn = f"arn:aws:iam::{account_id}:role/{self.admin_role}"
self.execution_role = execution_role
self.status = status
self.status = "ACTIVE"
self.instances = FakeStackInstances(parameters, self.id, self.name)
self.stack_instances = self.instances.stack_instances
self.operations = []
self.operations: List[Dict[str, Any]] = []
def _create_operation(
self, operation_id, action, status, accounts=None, regions=None
):
self,
operation_id: str,
action: str,
status: str,
accounts: Optional[List[str]] = None,
regions: Optional[List[str]] = None,
) -> Dict[str, Any]:
accounts = accounts or []
regions = regions or []
operation = {
"OperationId": str(operation_id),
"OperationId": operation_id,
"Action": action,
"Status": status,
"CreationTimestamp": datetime.now(),
@ -76,41 +82,39 @@ class FakeStackSet(BaseModel):
self.operations += [operation]
return operation
def get_operation(self, operation_id):
def get_operation(self, operation_id: str) -> Dict[str, Any]:
for operation in self.operations:
if operation_id == operation["OperationId"]:
return operation
raise ValidationError(operation_id)
def update_operation(self, operation_id, status):
def update_operation(self, operation_id: str, status: str) -> str:
operation = self.get_operation(operation_id)
operation["Status"] = status
return operation_id
def delete(self):
def delete(self) -> None:
self.status = "DELETED"
def update(
self,
template,
description,
parameters,
tags,
admin_role,
execution_role,
accounts,
regions,
operation_id=None,
):
if not operation_id:
operation_id = mock_random.uuid4()
template: str,
description: str,
parameters: Dict[str, str],
tags: Dict[str, str],
admin_role: str,
execution_role: str,
accounts: List[str],
regions: List[str],
operation_id: str,
) -> Dict[str, Any]:
self.template = template if template else self.template
self.template = template or self.template
self.description = description if description is not None else self.description
self.parameters = parameters if parameters else self.parameters
self.tags = tags if tags else self.tags
self.admin_role = admin_role if admin_role else self.admin_role
self.execution_role = execution_role if execution_role else self.execution_role
self.parameters = parameters or self.parameters
self.tags = tags or self.tags
self.admin_role = admin_role or self.admin_role
self.execution_role = execution_role or self.execution_role
if accounts and regions:
self.update_instances(accounts, regions, self.parameters)
@ -124,11 +128,12 @@ class FakeStackSet(BaseModel):
)
return operation
def create_stack_instances(self, accounts, regions, parameters, operation_id=None):
if not operation_id:
operation_id = mock_random.uuid4()
def create_stack_instances(
self, accounts: List[str], regions: List[str], parameters: List[Dict[str, Any]]
) -> None:
operation_id = str(mock_random.uuid4())
if not parameters:
parameters = self.parameters
parameters = self.parameters # type: ignore[assignment]
self.instances.create_instances(accounts, regions, parameters)
self._create_operation(
@ -139,24 +144,23 @@ class FakeStackSet(BaseModel):
regions=regions,
)
def delete_stack_instances(self, accounts, regions, operation_id=None):
if not operation_id:
operation_id = mock_random.uuid4()
def delete_stack_instances(self, accounts: List[str], regions: List[str]) -> None:
operation_id = str(mock_random.uuid4())
self.instances.delete(accounts, regions)
operation = self._create_operation(
self._create_operation(
operation_id=operation_id,
action="DELETE",
status="SUCCEEDED",
accounts=accounts,
regions=regions,
)
return operation
def update_instances(self, accounts, regions, parameters, operation_id=None):
if not operation_id:
operation_id = mock_random.uuid4()
def update_instances(
self, accounts: List[str], regions: List[str], parameters: Dict[str, str]
) -> Dict[str, Any]:
operation_id = str(mock_random.uuid4())
self.instances.update(accounts, regions, parameters)
operation = self._create_operation(
@ -170,14 +174,21 @@ class FakeStackSet(BaseModel):
class FakeStackInstances(BaseModel):
def __init__(self, parameters, stackset_id, stackset_name):
self.parameters = parameters if parameters else {}
def __init__(
self, parameters: Dict[str, str], stackset_id: str, stackset_name: str
):
self.parameters = parameters or {}
self.stackset_id = stackset_id
self.stack_name = "StackSet-{}".format(stackset_id)
self.stackset_name = stackset_name
self.stack_instances = []
self.stack_instances: List[Dict[str, Any]] = []
def create_instances(self, accounts, regions, parameters):
def create_instances(
self,
accounts: List[str],
regions: List[str],
parameters: Optional[List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
new_instances = []
for region in regions:
for account in accounts:
@ -187,13 +198,18 @@ class FakeStackInstances(BaseModel):
"Region": region,
"Account": account,
"Status": "CURRENT",
"ParameterOverrides": parameters if parameters else [],
"ParameterOverrides": parameters or [],
}
new_instances.append(instance)
self.stack_instances += new_instances
return new_instances
def update(self, accounts, regions, parameters):
def update(
self,
accounts: List[str],
regions: List[str],
parameters: Optional[Dict[str, str]],
) -> Any:
for account in accounts:
for region in regions:
instance = self.get_instance(account, region)
@ -202,12 +218,12 @@ class FakeStackInstances(BaseModel):
else:
instance["ParameterOverrides"] = []
def delete(self, accounts, regions):
def delete(self, accounts: List[str], regions: List[str]) -> None:
for i, instance in enumerate(self.stack_instances):
if instance["Region"] in regions and instance["Account"] in accounts:
self.stack_instances.pop(i)
def get_instance(self, account, region):
def get_instance(self, account: str, region: str) -> Dict[str, Any]: # type: ignore[return]
for i, instance in enumerate(self.stack_instances):
if instance["Region"] == region and instance["Account"] == account:
return self.stack_instances[i]
@ -216,16 +232,16 @@ class FakeStackInstances(BaseModel):
class FakeStack(BaseModel):
def __init__(
self,
stack_id,
name,
template,
parameters,
account_id,
region_name,
notification_arns=None,
tags=None,
role_arn=None,
cross_stack_resources=None,
stack_id: str,
name: str,
template: Union[str, Dict[str, Any]],
parameters: Dict[str, str],
account_id: str,
region_name: str,
notification_arns: Optional[List[str]] = None,
tags: Optional[Dict[str, str]] = None,
role_arn: Optional[str] = None,
cross_stack_resources: Optional[Dict[str, Export]] = None,
):
self.stack_id = stack_id
self.name = name
@ -235,26 +251,26 @@ class FakeStack(BaseModel):
self._parse_template()
self.description = self.template_dict.get("Description")
else:
self.template_dict = {}
self.template_dict: Dict[str, Any] = {}
self.description = None
self.parameters = parameters
self.region_name = region_name
self.notification_arns = notification_arns if notification_arns else []
self.role_arn = role_arn
self.tags = tags if tags else {}
self.events = []
self.events: List[FakeEvent] = []
self.policy = ""
self.cross_stack_resources = cross_stack_resources or {}
self.cross_stack_resources: Dict[str, Export] = cross_stack_resources or {}
self.resource_map = self._create_resource_map()
self.custom_resources = dict()
self.custom_resources: Dict[str, CustomModel] = dict()
self.output_map = self._create_output_map()
self.creation_time = datetime.utcnow()
self.status = "CREATE_PENDING"
def has_template(self, other_template):
def has_template(self, other_template: str) -> bool:
our_template = (
self.template
if isinstance(self.template, dict)
@ -262,10 +278,10 @@ class FakeStack(BaseModel):
)
return our_template == json.loads(other_template)
def has_parameters(self, other_parameters):
def has_parameters(self, other_parameters: Dict[str, Any]) -> bool:
return self.parameters == other_parameters
def _create_resource_map(self):
def _create_resource_map(self) -> ResourceMap:
resource_map = ResourceMap(
self.stack_id,
self.name,
@ -279,16 +295,19 @@ class FakeStack(BaseModel):
resource_map.load()
return resource_map
def _create_output_map(self):
def _create_output_map(self) -> OutputMap:
return OutputMap(self.resource_map, self.template_dict, self.stack_id)
@property
def creation_time_iso_8601(self):
return iso_8601_datetime_without_milliseconds(self.creation_time)
def creation_time_iso_8601(self) -> str:
return iso_8601_datetime_without_milliseconds(self.creation_time) # type: ignore[return-value]
def _add_stack_event(
self, resource_status, resource_status_reason=None, resource_properties=None
):
self,
resource_status: str,
resource_status_reason: Optional[str] = None,
resource_properties: Optional[str] = None,
) -> None:
event = FakeEvent(
stack_id=self.stack_id,
@ -304,58 +323,36 @@ class FakeStack(BaseModel):
event.sendToSns(self.account_id, self.region_name, self.notification_arns)
self.events.append(event)
def _add_resource_event(
self,
logical_resource_id,
resource_status,
resource_status_reason=None,
resource_properties=None,
):
# not used yet... feel free to help yourself
resource = self.resource_map[logical_resource_id]
self.events.append(
FakeEvent(
stack_id=self.stack_id,
stack_name=self.name,
logical_resource_id=logical_resource_id,
physical_resource_id=resource.physical_resource_id,
resource_type=resource.type,
resource_status=resource_status,
resource_status_reason=resource_status_reason,
resource_properties=resource_properties,
)
)
def _parse_template(self):
def _parse_template(self) -> None:
yaml.add_multi_constructor("", yaml_tag_constructor)
try:
self.template_dict = yaml.load(self.template, Loader=yaml.Loader)
self.template_dict = yaml.load(self.template, Loader=yaml.Loader) # type: ignore[arg-type]
except (ParserError, ScannerError):
self.template_dict = json.loads(self.template)
self.template_dict = json.loads(self.template) # type: ignore[arg-type]
@property
def stack_parameters(self):
def stack_parameters(self) -> Dict[str, Any]: # type: ignore[misc]
return self.resource_map.resolved_parameters
@property
def stack_resources(self):
def stack_resources(self) -> Iterable[Type[CloudFormationModel]]:
return self.resource_map.values()
@property
def stack_outputs(self):
def stack_outputs(self) -> List[Output]:
return [v for v in self.output_map.values() if v]
@property
def exports(self):
def exports(self) -> List[Export]:
return self.output_map.exports
def add_custom_resource(self, custom_resource):
def add_custom_resource(self, custom_resource: CustomModel) -> None:
self.custom_resources[custom_resource.logical_id] = custom_resource
def get_custom_resource(self, custom_resource):
def get_custom_resource(self, custom_resource: str) -> CustomModel:
return self.custom_resources[custom_resource]
def create_resources(self):
def create_resources(self) -> None:
self.status = "CREATE_IN_PROGRESS"
all_resources_ready = self.resource_map.create(self.template_dict)
# Set the description of the stack
@ -363,15 +360,21 @@ class FakeStack(BaseModel):
if all_resources_ready:
self.mark_creation_complete()
def verify_readiness(self):
def verify_readiness(self) -> None:
if self.resource_map.creation_complete():
self.mark_creation_complete()
def mark_creation_complete(self):
def mark_creation_complete(self) -> None:
self.status = "CREATE_COMPLETE"
self._add_stack_event("CREATE_COMPLETE")
def update(self, template, role_arn=None, parameters=None, tags=None):
def update(
self,
template: str,
role_arn: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
tags: Optional[Dict[str, str]] = None,
) -> None:
self._add_stack_event(
"UPDATE_IN_PROGRESS", resource_status_reason="User Initiated"
)
@ -387,7 +390,7 @@ class FakeStack(BaseModel):
self.tags = tags
# TODO: update tags in the resource map
def delete(self):
def delete(self) -> None:
self._add_stack_event(
"DELETE_IN_PROGRESS", resource_status_reason="User Initiated"
)
@ -397,7 +400,7 @@ class FakeStack(BaseModel):
class FakeChange(BaseModel):
def __init__(self, action, logical_resource_id, resource_type):
def __init__(self, action: str, logical_resource_id: str, resource_type: str):
self.action = action
self.logical_resource_id = logical_resource_id
self.resource_type = resource_type
@ -406,16 +409,16 @@ class FakeChange(BaseModel):
class FakeChangeSet(BaseModel):
def __init__(
self,
change_set_type,
change_set_id,
change_set_name,
stack,
template,
parameters,
description,
notification_arns=None,
tags=None,
role_arn=None,
change_set_type: str,
change_set_id: str,
change_set_name: str,
stack: FakeStack,
template: str,
parameters: Dict[str, str],
description: str,
notification_arns: Optional[List[str]] = None,
tags: Optional[Dict[str, str]] = None,
role_arn: Optional[str] = None,
):
self.change_set_type = change_set_type
self.change_set_id = change_set_id
@ -435,7 +438,11 @@ class FakeChangeSet(BaseModel):
self.creation_time = datetime.utcnow()
self.changes = self.diff()
def _parse_template(self):
self.status: Optional[str] = None
self.execution_status: Optional[str] = None
self.status_reason: Optional[str] = None
def _parse_template(self) -> None:
yaml.add_multi_constructor("", yaml_tag_constructor)
try:
self.template_dict = yaml.load(self.template, Loader=yaml.Loader)
@ -443,13 +450,13 @@ class FakeChangeSet(BaseModel):
self.template_dict = json.loads(self.template)
@property
def creation_time_iso_8601(self):
return iso_8601_datetime_without_milliseconds(self.creation_time)
def creation_time_iso_8601(self) -> str:
return iso_8601_datetime_without_milliseconds(self.creation_time) # type: ignore[return-value]
def diff(self):
def diff(self) -> List[FakeChange]:
changes = []
resources_by_action = self.stack.resource_map.build_change_set_actions(
self.template_dict, self.parameters
self.template_dict
)
for action, resources in resources_by_action.items():
for resource_name, resource in resources.items():
@ -462,22 +469,21 @@ class FakeChangeSet(BaseModel):
)
return changes
def apply(self):
def apply(self) -> None:
self.stack.resource_map.update(self.template_dict, self.parameters)
class FakeEvent(BaseModel):
def __init__(
self,
stack_id,
stack_name,
logical_resource_id,
physical_resource_id,
resource_type,
resource_status,
resource_status_reason=None,
resource_properties=None,
client_request_token=None,
stack_id: str,
stack_name: str,
logical_resource_id: str,
physical_resource_id: str,
resource_type: str,
resource_status: str,
resource_status_reason: Optional[str],
resource_properties: Optional[str],
):
self.stack_id = stack_id
self.stack_name = stack_name
@ -489,9 +495,11 @@ class FakeEvent(BaseModel):
self.resource_properties = resource_properties
self.timestamp = datetime.utcnow()
self.event_id = mock_random.uuid4()
self.client_request_token = client_request_token
self.client_request_token = None
def sendToSns(self, account_id, region, sns_topic_arns):
def sendToSns(
self, account_id: str, region: str, sns_topic_arns: List[str]
) -> None:
message = """StackId='{stack_id}'
Timestamp='{timestamp}'
EventId='{event_id}'
@ -522,7 +530,9 @@ ClientRequestToken='{client_request_token}'""".format(
)
def filter_stacks(all_stacks, status_filter):
def filter_stacks(
all_stacks: List[FakeStack], status_filter: Optional[List[str]]
) -> List[FakeStack]:
filtered_stacks = []
if not status_filter:
return all_stacks
@ -539,22 +549,28 @@ class CloudFormationBackend(BaseBackend):
This means it has to run inside a Docker-container, or be started using `moto_server -h 0.0.0.0`.
"""
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.stacks = OrderedDict()
self.stacksets = OrderedDict()
self.deleted_stacks = {}
self.exports = OrderedDict()
self.change_sets = OrderedDict()
self.stacks: Dict[str, FakeStack] = OrderedDict()
self.stacksets: Dict[str, FakeStackSet] = OrderedDict()
self.deleted_stacks: Dict[str, FakeStack] = {}
self.exports: Dict[str, Export] = OrderedDict()
self.change_sets: Dict[str, FakeChangeSet] = OrderedDict()
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
def default_vpc_endpoint_service(
service_region: str, zones: List[str]
) -> List[Dict[str, str]]:
"""Default VPC endpoint service."""
return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "cloudformation", policy_supported=False
)
def _resolve_update_parameters(self, instance, incoming_params):
def _resolve_update_parameters(
self,
instance: Union[FakeStack, FakeStackSet],
incoming_params: List[Dict[str, str]],
) -> Dict[str, str]:
parameters = dict(
[
(parameter["parameter_key"], parameter["parameter_value"])
@ -578,30 +594,26 @@ class CloudFormationBackend(BaseBackend):
def create_stack_set(
self,
name,
template,
parameters,
tags=None,
description=None,
admin_role=None,
execution_role=None,
):
name: str,
template: str,
parameters: Dict[str, str],
tags: Dict[str, str],
) -> FakeStackSet:
stackset_id = generate_stackset_id(name)
new_stackset = FakeStackSet(
stackset_id=stackset_id,
account_id=self.account_id,
name=name,
region=self.region_name,
template=template,
parameters=parameters,
description=description,
description=None,
tags=tags,
admin_role=admin_role,
execution_role=execution_role,
)
self.stacksets[stackset_id] = new_stackset
return new_stackset
def get_stack_set(self, name):
def get_stack_set(self, name: str) -> FakeStackSet:
stacksets = self.stacksets.keys()
if name in stacksets:
return self.stacksets[name]
@ -610,7 +622,7 @@ class CloudFormationBackend(BaseBackend):
return self.stacksets[stackset]
raise ValidationError(name)
def delete_stack_set(self, name):
def delete_stack_set(self, name: str) -> None:
stacksets = self.stacksets.keys()
if name in stacksets:
self.stacksets[name].delete()
@ -619,31 +631,44 @@ class CloudFormationBackend(BaseBackend):
self.stacksets[stackset].delete()
def create_stack_instances(
self, stackset_name, accounts, regions, parameters, operation_id=None
):
self,
stackset_name: str,
accounts: List[str],
regions: List[str],
parameters: List[Dict[str, str]],
) -> FakeStackSet:
stackset = self.get_stack_set(stackset_name)
stackset.create_stack_instances(
accounts=accounts,
regions=regions,
parameters=parameters,
operation_id=operation_id,
)
return stackset
def update_stack_instances(
self,
stackset_name: str,
accounts: List[str],
regions: List[str],
parameters: Dict[str, str],
) -> Dict[str, Any]:
stack_set = self.get_stack_set(stackset_name)
return stack_set.update_instances(accounts, regions, parameters)
def update_stack_set(
self,
stackset_name,
template=None,
description=None,
parameters=None,
tags=None,
admin_role=None,
execution_role=None,
accounts=None,
regions=None,
operation_id=None,
):
stackset_name: str,
template: str,
description: str,
parameters: List[Dict[str, str]],
tags: Dict[str, str],
admin_role: str,
execution_role: str,
accounts: List[str],
regions: List[str],
operation_id: str,
) -> Dict[str, Any]:
stackset = self.get_stack_set(stackset_name)
resolved_parameters = self._resolve_update_parameters(
instance=stackset, incoming_params=parameters
@ -662,21 +687,21 @@ class CloudFormationBackend(BaseBackend):
return update
def delete_stack_instances(
self, stackset_name, accounts, regions, operation_id=None
):
self, stackset_name: str, accounts: List[str], regions: List[str]
) -> FakeStackSet:
stackset = self.get_stack_set(stackset_name)
stackset.delete_stack_instances(accounts, regions, operation_id)
stackset.delete_stack_instances(accounts, regions)
return stackset
def create_stack(
self,
name,
template,
parameters,
notification_arns=None,
tags=None,
role_arn=None,
):
name: str,
template: str,
parameters: Dict[str, Any],
notification_arns: Optional[List[str]] = None,
tags: Optional[Dict[str, str]] = None,
role_arn: Optional[str] = None,
) -> FakeStack:
stack_id = generate_stack_id(name, self.region_name, self.account_id)
new_stack = FakeStack(
stack_id=stack_id,
@ -702,16 +727,16 @@ class CloudFormationBackend(BaseBackend):
def create_change_set(
self,
stack_name,
change_set_name,
template,
parameters,
description,
change_set_type,
notification_arns=None,
tags=None,
role_arn=None,
):
stack_name: str,
change_set_name: str,
template: str,
parameters: Dict[str, str],
description: str,
change_set_type: str,
notification_arns: Optional[List[str]] = None,
tags: Optional[Dict[str, str]] = None,
role_arn: Optional[str] = None,
) -> Tuple[str, str]:
if change_set_type == "UPDATE":
for stack in self.stacks.values():
if stack.name == stack_name:
@ -768,7 +793,7 @@ class CloudFormationBackend(BaseBackend):
self.change_sets[change_set_id] = new_change_set
return change_set_id, stack.stack_id
def delete_change_set(self, change_set_name):
def delete_change_set(self, change_set_name: str) -> None:
if change_set_name in self.change_sets:
# This means arn was passed in
del self.change_sets[change_set_name]
@ -779,7 +804,7 @@ class CloudFormationBackend(BaseBackend):
break
del self.change_sets[to_delete]
def describe_change_set(self, change_set_name):
def describe_change_set(self, change_set_name: str) -> Optional[FakeChangeSet]:
change_set = None
if change_set_name in self.change_sets:
# This means arn was passed in
@ -792,7 +817,9 @@ class CloudFormationBackend(BaseBackend):
raise ValidationError(change_set_name)
return change_set
def execute_change_set(self, change_set_name, stack_name=None):
def execute_change_set(
self, change_set_name: str, stack_name: Optional[str] = None
) -> None:
if change_set_name in self.change_sets:
# This means arn was passed in
change_set = self.change_sets[change_set_name]
@ -823,9 +850,8 @@ class CloudFormationBackend(BaseBackend):
# set the status of the stack
stack.status = f"{change_set.change_set_type}_COMPLETE"
stack.template = change_set.template
return True
def describe_stacks(self, name_or_stack_id):
def describe_stacks(self, name_or_stack_id: str) -> List[FakeStack]:
stacks = self.stacks.values()
if name_or_stack_id:
for stack in stacks:
@ -840,16 +866,16 @@ class CloudFormationBackend(BaseBackend):
else:
return list(stacks)
def list_change_sets(self):
def list_change_sets(self) -> Iterable[FakeChangeSet]:
return self.change_sets.values()
def list_stacks(self, status_filter=None):
def list_stacks(self, status_filter: Optional[List[str]] = None) -> List[FakeStack]:
total_stacks = [v for v in self.stacks.values()] + [
v for v in self.deleted_stacks.values()
]
return filter_stacks(total_stacks, status_filter)
def get_stack(self, name_or_stack_id):
def get_stack(self, name_or_stack_id: str) -> FakeStack:
all_stacks = dict(self.deleted_stacks, **self.stacks)
if name_or_stack_id in all_stacks:
# Lookup by stack id - deleted stacks incldued
@ -861,7 +887,14 @@ class CloudFormationBackend(BaseBackend):
return stack
raise ValidationError(name_or_stack_id)
def update_stack(self, name, template, role_arn=None, parameters=None, tags=None):
def update_stack(
self,
name: str,
template: str,
role_arn: Optional[str],
parameters: List[Dict[str, Any]],
tags: Optional[Dict[str, str]],
) -> FakeStack:
stack = self.get_stack(name)
resolved_parameters = self._resolve_update_parameters(
instance=stack, incoming_params=parameters
@ -869,14 +902,14 @@ class CloudFormationBackend(BaseBackend):
stack.update(template, role_arn, parameters=resolved_parameters, tags=tags)
return stack
def get_stack_policy(self, stack_name):
def get_stack_policy(self, stack_name: str) -> str:
try:
stack = self.get_stack(stack_name)
except ValidationError:
raise ValidationError(message=f"Stack: {stack_name} does not exist")
return stack.policy
def set_stack_policy(self, stack_name, policy_body):
def set_stack_policy(self, stack_name: str, policy_body: str) -> None:
"""
Note that Moto does no validation/parsing/enforcement of this policy - we simply persist it.
"""
@ -886,41 +919,45 @@ class CloudFormationBackend(BaseBackend):
raise ValidationError(message=f"Stack: {stack_name} does not exist")
stack.policy = policy_body
def list_stack_resources(self, stack_name_or_id):
def list_stack_resources(
self, stack_name_or_id: str
) -> Iterable[Type[CloudFormationModel]]:
stack = self.get_stack(stack_name_or_id)
return stack.stack_resources
def delete_stack(self, name_or_stack_id):
def delete_stack(self, name_or_stack_id: str) -> None:
if name_or_stack_id in self.stacks:
# Delete by stack id
stack = self.stacks.pop(name_or_stack_id, None)
stack = self.stacks.pop(name_or_stack_id)
export_names = [export.name for export in stack.exports]
stack.delete()
self.deleted_stacks[stack.stack_id] = stack
for export_name in export_names:
self.exports.pop(export_name)
return self.stacks.pop(name_or_stack_id, None)
self.stacks.pop(name_or_stack_id, None)
else:
# Delete by stack name
for stack in list(self.stacks.values()):
if stack.name == name_or_stack_id:
self.delete_stack(stack.stack_id)
def list_exports(self, token):
def list_exports(
self, tokenstr: Optional[str]
) -> Tuple[List[Export], Optional[str]]:
all_exports = list(self.exports.values())
if token is None:
if tokenstr is None:
exports = all_exports[0:100]
next_token = "100" if len(all_exports) > 100 else None
else:
token = int(token)
token = int(tokenstr)
exports = all_exports[token : token + 100]
next_token = str(token + 100) if len(all_exports) > token + 100 else None
return exports, next_token
def validate_template(self, template):
def validate_template(self, template: str) -> List[Any]:
return validate_template_cfn_lint(template)
def _validate_export_uniqueness(self, stack):
def _validate_export_uniqueness(self, stack: FakeStack) -> None:
new_stack_export_names = [x.name for x in stack.exports]
export_names = self.exports.keys()
if not set(export_names).isdisjoint(new_stack_export_names):

View File

@ -6,6 +6,18 @@ import warnings
import re
import collections.abc as collections_abc
from typing import (
Any,
Dict,
List,
Union,
Iterable,
Iterator,
Optional,
Tuple,
TypeVar,
Type,
)
# This ugly section of imports is necessary because we
# build the list of CloudFormationModel subclasses using
@ -14,35 +26,34 @@ import collections.abc as collections_abc
# the subclass's module hasn't been imported yet - then that subclass
# doesn't exist yet, and __subclasses__ won't find it.
# So we import here to populate the list of subclasses.
from moto.apigateway import models # noqa # pylint: disable=all
from moto.autoscaling import models # noqa # pylint: disable=all
from moto.awslambda import models # noqa # pylint: disable=all
from moto.batch import models # noqa # pylint: disable=all
from moto.apigateway import models as apigw_models # noqa # pylint: disable=all
from moto.autoscaling import models as as_models # noqa # pylint: disable=all
from moto.awslambda import models as lambda_models # noqa # pylint: disable=all
from moto.batch import models as batch_models # noqa # pylint: disable=all
from moto.cloudformation.custom_model import CustomModel
from moto.cloudwatch import models # noqa # pylint: disable=all
from moto.datapipeline import models # noqa # pylint: disable=all
from moto.dynamodb import models # noqa # pylint: disable=all
from moto.cloudwatch import models as cw_models # noqa # pylint: disable=all
from moto.datapipeline import models as data_models # noqa # pylint: disable=all
from moto.dynamodb import models as ddb_models # noqa # pylint: disable=all
from moto.ec2 import models as ec2_models
from moto.ec2.models.core import TaggedEC2Resource
from moto.ecr import models # noqa # pylint: disable=all
from moto.ecs import models # noqa # pylint: disable=all
from moto.efs import models # noqa # pylint: disable=all
from moto.elb import models # noqa # pylint: disable=all
from moto.elbv2 import models # noqa # pylint: disable=all
from moto.events import models # noqa # pylint: disable=all
from moto.iam import models # noqa # pylint: disable=all
from moto.kinesis import models # noqa # pylint: disable=all
from moto.kms import models # noqa # pylint: disable=all
from moto.rds import models # noqa # pylint: disable=all
from moto.rds import models # noqa # pylint: disable=all
from moto.redshift import models # noqa # pylint: disable=all
from moto.route53 import models # noqa # pylint: disable=all
from moto.s3 import models # noqa # pylint: disable=all
from moto.sagemaker import models # noqa # pylint: disable=all
from moto.sns import models # noqa # pylint: disable=all
from moto.sqs import models # noqa # pylint: disable=all
from moto.stepfunctions import models # noqa # pylint: disable=all
from moto.ssm import models # noqa # pylint: disable=all
from moto.ecr import models as ecr_models # noqa # pylint: disable=all
from moto.ecs import models as ecs_models # noqa # pylint: disable=all
from moto.efs import models as efs_models # noqa # pylint: disable=all
from moto.elb import models as elb_models # noqa # pylint: disable=all
from moto.elbv2 import models as elbv2_models # noqa # pylint: disable=all
from moto.events import models as events_models # noqa # pylint: disable=all
from moto.iam import models as iam_models # noqa # pylint: disable=all
from moto.kinesis import models as kinesis_models # noqa # pylint: disable=all
from moto.kms import models as kms_models # noqa # pylint: disable=all
from moto.rds import models as rds_models # noqa # pylint: disable=all
from moto.redshift import models as redshift_models # noqa # pylint: disable=all
from moto.route53 import models as route53_models # noqa # pylint: disable=all
from moto.s3 import models as s3_models # noqa # pylint: disable=all
from moto.sagemaker import models as sagemaker_models # noqa # pylint: disable=all
from moto.sns import models as sns_models # noqa # pylint: disable=all
from moto.sqs import models as sqs_models # noqa # pylint: disable=all
from moto.stepfunctions import models as sfn_models # noqa # pylint: disable=all
from moto.ssm import models as ssm_models # noqa # pylint: disable=all
# End ugly list of imports
@ -66,6 +77,7 @@ NAME_TYPE_MAP = {
model.cloudformation_type(): model.cloudformation_name_type()
for model in MODEL_LIST
}
CF_MODEL = TypeVar("CF_MODEL", bound=CloudFormationModel)
# Just ignore these models types for now
NULL_MODELS = [
@ -79,18 +91,17 @@ logger = logging.getLogger("moto")
class Output(object):
def __init__(self, connection=None):
self.connection = connection
self.description = None
self.key = None
self.value = None
def __init__(self, key: str, value: str, description: str):
self.description = description
self.key = key
self.value = value
def __repr__(self):
def __repr__(self) -> str:
return 'Output:"%s"="%s"' % (self.key, self.value)
class LazyDict(dict):
def __getitem__(self, key):
class LazyDict(Dict[str, Any]):
def __getitem__(self, key: str) -> Any:
val = dict.__getitem__(self, key)
if callable(val):
val = val()
@ -98,7 +109,7 @@ class LazyDict(dict):
return val
def clean_json(resource_json, resources_map):
def clean_json(resource_json: Any, resources_map: "ResourceMap") -> Any:
"""
Cleanup the a resource dict. For now, this just means replacing any Ref node
with the corresponding physical_resource_id.
@ -110,7 +121,7 @@ def clean_json(resource_json, resources_map):
# Parse resource reference
resource = resources_map[resource_json["Ref"]]
if hasattr(resource, "physical_resource_id"):
return resource.physical_resource_id
return resource.physical_resource_id # type: ignore[attr-defined]
else:
return resource
@ -119,10 +130,10 @@ def clean_json(resource_json, resources_map):
map_path = resource_json["Fn::FindInMap"][1:]
result = resources_map[map_name]
for path in map_path:
if "Fn::Transform" in result:
if "Fn::Transform" in result: # type: ignore[operator]
result = resources_map[clean_json(path, resources_map)]
else:
result = result[clean_json(path, resources_map)]
result = result[clean_json(path, resources_map)] # type: ignore[index]
return result
if "Fn::GetAtt" in resource_json:
@ -201,7 +212,7 @@ def clean_json(resource_json, resources_map):
cleaned_val = clean_json(resource_json["Fn::ImportValue"], resources_map)
values = [
x.value
for x in resources_map.cross_stack_resources.values()
for x in resources_map.cross_stack_resources.values() # type: ignore[union-attr]
if x.name == cleaned_val
]
if any(values):
@ -231,26 +242,26 @@ def clean_json(resource_json, resources_map):
return resource_json
def resource_class_from_type(resource_type):
def resource_class_from_type(resource_type: str) -> Type[CloudFormationModel]:
if resource_type in NULL_MODELS:
return None
return None # type: ignore[return-value]
if resource_type.startswith("Custom::"):
return CustomModel
if resource_type not in MODEL_MAP:
logger.warning("No Moto CloudFormation support for %s", resource_type)
return None
return None # type: ignore[return-value]
return MODEL_MAP.get(resource_type)
return MODEL_MAP.get(resource_type) # type: ignore[return-value]
def resource_name_property_from_type(resource_type):
def resource_name_property_from_type(resource_type: str) -> Optional[str]:
for model in MODEL_LIST:
if model.cloudformation_type() == resource_type:
return model.cloudformation_name_type()
return NAME_TYPE_MAP.get(resource_type)
def generate_resource_name(resource_type, stack_name, logical_id):
def generate_resource_name(resource_type: str, stack_name: str, logical_id: str) -> str:
if resource_type in [
"AWS::ElasticLoadBalancingV2::TargetGroup",
"AWS::ElasticLoadBalancingV2::LoadBalancer",
@ -277,7 +288,9 @@ def generate_resource_name(resource_type, stack_name, logical_id):
return "{0}-{1}-{2}".format(stack_name, logical_id, random_suffix())
def parse_resource(resource_json, resources_map):
def parse_resource(
resource_json: Dict[str, Any], resources_map: "ResourceMap"
) -> Tuple[Type[CloudFormationModel], Any, str]:
resource_type = resource_json["Type"]
resource_class = resource_class_from_type(resource_type)
if not resource_class:
@ -286,7 +299,7 @@ def parse_resource(resource_json, resources_map):
resource_type
)
)
return None
return None # type: ignore[return-value]
if "Properties" not in resource_json:
resource_json["Properties"] = {}
@ -296,14 +309,18 @@ def parse_resource(resource_json, resources_map):
return resource_class, resource_json, resource_type
def parse_resource_and_generate_name(logical_id, resource_json, resources_map):
resource_tuple = parse_resource(resource_json, resources_map)
def parse_resource_and_generate_name(
logical_id: str, resource_json: Dict[str, Any], resources_map: "ResourceMap"
) -> Tuple[Type[CloudFormationModel], Dict[str, Any], str]:
resource_tuple: Tuple[
Type[CloudFormationModel], Dict[str, Any], str
] = parse_resource(resource_json, resources_map)
if not resource_tuple:
return None
resource_class, resource_json, resource_type = resource_tuple
generated_resource_name = generate_resource_name(
resource_type, resources_map.get("AWS::StackName"), logical_id
resource_type, resources_map["AWS::StackName"], logical_id # type: ignore[arg-type]
)
resource_name_property = resource_name_property_from_type(resource_type)
@ -322,17 +339,21 @@ def parse_resource_and_generate_name(logical_id, resource_json, resources_map):
def parse_and_create_resource(
logical_id, resource_json, resources_map, account_id, region_name
):
logical_id: str,
resource_json: Dict[str, Any],
resources_map: "ResourceMap",
account_id: str,
region_name: str,
) -> Optional[CF_MODEL]:
condition = resource_json.get("Condition")
if condition and not resources_map.lazy_condition_map[condition]:
# If this has a False condition, don't create the resource
return None
resource_type = resource_json["Type"]
resource_tuple = parse_resource_and_generate_name(
logical_id, resource_json, resources_map
)
resource_tuple: Tuple[
Type[CloudFormationModel], Dict[str, Any], str
] = parse_resource_and_generate_name(logical_id, resource_json, resources_map)
if not resource_tuple:
return None
resource_class, resource_json, resource_physical_name = resource_tuple
@ -350,11 +371,15 @@ def parse_and_create_resource(
def parse_and_update_resource(
logical_id, resource_json, resources_map, account_id, region_name
):
resource_tuple = parse_resource_and_generate_name(
logical_id, resource_json, resources_map
)
logical_id: str,
resource_json: Dict[str, Any],
resources_map: "ResourceMap",
account_id: str,
region_name: str,
) -> Optional[CF_MODEL]:
resource_tuple: Optional[
Tuple[Type[CloudFormationModel], Dict[str, Any], str]
] = parse_resource_and_generate_name(logical_id, resource_json, resources_map)
if not resource_tuple:
return None
resource_class, resource_json, new_resource_name = resource_tuple
@ -376,7 +401,9 @@ def parse_and_update_resource(
return None
def parse_and_delete_resource(resource_name, resource_json, account_id, region_name):
def parse_and_delete_resource(
resource_name: str, resource_json: Dict[str, Any], account_id: str, region_name: str
) -> None:
resource_type = resource_json["Type"]
resource_class = resource_class_from_type(resource_type)
if not hasattr(
@ -387,7 +414,7 @@ def parse_and_delete_resource(resource_name, resource_json, account_id, region_n
)
def parse_condition(condition, resources_map, condition_map):
def parse_condition(condition: Union[Dict[str, Any], bool], resources_map: "ResourceMap", condition_map: Dict[str, Any]) -> bool: # type: ignore[return]
if isinstance(condition, bool):
return condition
@ -423,18 +450,21 @@ def parse_condition(condition, resources_map, condition_map):
)
def parse_output(output_logical_id, output_json, resources_map):
def parse_output(
output_logical_id: str, output_json: Any, resources_map: "ResourceMap"
) -> Optional[Output]:
output_json = clean_json(output_json, resources_map)
if "Value" not in output_json:
return None
output = Output()
output.key = output_logical_id
output.value = clean_json(output_json["Value"], resources_map)
output.description = output_json.get("Description")
output = Output(
key=output_logical_id,
value=clean_json(output_json["Value"], resources_map),
description=output_json.get("Description"),
)
return output
class ResourceMap(collections_abc.Mapping):
class ResourceMap(collections_abc.Mapping): # type: ignore[type-arg]
"""
This is a lazy loading map for resources. This allows us to create resources
without needing to create a full dependency tree. Upon creation, each
@ -443,27 +473,29 @@ class ResourceMap(collections_abc.Mapping):
def __init__(
self,
stack_id,
stack_name,
parameters,
tags,
region_name,
account_id,
template,
cross_stack_resources,
stack_id: str,
stack_name: str,
parameters: Dict[str, Any],
tags: Dict[str, Any],
region_name: str,
account_id: str,
template: Dict[str, Any],
cross_stack_resources: Optional[Dict[str, "Export"]],
):
self._template = template
self._resource_json_map = template["Resources"] if template != {} else {}
self._resource_json_map: Dict[str, Any] = (
template["Resources"] if template != {} else {}
)
self._account_id = account_id
self._region_name = region_name
self.input_parameters = parameters
self.tags = copy.deepcopy(tags)
self.resolved_parameters = {}
self.resolved_parameters: Dict[str, Any] = {}
self.cross_stack_resources = cross_stack_resources
self.stack_id = stack_id
# Create the default resources
self._parsed_resources = {
self._parsed_resources: Dict[str, Any] = {
"AWS::AccountId": account_id,
"AWS::Region": self._region_name,
"AWS::StackId": stack_id,
@ -473,7 +505,7 @@ class ResourceMap(collections_abc.Mapping):
"AWS::Partition": "aws",
}
def __getitem__(self, key):
def __getitem__(self, key: str) -> Optional[CF_MODEL]:
resource_logical_id = key
if resource_logical_id in self._parsed_resources:
@ -494,17 +526,17 @@ class ResourceMap(collections_abc.Mapping):
self._parsed_resources[resource_logical_id] = new_resource
return new_resource
def __iter__(self):
def __iter__(self) -> Iterator[str]:
return iter(self.resources)
def __len__(self):
def __len__(self) -> int:
return len(self._resource_json_map)
def __get_resources_in_dependency_order(self):
def __get_resources_in_dependency_order(self) -> List[str]:
resource_map = copy.deepcopy(self._resource_json_map)
resources_in_dependency_order = []
def recursively_get_dependencies(resource):
def recursively_get_dependencies(resource: str) -> None:
resource_info = resource_map[resource]
if "DependsOn" not in resource_info:
@ -529,13 +561,13 @@ class ResourceMap(collections_abc.Mapping):
return resources_in_dependency_order
@property
def resources(self):
def resources(self) -> Iterable[str]:
return self._resource_json_map.keys()
def load_mapping(self):
def load_mapping(self) -> None:
self._parsed_resources.update(self._template.get("Mappings", {}))
def transform_mapping(self):
def transform_mapping(self) -> None:
for v in self._template.get("Mappings", {}).values():
if "Fn::Transform" in v:
name = v["Fn::Transform"]["Name"]
@ -548,7 +580,7 @@ class ResourceMap(collections_abc.Mapping):
)
self._parsed_resources.update(json.loads(key.value))
def parse_ssm_parameter(self, value, value_type):
def parse_ssm_parameter(self, value: str, value_type: str) -> str:
# The Value in SSM parameters is the SSM parameter path
# we need to use ssm_backend to retrieve the
# actual value from parameter store
@ -560,7 +592,7 @@ class ResourceMap(collections_abc.Mapping):
return actual_value.split(",")
return actual_value
def load_parameters(self):
def load_parameters(self) -> None:
parameter_slots = self._template.get("Parameters", {})
for parameter_name, parameter in parameter_slots.items():
# Set the default values.
@ -582,7 +614,7 @@ class ResourceMap(collections_abc.Mapping):
if value_type == "CommaDelimitedList" or value_type.startswith("List"):
value = value.split(",")
def _parse_number_parameter(num_string):
def _parse_number_parameter(num_string: str) -> Union[int, float]:
"""CloudFormation NUMBER types can be an int or float.
Try int first and then fall back to float if that fails
"""
@ -612,7 +644,7 @@ class ResourceMap(collections_abc.Mapping):
self._parsed_resources.update(self.resolved_parameters)
def load_conditions(self):
def load_conditions(self) -> None:
conditions = self._template.get("Conditions", {})
self.lazy_condition_map = LazyDict()
for condition_name, condition in conditions.items():
@ -626,12 +658,12 @@ class ResourceMap(collections_abc.Mapping):
for condition_name in self.lazy_condition_map:
self.lazy_condition_map[condition_name]
def validate_outputs(self):
def validate_outputs(self) -> None:
outputs = self._template.get("Outputs") or {}
for value in outputs.values():
value = value.get("Value", {})
if "Fn::GetAtt" in value:
resource_type = self._resource_json_map.get(value["Fn::GetAtt"][0])[
resource_type = self._resource_json_map.get(value["Fn::GetAtt"][0])[ # type: ignore[index]
"Type"
]
attr = value["Fn::GetAtt"][1]
@ -641,14 +673,14 @@ class ResourceMap(collections_abc.Mapping):
short_type = resource_type[resource_type.rindex(":") + 1 :]
raise UnsupportedAttribute(resource=short_type, attr=attr)
def load(self):
def load(self) -> None:
self.load_mapping()
self.transform_mapping()
self.load_parameters()
self.load_conditions()
self.validate_outputs()
def create(self, template):
def create(self, template: Dict[str, Any]) -> bool:
# Since this is a lazy map, to create every object we just need to
# iterate through self.
# Assumes that self.load() has been called before
@ -656,8 +688,8 @@ class ResourceMap(collections_abc.Mapping):
self._resource_json_map = template["Resources"]
self.tags.update(
{
"aws:cloudformation:stack-name": self.get("AWS::StackName"),
"aws:cloudformation:stack-id": self.get("AWS::StackId"),
"aws:cloudformation:stack-name": self["AWS::StackName"],
"aws:cloudformation:stack-id": self["AWS::StackId"],
}
)
all_resources_ready = True
@ -667,12 +699,14 @@ class ResourceMap(collections_abc.Mapping):
self.tags["aws:cloudformation:logical-id"] = resource
ec2_models.ec2_backends[self._account_id][
self._region_name
].create_tags([instance.physical_resource_id], self.tags)
].create_tags(
[instance.physical_resource_id], self.tags
) # type: ignore[attr-defined]
if instance and not instance.is_created():
all_resources_ready = False
return all_resources_ready
def creation_complete(self):
def creation_complete(self) -> bool:
all_resources_ready = True
for resource in self.__get_resources_in_dependency_order():
instance = self[resource]
@ -680,7 +714,7 @@ class ResourceMap(collections_abc.Mapping):
all_resources_ready = False
return all_resources_ready
def build_resource_diff(self, other_template):
def build_resource_diff(self, other_template: Dict[str, Any]) -> Dict[str, Any]:
old = self._resource_json_map
new = other_template["Resources"]
@ -695,11 +729,17 @@ class ResourceMap(collections_abc.Mapping):
return resource_names_by_action
def build_change_set_actions(self, template, parameters):
def build_change_set_actions(
self, template: Dict[str, Any]
) -> Dict[str, Dict[str, Dict[str, str]]]:
resource_names_by_action = self.build_resource_diff(template)
resources_by_action = {"Add": {}, "Modify": {}, "Remove": {}}
resources_by_action: Dict[str, Dict[str, Dict[str, str]]] = {
"Add": {},
"Modify": {},
"Remove": {},
}
for resource_name in resource_names_by_action["Add"]:
resources_by_action["Add"][resource_name] = {
@ -721,7 +761,9 @@ class ResourceMap(collections_abc.Mapping):
return resources_by_action
def update(self, template, parameters=None):
def update(
self, template: Dict[str, Any], parameters: Optional[Dict[str, Any]] = None
) -> None:
resource_names_by_action = self.build_resource_diff(template)
@ -778,7 +820,7 @@ class ResourceMap(collections_abc.Mapping):
if tries == 5:
raise last_exception
def delete(self):
def delete(self) -> None:
remaining_resources = set(self.resources)
tries = 1
while remaining_resources and tries < 5:
@ -820,8 +862,8 @@ class ResourceMap(collections_abc.Mapping):
raise last_exception
class OutputMap(collections_abc.Mapping):
def __init__(self, resources, template, stack_id):
class OutputMap(collections_abc.Mapping): # type: ignore[type-arg]
def __init__(self, resources: ResourceMap, template: Dict[str, Any], stack_id: str):
self._template = template
self._stack_id = stack_id
@ -831,13 +873,13 @@ class OutputMap(collections_abc.Mapping):
message="[/Outputs] 'null' values are not allowed in templates",
)
self._output_json_map = template.get("Outputs")
self._output_json_map: Dict[str, Any] = template.get("Outputs") # type: ignore[assignment]
# Create the default resources
self._resource_map = resources
self._parsed_outputs = dict()
self._parsed_outputs: Dict[str, Output] = dict()
def __getitem__(self, key):
def __getitem__(self, key: str) -> Optional[Output]:
output_logical_id = key
if output_logical_id in self._parsed_outputs:
@ -851,21 +893,21 @@ class OutputMap(collections_abc.Mapping):
self._parsed_outputs[output_logical_id] = new_output
return new_output
def __iter__(self):
def __iter__(self) -> Iterator[str]:
return iter(self.outputs)
def __len__(self):
return len(self._output_json_map)
def __len__(self) -> int:
return len(self._output_json_map) # type: ignore[arg-type]
@property
def outputs(self):
def outputs(self) -> Iterable[str]:
return self._output_json_map.keys() if self._output_json_map else []
@property
def exports(self):
def exports(self) -> List["Export"]:
exports = []
if self.outputs:
for value in self._output_json_map.values():
for value in self._output_json_map.values(): # type: ignore[union-attr]
if value.get("Export"):
cleaned_name = clean_json(
value["Export"].get("Name"), self._resource_map
@ -876,19 +918,19 @@ class OutputMap(collections_abc.Mapping):
class Export(object):
def __init__(self, exporting_stack_id, name, value):
def __init__(self, exporting_stack_id: str, name: str, value: str):
self._exporting_stack_id = exporting_stack_id
self._name = name
self._value = value
@property
def exporting_stack_id(self):
def exporting_stack_id(self) -> str:
return self._exporting_stack_id
@property
def name(self):
def name(self) -> str:
return self._name
@property
def value(self):
def value(self) -> str:
return self._value

View File

@ -1,5 +1,6 @@
import json
import yaml
from typing import Any, Dict, Tuple, List, Optional, Union
from urllib.parse import urlparse
from yaml.parser import ParserError # pylint:disable=c-extension-no-member
from yaml.scanner import ScannerError # pylint:disable=c-extension-no-member
@ -8,13 +9,13 @@ from moto.core.responses import BaseResponse
from moto.s3.models import s3_backends
from moto.s3.exceptions import S3ClientError
from moto.utilities.aws_headers import amzn_request_id
from .models import cloudformation_backends
from .models import cloudformation_backends, CloudFormationBackend, FakeStack
from .exceptions import ValidationError, MissingParameterError
from .utils import yaml_tag_constructor
def get_template_summary_response_from_template(template_body):
def get_resource_types(template_dict):
def get_template_summary_response_from_template(template_body: str) -> Dict[str, Any]:
def get_resource_types(template_dict: Dict[str, Any]) -> List[Any]:
resources = {}
for key, value in template_dict.items():
if key == "Resources":
@ -38,20 +39,20 @@ def get_template_summary_response_from_template(template_body):
class CloudFormationResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="cloudformation")
@property
def cloudformation_backend(self):
def cloudformation_backend(self) -> CloudFormationBackend:
return cloudformation_backends[self.current_account][self.region]
@classmethod
def cfnresponse(cls, *args, **kwargs): # pylint: disable=unused-argument
def cfnresponse(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc] # pylint: disable=unused-argument
request, full_url, headers = args
full_url += "&Action=ProcessCfnResponse"
return cls.dispatch(request=request, full_url=full_url, headers=headers)
def _get_stack_from_s3_url(self, template_url):
def _get_stack_from_s3_url(self, template_url: str) -> str:
template_url_parts = urlparse(template_url)
if "localhost" in template_url:
bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1)
@ -75,7 +76,9 @@ class CloudFormationResponse(BaseResponse):
)
return key.value.decode("utf-8")
def _get_params_from_list(self, parameters_list):
def _get_params_from_list(
self, parameters_list: List[Dict[str, Any]]
) -> Dict[str, Any]:
# Hack dict-comprehension
return dict(
[
@ -84,7 +87,9 @@ class CloudFormationResponse(BaseResponse):
]
)
def _get_param_values(self, parameters_list, existing_params):
def _get_param_values(
self, parameters_list: List[Dict[str, str]], existing_params: Dict[str, str]
) -> Dict[str, Any]:
result = {}
for parameter in parameters_list:
if parameter.keys() >= {"parameter_key", "parameter_value"}:
@ -100,7 +105,7 @@ class CloudFormationResponse(BaseResponse):
raise MissingParameterError(parameter["parameter_key"])
return result
def process_cfn_response(self):
def process_cfn_response(self) -> Tuple[int, Dict[str, int], str]:
status = self._get_param("Status")
if status == "SUCCESS":
stack_id = self._get_param("StackId")
@ -113,7 +118,7 @@ class CloudFormationResponse(BaseResponse):
return 200, {"status": 200}, json.dumps("{}")
def create_stack(self):
def create_stack(self) -> Union[str, Tuple[int, Dict[str, int], str]]:
stack_name = self._get_param("StackName")
stack_body = self._get_param("TemplateBody")
template_url = self._get_param("TemplateURL")
@ -156,14 +161,14 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(CREATE_STACK_RESPONSE_TEMPLATE)
return template.render(stack=stack)
def stack_name_exists(self, new_stack_name):
def stack_name_exists(self, new_stack_name: str) -> bool:
for stack in self.cloudformation_backend.stacks.values():
if stack.name == new_stack_name:
return True
return False
@amzn_request_id
def create_change_set(self):
def create_change_set(self) -> str:
stack_name = self._get_param("StackName")
change_set_name = self._get_param("ChangeSetName")
stack_body = self._get_param("TemplateBody")
@ -209,7 +214,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(CREATE_CHANGE_SET_RESPONSE_TEMPLATE)
return template.render(stack_id=stack_id, change_set_id=change_set_id)
def delete_change_set(self):
def delete_change_set(self) -> str:
change_set_name = self._get_param("ChangeSetName")
self.cloudformation_backend.delete_change_set(change_set_name=change_set_name)
@ -221,7 +226,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(DELETE_CHANGE_SET_RESPONSE_TEMPLATE)
return template.render()
def describe_change_set(self):
def describe_change_set(self) -> str:
change_set_name = self._get_param("ChangeSetName")
change_set = self.cloudformation_backend.describe_change_set(
change_set_name=change_set_name
@ -230,7 +235,7 @@ class CloudFormationResponse(BaseResponse):
return template.render(change_set=change_set)
@amzn_request_id
def execute_change_set(self):
def execute_change_set(self) -> str:
stack_name = self._get_param("StackName")
change_set_name = self._get_param("ChangeSetName")
self.cloudformation_backend.execute_change_set(
@ -244,10 +249,8 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(EXECUTE_CHANGE_SET_RESPONSE_TEMPLATE)
return template.render()
def describe_stacks(self):
stack_name_or_id = None
if self._get_param("StackName"):
stack_name_or_id = self.querystring.get("StackName")[0]
def describe_stacks(self) -> str:
stack_name_or_id = self._get_param("StackName")
token = self._get_param("NextToken")
stacks = self.cloudformation_backend.describe_stacks(stack_name_or_id)
stack_ids = [stack.stack_id for stack in stacks]
@ -263,14 +266,14 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(DESCRIBE_STACKS_TEMPLATE)
return template.render(stacks=stacks_resp, next_token=next_token)
def describe_stack_resource(self):
def describe_stack_resource(self) -> str:
stack_name = self._get_param("StackName")
stack = self.cloudformation_backend.get_stack(stack_name)
logical_resource_id = self._get_param("LogicalResourceId")
resource = None
for stack_resource in stack.stack_resources:
if stack_resource.logical_resource_id == logical_resource_id:
if stack_resource.logical_resource_id == logical_resource_id: # type: ignore[attr-defined]
resource = stack_resource
break
@ -283,40 +286,40 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(DESCRIBE_STACK_RESOURCE_RESPONSE_TEMPLATE)
return template.render(stack=stack, resource=resource)
def describe_stack_resources(self):
def describe_stack_resources(self) -> str:
stack_name = self._get_param("StackName")
stack = self.cloudformation_backend.get_stack(stack_name)
template = self.response_template(DESCRIBE_STACK_RESOURCES_RESPONSE)
return template.render(stack=stack)
def describe_stack_events(self):
def describe_stack_events(self) -> str:
stack_name = self._get_param("StackName")
stack = self.cloudformation_backend.get_stack(stack_name)
template = self.response_template(DESCRIBE_STACK_EVENTS_RESPONSE)
return template.render(stack=stack)
def list_change_sets(self):
def list_change_sets(self) -> str:
change_sets = self.cloudformation_backend.list_change_sets()
template = self.response_template(LIST_CHANGE_SETS_RESPONSE)
return template.render(change_sets=change_sets)
def list_stacks(self):
def list_stacks(self) -> str:
status_filter = self._get_multi_param("StackStatusFilter.member")
stacks = self.cloudformation_backend.list_stacks(status_filter)
template = self.response_template(LIST_STACKS_RESPONSE)
return template.render(stacks=stacks)
def list_stack_resources(self):
def list_stack_resources(self) -> str:
stack_name_or_id = self._get_param("StackName")
resources = self.cloudformation_backend.list_stack_resources(stack_name_or_id)
template = self.response_template(LIST_STACKS_RESOURCES_RESPONSE)
return template.render(resources=resources)
def get_template(self):
name_or_stack_id = self.querystring.get("StackName")[0]
def get_template(self) -> str:
name_or_stack_id = self.querystring.get("StackName")[0] # type: ignore[index]
stack = self.cloudformation_backend.get_stack(name_or_stack_id)
if self.request_json:
@ -336,7 +339,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(GET_TEMPLATE_RESPONSE_TEMPLATE)
return template.render(stack=stack)
def get_template_summary(self):
def get_template_summary(self) -> str:
stack_name = self._get_param("StackName")
template_url = self._get_param("TemplateURL")
stack_body = self._get_param("TemplateBody")
@ -355,7 +358,12 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(GET_TEMPLATE_SUMMARY_TEMPLATE)
return template.render(template_summary=template_summary)
def _validate_different_update(self, incoming_params, stack_body, old_stack):
def _validate_different_update(
self,
incoming_params: Optional[List[Dict[str, Any]]],
stack_body: str,
old_stack: FakeStack,
) -> None:
if incoming_params and stack_body:
new_params = self._get_param_values(incoming_params, old_stack.parameters)
if old_stack.template == stack_body and old_stack.parameters == new_params:
@ -363,7 +371,7 @@ class CloudFormationResponse(BaseResponse):
old_stack.name, message=f"Stack [{old_stack.name}] already exists"
)
def _validate_status(self, stack):
def _validate_status(self, stack: FakeStack) -> None:
if stack.status == "ROLLBACK_COMPLETE":
raise ValidationError(
stack.stack_id,
@ -371,7 +379,7 @@ class CloudFormationResponse(BaseResponse):
"be updated.".format(stack.stack_id),
)
def update_stack(self):
def update_stack(self) -> str:
stack_name = self._get_param("StackName")
role_arn = self._get_param("RoleARN")
template_url = self._get_param("TemplateURL")
@ -386,7 +394,7 @@ class CloudFormationResponse(BaseResponse):
# boto3 is supposed to let you clear the tags by passing an empty value, but the request body doesn't
# end up containing anything we can use to differentiate between passing an empty value versus not
# passing anything. so until that changes, moto won't be able to clear tags, only update them.
tags = dict(
tags: Optional[Dict[str, str]] = dict(
(item["key"], item["value"])
for item in self._get_list_prefix("Tags.member")
)
@ -414,8 +422,8 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(UPDATE_STACK_RESPONSE_TEMPLATE)
return template.render(stack=stack)
def delete_stack(self):
name_or_stack_id = self.querystring.get("StackName")[0]
def delete_stack(self) -> str:
name_or_stack_id = self.querystring.get("StackName")[0] # type: ignore[index]
self.cloudformation_backend.delete_stack(name_or_stack_id)
if self.request_json:
@ -424,13 +432,13 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(DELETE_STACK_RESPONSE_TEMPLATE)
return template.render()
def list_exports(self):
def list_exports(self) -> str:
token = self._get_param("NextToken")
exports, next_token = self.cloudformation_backend.list_exports(token=token)
exports, next_token = self.cloudformation_backend.list_exports(tokenstr=token)
template = self.response_template(LIST_EXPORTS_RESPONSE)
return template.render(exports=exports, next_token=next_token)
def validate_template(self):
def validate_template(self) -> str:
template_body = self._get_param("TemplateBody")
template_url = self._get_param("TemplateURL")
if template_url:
@ -451,7 +459,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(VALIDATE_STACK_RESPONSE_TEMPLATE)
return template.render(description=description)
def create_stack_set(self):
def create_stack_set(self) -> str:
stackset_name = self._get_param("StackSetName")
stack_body = self._get_param("TemplateBody")
template_url = self._get_param("TemplateURL")
@ -486,7 +494,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(CREATE_STACK_SET_RESPONSE_TEMPLATE)
return template.render(stackset=stackset)
def create_stack_instances(self):
def create_stack_instances(self) -> str:
stackset_name = self._get_param("StackSetName")
accounts = self._get_multi_param("Accounts.member")
regions = self._get_multi_param("Regions.member")
@ -497,13 +505,13 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(CREATE_STACK_INSTANCES_TEMPLATE)
return template.render()
def delete_stack_set(self):
def delete_stack_set(self) -> str:
stackset_name = self._get_param("StackSetName")
self.cloudformation_backend.delete_stack_set(stackset_name)
template = self.response_template(DELETE_STACK_SET_RESPONSE_TEMPLATE)
return template.render()
def delete_stack_instances(self):
def delete_stack_instances(self) -> str:
stackset_name = self._get_param("StackSetName")
accounts = self._get_multi_param("Accounts.member")
regions = self._get_multi_param("Regions.member")
@ -514,7 +522,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(DELETE_STACK_INSTANCES_TEMPLATE)
return template.render(operation=operation)
def describe_stack_set(self):
def describe_stack_set(self) -> str:
stackset_name = self._get_param("StackSetName")
stackset = self.cloudformation_backend.get_stack_set(stackset_name)
@ -526,7 +534,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(DESCRIBE_STACK_SET_RESPONSE_TEMPLATE)
return template.render(stackset=stackset)
def describe_stack_instance(self):
def describe_stack_instance(self) -> str:
stackset_name = self._get_param("StackSetName")
account = self._get_param("StackInstanceAccount")
region = self._get_param("StackInstanceRegion")
@ -538,24 +546,24 @@ class CloudFormationResponse(BaseResponse):
rendered = template.render(instance=instance)
return rendered
def list_stack_sets(self):
def list_stack_sets(self) -> str:
stacksets = self.cloudformation_backend.stacksets
template = self.response_template(LIST_STACK_SETS_TEMPLATE)
return template.render(stacksets=stacksets)
def list_stack_instances(self):
def list_stack_instances(self) -> str:
stackset_name = self._get_param("StackSetName")
stackset = self.cloudformation_backend.get_stack_set(stackset_name)
template = self.response_template(LIST_STACK_INSTANCES_TEMPLATE)
return template.render(stackset=stackset)
def list_stack_set_operations(self):
def list_stack_set_operations(self) -> str:
stackset_name = self._get_param("StackSetName")
stackset = self.cloudformation_backend.get_stack_set(stackset_name)
template = self.response_template(LIST_STACK_SET_OPERATIONS_RESPONSE_TEMPLATE)
return template.render(stackset=stackset)
def stop_stack_set_operation(self):
def stop_stack_set_operation(self) -> str:
stackset_name = self._get_param("StackSetName")
operation_id = self._get_param("OperationId")
stackset = self.cloudformation_backend.get_stack_set(stackset_name)
@ -563,7 +571,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(STOP_STACK_SET_OPERATION_RESPONSE_TEMPLATE)
return template.render()
def describe_stack_set_operation(self):
def describe_stack_set_operation(self) -> str:
stackset_name = self._get_param("StackSetName")
operation_id = self._get_param("OperationId")
stackset = self.cloudformation_backend.get_stack_set(stackset_name)
@ -571,7 +579,7 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(DESCRIBE_STACKSET_OPERATION_RESPONSE_TEMPLATE)
return template.render(stackset=stackset, operation=operation)
def list_stack_set_operation_results(self):
def list_stack_set_operation_results(self) -> str:
stackset_name = self._get_param("StackSetName")
operation_id = self._get_param("OperationId")
stackset = self.cloudformation_backend.get_stack_set(stackset_name)
@ -581,7 +589,7 @@ class CloudFormationResponse(BaseResponse):
)
return template.render(operation=operation)
def update_stack_set(self):
def update_stack_set(self) -> str:
stackset_name = self._get_param("StackSetName")
operation_id = self._get_param("OperationId")
description = self._get_param("Description")
@ -615,24 +623,24 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(UPDATE_STACK_SET_RESPONSE_TEMPLATE)
return template.render(operation=operation)
def update_stack_instances(self):
def update_stack_instances(self) -> str:
stackset_name = self._get_param("StackSetName")
accounts = self._get_multi_param("Accounts.member")
regions = self._get_multi_param("Regions.member")
parameters = self._get_multi_param("ParameterOverrides.member")
operation = self.cloudformation_backend.get_stack_set(
stackset_name
).update_instances(accounts, regions, parameters)
operation = self.cloudformation_backend.update_stack_instances(
stackset_name, accounts, regions, parameters
)
template = self.response_template(UPDATE_STACK_INSTANCES_RESPONSE_TEMPLATE)
return template.render(operation=operation)
def get_stack_policy(self):
def get_stack_policy(self) -> str:
stack_name = self._get_param("StackName")
policy = self.cloudformation_backend.get_stack_policy(stack_name)
template = self.response_template(GET_STACK_POLICY_RESPONSE)
return template.render(policy=policy)
def set_stack_policy(self):
def set_stack_policy(self) -> str:
stack_name = self._get_param("StackName")
policy_url = self._get_param("StackPolicyURL")
policy_body = self._get_param("StackPolicyBody")

View File

@ -2,37 +2,40 @@ import yaml
import os
import string
from moto.moto_api._internal import mock_random as random
from typing import Any, List
def generate_stack_id(stack_name, region, account):
def generate_stack_id(stack_name: str, region: str, account: str) -> str:
random_id = random.uuid4()
return f"arn:aws:cloudformation:{region}:{account}:stack/{stack_name}/{random_id}"
def generate_changeset_id(changeset_name, region_name, account_id):
def generate_changeset_id(
changeset_name: str, region_name: str, account_id: str
) -> str:
random_id = random.uuid4()
return f"arn:aws:cloudformation:{region_name}:{account_id}:changeSet/{changeset_name}/{random_id}"
def generate_stackset_id(stackset_name):
def generate_stackset_id(stackset_name: str) -> str:
random_id = random.uuid4()
return "{}:{}".format(stackset_name, random_id)
def generate_stackset_arn(stackset_id, region_name, account_id):
def generate_stackset_arn(stackset_id: str, region_name: str, account_id: str) -> str:
return f"arn:aws:cloudformation:{region_name}:{account_id}:stackset/{stackset_id}"
def random_suffix():
def random_suffix() -> str:
size = 12
chars = list(range(10)) + list(string.ascii_uppercase)
return "".join(str(random.choice(chars)) for x in range(size))
def yaml_tag_constructor(loader, tag, node):
def yaml_tag_constructor(loader: Any, tag: Any, node: Any) -> Any:
"""convert shorthand intrinsic function to full name"""
def _f(loader, tag, node):
def _f(loader: Any, tag: Any, node: Any) -> Any:
if tag == "!GetAtt":
if isinstance(node.value, list):
return node.value
@ -50,7 +53,7 @@ def yaml_tag_constructor(loader, tag, node):
return {key: _f(loader, tag, node)}
def validate_template_cfn_lint(template):
def validate_template_cfn_lint(template: str) -> List[Any]:
# Importing cfnlint adds a significant overhead, so we keep it local
from cfnlint import decode, core

View File

@ -1,4 +1,5 @@
from abc import abstractmethod
from typing import Any, Dict
from .base_backend import InstanceTrackerMeta
@ -14,22 +15,22 @@ class BaseModel(metaclass=InstanceTrackerMeta):
class CloudFormationModel(BaseModel):
@staticmethod
@abstractmethod
def cloudformation_name_type():
def cloudformation_name_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-name.html
# This must be implemented as a staticmethod with no parameters
# Return None for resources that do not have a name property
pass
# Return "" for resources that do not have a name property
return ""
@staticmethod
@abstractmethod
def cloudformation_type():
def cloudformation_type() -> str:
# This must be implemented as a staticmethod with no parameters
# See for example https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html
return "AWS::SERVICE::RESOURCE"
@classmethod
@abstractmethod
def has_cfn_attr(cls, attr): # pylint: disable=unused-argument
def has_cfn_attr(cls, attr: str) -> bool: # pylint: disable=unused-argument
# Used for validation
# If a template creates an Output for an attribute that does not exist, an error should be thrown
return True
@ -37,44 +38,53 @@ class CloudFormationModel(BaseModel):
@classmethod
@abstractmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
):
cls,
resource_name: str,
cloudformation_json: Dict[str, Any],
account_id: str,
region_name: str,
**kwargs: Any
) -> Any:
# This must be implemented as a classmethod with parameters:
# cls, resource_name, cloudformation_json, account_id, region_name
# Extract the resource parameters from the cloudformation json
# and return an instance of the resource class
pass
...
@classmethod
@abstractmethod
def update_from_cloudformation_json(
cls,
original_resource,
new_resource_name,
cloudformation_json,
account_id,
region_name,
):
original_resource: Any,
new_resource_name: str,
cloudformation_json: Dict[str, Any],
account_id: str,
region_name: str,
) -> Any:
# This must be implemented as a classmethod with parameters:
# cls, original_resource, new_resource_name, cloudformation_json, account_id, region_name
# Extract the resource parameters from the cloudformation json,
# delete the old resource and return the new one. Optionally inspect
# the change in parameters and no-op when nothing has changed.
pass
...
@classmethod
@abstractmethod
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, account_id, region_name
):
cls,
resource_name: str,
cloudformation_json: Dict[str, Any],
account_id: str,
region_name: str,
) -> None:
# This must be implemented as a classmethod with parameters:
# cls, resource_name, cloudformation_json, account_id, region_name
# Extract the resource parameters from the cloudformation json
# and delete the resource. Do not include a return statement.
pass
...
@abstractmethod
def is_created(self):
def is_created(self) -> bool:
# Verify whether the resource was created successfully
# Assume True after initialization
# Custom resources may need time after init before they are created successfully

View File

@ -218,7 +218,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
self.service_name = service_name
@classmethod
def dispatch(cls, *args, **kwargs):
def dispatch(cls, *args: Any, **kwargs: Any) -> Any:
return cls()._dispatch(*args, **kwargs)
def setup_class(

View File

@ -152,16 +152,18 @@ def iso_8601_datetime_with_milliseconds(value: datetime) -> str:
# Even Python does not support nanoseconds, other languages like Go do (needed for Terraform)
def iso_8601_datetime_with_nanoseconds(value):
def iso_8601_datetime_with_nanoseconds(value: datetime.datetime) -> str:
return value.strftime("%Y-%m-%dT%H:%M:%S.%f000Z")
def iso_8601_datetime_without_milliseconds(value):
return None if value is None else value.strftime("%Y-%m-%dT%H:%M:%SZ")
def iso_8601_datetime_without_milliseconds(value: datetime.datetime) -> Optional[str]:
return value.strftime("%Y-%m-%dT%H:%M:%SZ") if value else None
def iso_8601_datetime_without_milliseconds_s3(value):
return None if value is None else value.strftime("%Y-%m-%dT%H:%M:%S.000Z")
def iso_8601_datetime_without_milliseconds_s3(
value: datetime.datetime,
) -> Optional[str]:
return value.strftime("%Y-%m-%dT%H:%M:%S.000Z") if value else None
RFC1123 = "%a, %d %b %Y %H:%M:%S GMT"

View File

@ -3,6 +3,7 @@ import logging
import re
from urllib.parse import urlparse, unquote, quote
from requests.structures import CaseInsensitiveDict
from typing import Union, Tuple
import sys
from moto.settings import S3_IGNORE_SUBDOMAIN_BUCKETNAME
@ -44,7 +45,7 @@ def bucket_name_from_url(url):
# 'owi-common-cf', 'snippets/test.json' = bucket_and_name_from_url('s3://owi-common-cf/snippets/test.json')
def bucket_and_name_from_url(url):
def bucket_and_name_from_url(url: str) -> Union[Tuple[str, str], Tuple[None, None]]:
prefix = "s3://"
if url.startswith(prefix):
bucket_name = url[len(prefix) : url.index("/", len(prefix))]

View File

@ -18,7 +18,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/ce
files= moto/a*,moto/b*,moto/ce,moto/cloudformation
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract