From 03f551870325fdce1c789bc43c286486cdddb13d Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Wed, 26 Oct 2022 21:36:02 +0000 Subject: [PATCH] TechDebt: MyPy CloudFormation (#5603) --- moto/awslambda/models.py | 8 - moto/cloudformation/custom_model.py | 30 +- moto/cloudformation/exceptions.py | 9 +- moto/cloudformation/models.py | 477 +++++++++++++++------------- moto/cloudformation/parsing.py | 280 +++++++++------- moto/cloudformation/responses.py | 124 ++++---- moto/cloudformation/utils.py | 19 +- moto/core/common_models.py | 48 +-- moto/core/responses.py | 2 +- moto/core/utils.py | 12 +- moto/s3/utils.py | 3 +- setup.cfg | 2 +- 12 files changed, 559 insertions(+), 455 deletions(-) diff --git a/moto/awslambda/models.py b/moto/awslambda/models.py index 40341c870..6289a1c6c 100644 --- a/moto/awslambda/models.py +++ b/moto/awslambda/models.py @@ -1075,10 +1075,6 @@ class EventSourceMapping(CloudFormationModel): lambda_backend = lambda_backends[account_id][region_name] lambda_backend.delete_event_source_mapping(self.uuid) - @staticmethod - def cloudformation_name_type() -> None: - return None - @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html @@ -1142,10 +1138,6 @@ class LambdaVersion(CloudFormationModel): def __repr__(self) -> str: return str(self.logical_resource_id) # type: ignore[attr-defined] - @staticmethod - def cloudformation_name_type() -> None: - return None - @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-version.html diff --git a/moto/cloudformation/custom_model.py b/moto/cloudformation/custom_model.py index b101bb7d2..d4b5c32a2 100644 --- a/moto/cloudformation/custom_model.py +++ b/moto/cloudformation/custom_model.py @@ -1,5 +1,6 @@ import json import threading +from typing import Any, Dict from moto import settings from moto.core import CloudFormationModel @@ -8,33 +9,40 @@ from moto.moto_api._internal import mock_random class CustomModel(CloudFormationModel): - def __init__(self, region_name, request_id, logical_id, resource_name): + def __init__( + self, region_name: str, request_id: str, logical_id: str, resource_name: str + ): self.region_name = region_name self.request_id = request_id self.logical_id = logical_id self.resource_name = resource_name - self.data = dict() + self.data: Dict[str, Any] = dict() self._finished = False - def set_data(self, data): + def set_data(self, data: Dict[str, Any]) -> None: self.data = data self._finished = True - def is_created(self): + def is_created(self) -> bool: return self._finished @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.resource_name @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: return "?" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "CustomModel": logical_id = kwargs["LogicalId"] stack_id = kwargs["StackId"] resource_type = kwargs["ResourceType"] @@ -85,11 +93,11 @@ class CustomModel(CloudFormationModel): return custom_resource @classmethod - def has_cfn_attr(cls, attr): # pylint: disable=unused-argument + def has_cfn_attr(cls, attr: str) -> bool: # pylint: disable=unused-argument # We don't know which attributes are supported for third-party resources return True - def get_cfn_attribute(self, attribute_name): + def get_cfn_attribute(self, attribute_name: str) -> Any: if attribute_name in self.data: return self.data[attribute_name] return None diff --git a/moto/cloudformation/exceptions.py b/moto/cloudformation/exceptions.py index ba4337bdb..08b9c26a1 100644 --- a/moto/cloudformation/exceptions.py +++ b/moto/cloudformation/exceptions.py @@ -1,5 +1,6 @@ from moto.core.exceptions import RESTError from jinja2 import Template +from typing import Optional class UnformattedGetAttTemplateException(Exception): @@ -10,7 +11,7 @@ class UnformattedGetAttTemplateException(Exception): class ValidationError(RESTError): - def __init__(self, name_or_id=None, message=None): + def __init__(self, name_or_id: Optional[str] = None, message: Optional[str] = None): if message is None: message = "Stack with id {0} does not exist".format(name_or_id) @@ -20,7 +21,7 @@ class ValidationError(RESTError): class MissingParameterError(RESTError): - def __init__(self, parameter_name): + def __init__(self, parameter_name: str): template = Template(ERROR_RESPONSE) message = "Missing parameter {0}".format(parameter_name) super().__init__(error_type="ValidationError", message=message) @@ -30,7 +31,7 @@ class MissingParameterError(RESTError): class ExportNotFound(RESTError): """Exception to raise if a template tries to import a non-existent export""" - def __init__(self, export_name): + def __init__(self, export_name: str): template = Template(ERROR_RESPONSE) message = "No export named {0} found.".format(export_name) super().__init__(error_type="ExportNotFound", message=message) @@ -38,7 +39,7 @@ class ExportNotFound(RESTError): class UnsupportedAttribute(ValidationError): - def __init__(self, resource, attr): + def __init__(self, resource: str, attr: str): template = Template(ERROR_RESPONSE) super().__init__() self.description = template.render( diff --git a/moto/cloudformation/models.py b/moto/cloudformation/models.py index 9a65d789d..010ae36b3 100644 --- a/moto/cloudformation/models.py +++ b/moto/cloudformation/models.py @@ -3,10 +3,11 @@ import json import yaml from collections import OrderedDict +from typing import Any, Dict, List, Optional, Iterable, Tuple, Union, Type from yaml.parser import ParserError # pylint:disable=c-extension-no-member from yaml.scanner import ScannerError # pylint:disable=c-extension-no-member -from moto.core import BaseBackend, BaseModel +from moto.core import BaseBackend, BaseModel, CloudFormationModel from moto.core.utils import ( iso_8601_datetime_with_milliseconds, iso_8601_datetime_without_milliseconds, @@ -15,7 +16,8 @@ from moto.core.utils import ( from moto.moto_api._internal import mock_random from moto.sns.models import sns_backends -from .parsing import ResourceMap, OutputMap +from .custom_model import CustomModel +from .parsing import ResourceMap, Output, OutputMap, Export from .utils import ( generate_changeset_id, generate_stack_id, @@ -30,17 +32,16 @@ from .exceptions import ValidationError class FakeStackSet(BaseModel): def __init__( self, - stackset_id, - account_id, - name, - template, - region="us-east-1", - status="ACTIVE", - description=None, - parameters=None, - tags=None, - admin_role="AWSCloudFormationStackSetAdministrationRole", - execution_role="AWSCloudFormationStackSetExecutionRole", + stackset_id: str, + account_id: str, + name: str, + template: str, + region: str, + description: Optional[str], + parameters: Dict[str, str], + tags: Optional[Dict[str, str]] = None, + admin_role: str = "AWSCloudFormationStackSetAdministrationRole", + execution_role: str = "AWSCloudFormationStackSetExecutionRole", ): self.id = stackset_id self.arn = generate_stackset_arn(stackset_id, region, account_id) @@ -52,18 +53,23 @@ class FakeStackSet(BaseModel): self.admin_role = admin_role self.admin_role_arn = f"arn:aws:iam::{account_id}:role/{self.admin_role}" self.execution_role = execution_role - self.status = status + self.status = "ACTIVE" self.instances = FakeStackInstances(parameters, self.id, self.name) self.stack_instances = self.instances.stack_instances - self.operations = [] + self.operations: List[Dict[str, Any]] = [] def _create_operation( - self, operation_id, action, status, accounts=None, regions=None - ): + self, + operation_id: str, + action: str, + status: str, + accounts: Optional[List[str]] = None, + regions: Optional[List[str]] = None, + ) -> Dict[str, Any]: accounts = accounts or [] regions = regions or [] operation = { - "OperationId": str(operation_id), + "OperationId": operation_id, "Action": action, "Status": status, "CreationTimestamp": datetime.now(), @@ -76,41 +82,39 @@ class FakeStackSet(BaseModel): self.operations += [operation] return operation - def get_operation(self, operation_id): + def get_operation(self, operation_id: str) -> Dict[str, Any]: for operation in self.operations: if operation_id == operation["OperationId"]: return operation raise ValidationError(operation_id) - def update_operation(self, operation_id, status): + def update_operation(self, operation_id: str, status: str) -> str: operation = self.get_operation(operation_id) operation["Status"] = status return operation_id - def delete(self): + def delete(self) -> None: self.status = "DELETED" def update( self, - template, - description, - parameters, - tags, - admin_role, - execution_role, - accounts, - regions, - operation_id=None, - ): - if not operation_id: - operation_id = mock_random.uuid4() + template: str, + description: str, + parameters: Dict[str, str], + tags: Dict[str, str], + admin_role: str, + execution_role: str, + accounts: List[str], + regions: List[str], + operation_id: str, + ) -> Dict[str, Any]: - self.template = template if template else self.template + self.template = template or self.template self.description = description if description is not None else self.description - self.parameters = parameters if parameters else self.parameters - self.tags = tags if tags else self.tags - self.admin_role = admin_role if admin_role else self.admin_role - self.execution_role = execution_role if execution_role else self.execution_role + self.parameters = parameters or self.parameters + self.tags = tags or self.tags + self.admin_role = admin_role or self.admin_role + self.execution_role = execution_role or self.execution_role if accounts and regions: self.update_instances(accounts, regions, self.parameters) @@ -124,11 +128,12 @@ class FakeStackSet(BaseModel): ) return operation - def create_stack_instances(self, accounts, regions, parameters, operation_id=None): - if not operation_id: - operation_id = mock_random.uuid4() + def create_stack_instances( + self, accounts: List[str], regions: List[str], parameters: List[Dict[str, Any]] + ) -> None: + operation_id = str(mock_random.uuid4()) if not parameters: - parameters = self.parameters + parameters = self.parameters # type: ignore[assignment] self.instances.create_instances(accounts, regions, parameters) self._create_operation( @@ -139,24 +144,23 @@ class FakeStackSet(BaseModel): regions=regions, ) - def delete_stack_instances(self, accounts, regions, operation_id=None): - if not operation_id: - operation_id = mock_random.uuid4() + def delete_stack_instances(self, accounts: List[str], regions: List[str]) -> None: + operation_id = str(mock_random.uuid4()) self.instances.delete(accounts, regions) - operation = self._create_operation( + self._create_operation( operation_id=operation_id, action="DELETE", status="SUCCEEDED", accounts=accounts, regions=regions, ) - return operation - def update_instances(self, accounts, regions, parameters, operation_id=None): - if not operation_id: - operation_id = mock_random.uuid4() + def update_instances( + self, accounts: List[str], regions: List[str], parameters: Dict[str, str] + ) -> Dict[str, Any]: + operation_id = str(mock_random.uuid4()) self.instances.update(accounts, regions, parameters) operation = self._create_operation( @@ -170,14 +174,21 @@ class FakeStackSet(BaseModel): class FakeStackInstances(BaseModel): - def __init__(self, parameters, stackset_id, stackset_name): - self.parameters = parameters if parameters else {} + def __init__( + self, parameters: Dict[str, str], stackset_id: str, stackset_name: str + ): + self.parameters = parameters or {} self.stackset_id = stackset_id self.stack_name = "StackSet-{}".format(stackset_id) self.stackset_name = stackset_name - self.stack_instances = [] + self.stack_instances: List[Dict[str, Any]] = [] - def create_instances(self, accounts, regions, parameters): + def create_instances( + self, + accounts: List[str], + regions: List[str], + parameters: Optional[List[Dict[str, Any]]], + ) -> List[Dict[str, Any]]: new_instances = [] for region in regions: for account in accounts: @@ -187,13 +198,18 @@ class FakeStackInstances(BaseModel): "Region": region, "Account": account, "Status": "CURRENT", - "ParameterOverrides": parameters if parameters else [], + "ParameterOverrides": parameters or [], } new_instances.append(instance) self.stack_instances += new_instances return new_instances - def update(self, accounts, regions, parameters): + def update( + self, + accounts: List[str], + regions: List[str], + parameters: Optional[Dict[str, str]], + ) -> Any: for account in accounts: for region in regions: instance = self.get_instance(account, region) @@ -202,12 +218,12 @@ class FakeStackInstances(BaseModel): else: instance["ParameterOverrides"] = [] - def delete(self, accounts, regions): + def delete(self, accounts: List[str], regions: List[str]) -> None: for i, instance in enumerate(self.stack_instances): if instance["Region"] in regions and instance["Account"] in accounts: self.stack_instances.pop(i) - def get_instance(self, account, region): + def get_instance(self, account: str, region: str) -> Dict[str, Any]: # type: ignore[return] for i, instance in enumerate(self.stack_instances): if instance["Region"] == region and instance["Account"] == account: return self.stack_instances[i] @@ -216,16 +232,16 @@ class FakeStackInstances(BaseModel): class FakeStack(BaseModel): def __init__( self, - stack_id, - name, - template, - parameters, - account_id, - region_name, - notification_arns=None, - tags=None, - role_arn=None, - cross_stack_resources=None, + stack_id: str, + name: str, + template: Union[str, Dict[str, Any]], + parameters: Dict[str, str], + account_id: str, + region_name: str, + notification_arns: Optional[List[str]] = None, + tags: Optional[Dict[str, str]] = None, + role_arn: Optional[str] = None, + cross_stack_resources: Optional[Dict[str, Export]] = None, ): self.stack_id = stack_id self.name = name @@ -235,26 +251,26 @@ class FakeStack(BaseModel): self._parse_template() self.description = self.template_dict.get("Description") else: - self.template_dict = {} + self.template_dict: Dict[str, Any] = {} self.description = None self.parameters = parameters self.region_name = region_name self.notification_arns = notification_arns if notification_arns else [] self.role_arn = role_arn self.tags = tags if tags else {} - self.events = [] + self.events: List[FakeEvent] = [] self.policy = "" - self.cross_stack_resources = cross_stack_resources or {} + self.cross_stack_resources: Dict[str, Export] = cross_stack_resources or {} self.resource_map = self._create_resource_map() - self.custom_resources = dict() + self.custom_resources: Dict[str, CustomModel] = dict() self.output_map = self._create_output_map() self.creation_time = datetime.utcnow() self.status = "CREATE_PENDING" - def has_template(self, other_template): + def has_template(self, other_template: str) -> bool: our_template = ( self.template if isinstance(self.template, dict) @@ -262,10 +278,10 @@ class FakeStack(BaseModel): ) return our_template == json.loads(other_template) - def has_parameters(self, other_parameters): + def has_parameters(self, other_parameters: Dict[str, Any]) -> bool: return self.parameters == other_parameters - def _create_resource_map(self): + def _create_resource_map(self) -> ResourceMap: resource_map = ResourceMap( self.stack_id, self.name, @@ -279,16 +295,19 @@ class FakeStack(BaseModel): resource_map.load() return resource_map - def _create_output_map(self): + def _create_output_map(self) -> OutputMap: return OutputMap(self.resource_map, self.template_dict, self.stack_id) @property - def creation_time_iso_8601(self): - return iso_8601_datetime_without_milliseconds(self.creation_time) + def creation_time_iso_8601(self) -> str: + return iso_8601_datetime_without_milliseconds(self.creation_time) # type: ignore[return-value] def _add_stack_event( - self, resource_status, resource_status_reason=None, resource_properties=None - ): + self, + resource_status: str, + resource_status_reason: Optional[str] = None, + resource_properties: Optional[str] = None, + ) -> None: event = FakeEvent( stack_id=self.stack_id, @@ -304,58 +323,36 @@ class FakeStack(BaseModel): event.sendToSns(self.account_id, self.region_name, self.notification_arns) self.events.append(event) - def _add_resource_event( - self, - logical_resource_id, - resource_status, - resource_status_reason=None, - resource_properties=None, - ): - # not used yet... feel free to help yourself - resource = self.resource_map[logical_resource_id] - self.events.append( - FakeEvent( - stack_id=self.stack_id, - stack_name=self.name, - logical_resource_id=logical_resource_id, - physical_resource_id=resource.physical_resource_id, - resource_type=resource.type, - resource_status=resource_status, - resource_status_reason=resource_status_reason, - resource_properties=resource_properties, - ) - ) - - def _parse_template(self): + def _parse_template(self) -> None: yaml.add_multi_constructor("", yaml_tag_constructor) try: - self.template_dict = yaml.load(self.template, Loader=yaml.Loader) + self.template_dict = yaml.load(self.template, Loader=yaml.Loader) # type: ignore[arg-type] except (ParserError, ScannerError): - self.template_dict = json.loads(self.template) + self.template_dict = json.loads(self.template) # type: ignore[arg-type] @property - def stack_parameters(self): + def stack_parameters(self) -> Dict[str, Any]: # type: ignore[misc] return self.resource_map.resolved_parameters @property - def stack_resources(self): + def stack_resources(self) -> Iterable[Type[CloudFormationModel]]: return self.resource_map.values() @property - def stack_outputs(self): + def stack_outputs(self) -> List[Output]: return [v for v in self.output_map.values() if v] @property - def exports(self): + def exports(self) -> List[Export]: return self.output_map.exports - def add_custom_resource(self, custom_resource): + def add_custom_resource(self, custom_resource: CustomModel) -> None: self.custom_resources[custom_resource.logical_id] = custom_resource - def get_custom_resource(self, custom_resource): + def get_custom_resource(self, custom_resource: str) -> CustomModel: return self.custom_resources[custom_resource] - def create_resources(self): + def create_resources(self) -> None: self.status = "CREATE_IN_PROGRESS" all_resources_ready = self.resource_map.create(self.template_dict) # Set the description of the stack @@ -363,15 +360,21 @@ class FakeStack(BaseModel): if all_resources_ready: self.mark_creation_complete() - def verify_readiness(self): + def verify_readiness(self) -> None: if self.resource_map.creation_complete(): self.mark_creation_complete() - def mark_creation_complete(self): + def mark_creation_complete(self) -> None: self.status = "CREATE_COMPLETE" self._add_stack_event("CREATE_COMPLETE") - def update(self, template, role_arn=None, parameters=None, tags=None): + def update( + self, + template: str, + role_arn: Optional[str] = None, + parameters: Optional[Dict[str, Any]] = None, + tags: Optional[Dict[str, str]] = None, + ) -> None: self._add_stack_event( "UPDATE_IN_PROGRESS", resource_status_reason="User Initiated" ) @@ -387,7 +390,7 @@ class FakeStack(BaseModel): self.tags = tags # TODO: update tags in the resource map - def delete(self): + def delete(self) -> None: self._add_stack_event( "DELETE_IN_PROGRESS", resource_status_reason="User Initiated" ) @@ -397,7 +400,7 @@ class FakeStack(BaseModel): class FakeChange(BaseModel): - def __init__(self, action, logical_resource_id, resource_type): + def __init__(self, action: str, logical_resource_id: str, resource_type: str): self.action = action self.logical_resource_id = logical_resource_id self.resource_type = resource_type @@ -406,16 +409,16 @@ class FakeChange(BaseModel): class FakeChangeSet(BaseModel): def __init__( self, - change_set_type, - change_set_id, - change_set_name, - stack, - template, - parameters, - description, - notification_arns=None, - tags=None, - role_arn=None, + change_set_type: str, + change_set_id: str, + change_set_name: str, + stack: FakeStack, + template: str, + parameters: Dict[str, str], + description: str, + notification_arns: Optional[List[str]] = None, + tags: Optional[Dict[str, str]] = None, + role_arn: Optional[str] = None, ): self.change_set_type = change_set_type self.change_set_id = change_set_id @@ -435,7 +438,11 @@ class FakeChangeSet(BaseModel): self.creation_time = datetime.utcnow() self.changes = self.diff() - def _parse_template(self): + self.status: Optional[str] = None + self.execution_status: Optional[str] = None + self.status_reason: Optional[str] = None + + def _parse_template(self) -> None: yaml.add_multi_constructor("", yaml_tag_constructor) try: self.template_dict = yaml.load(self.template, Loader=yaml.Loader) @@ -443,13 +450,13 @@ class FakeChangeSet(BaseModel): self.template_dict = json.loads(self.template) @property - def creation_time_iso_8601(self): - return iso_8601_datetime_without_milliseconds(self.creation_time) + def creation_time_iso_8601(self) -> str: + return iso_8601_datetime_without_milliseconds(self.creation_time) # type: ignore[return-value] - def diff(self): + def diff(self) -> List[FakeChange]: changes = [] resources_by_action = self.stack.resource_map.build_change_set_actions( - self.template_dict, self.parameters + self.template_dict ) for action, resources in resources_by_action.items(): for resource_name, resource in resources.items(): @@ -462,22 +469,21 @@ class FakeChangeSet(BaseModel): ) return changes - def apply(self): + def apply(self) -> None: self.stack.resource_map.update(self.template_dict, self.parameters) class FakeEvent(BaseModel): def __init__( self, - stack_id, - stack_name, - logical_resource_id, - physical_resource_id, - resource_type, - resource_status, - resource_status_reason=None, - resource_properties=None, - client_request_token=None, + stack_id: str, + stack_name: str, + logical_resource_id: str, + physical_resource_id: str, + resource_type: str, + resource_status: str, + resource_status_reason: Optional[str], + resource_properties: Optional[str], ): self.stack_id = stack_id self.stack_name = stack_name @@ -489,9 +495,11 @@ class FakeEvent(BaseModel): self.resource_properties = resource_properties self.timestamp = datetime.utcnow() self.event_id = mock_random.uuid4() - self.client_request_token = client_request_token + self.client_request_token = None - def sendToSns(self, account_id, region, sns_topic_arns): + def sendToSns( + self, account_id: str, region: str, sns_topic_arns: List[str] + ) -> None: message = """StackId='{stack_id}' Timestamp='{timestamp}' EventId='{event_id}' @@ -522,7 +530,9 @@ ClientRequestToken='{client_request_token}'""".format( ) -def filter_stacks(all_stacks, status_filter): +def filter_stacks( + all_stacks: List[FakeStack], status_filter: Optional[List[str]] +) -> List[FakeStack]: filtered_stacks = [] if not status_filter: return all_stacks @@ -539,22 +549,28 @@ class CloudFormationBackend(BaseBackend): This means it has to run inside a Docker-container, or be started using `moto_server -h 0.0.0.0`. """ - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.stacks = OrderedDict() - self.stacksets = OrderedDict() - self.deleted_stacks = {} - self.exports = OrderedDict() - self.change_sets = OrderedDict() + self.stacks: Dict[str, FakeStack] = OrderedDict() + self.stacksets: Dict[str, FakeStackSet] = OrderedDict() + self.deleted_stacks: Dict[str, FakeStack] = {} + self.exports: Dict[str, Export] = OrderedDict() + self.change_sets: Dict[str, FakeChangeSet] = OrderedDict() @staticmethod - def default_vpc_endpoint_service(service_region, zones): + def default_vpc_endpoint_service( + service_region: str, zones: List[str] + ) -> List[Dict[str, str]]: """Default VPC endpoint service.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "cloudformation", policy_supported=False ) - def _resolve_update_parameters(self, instance, incoming_params): + def _resolve_update_parameters( + self, + instance: Union[FakeStack, FakeStackSet], + incoming_params: List[Dict[str, str]], + ) -> Dict[str, str]: parameters = dict( [ (parameter["parameter_key"], parameter["parameter_value"]) @@ -578,30 +594,26 @@ class CloudFormationBackend(BaseBackend): def create_stack_set( self, - name, - template, - parameters, - tags=None, - description=None, - admin_role=None, - execution_role=None, - ): + name: str, + template: str, + parameters: Dict[str, str], + tags: Dict[str, str], + ) -> FakeStackSet: stackset_id = generate_stackset_id(name) new_stackset = FakeStackSet( stackset_id=stackset_id, account_id=self.account_id, name=name, + region=self.region_name, template=template, parameters=parameters, - description=description, + description=None, tags=tags, - admin_role=admin_role, - execution_role=execution_role, ) self.stacksets[stackset_id] = new_stackset return new_stackset - def get_stack_set(self, name): + def get_stack_set(self, name: str) -> FakeStackSet: stacksets = self.stacksets.keys() if name in stacksets: return self.stacksets[name] @@ -610,7 +622,7 @@ class CloudFormationBackend(BaseBackend): return self.stacksets[stackset] raise ValidationError(name) - def delete_stack_set(self, name): + def delete_stack_set(self, name: str) -> None: stacksets = self.stacksets.keys() if name in stacksets: self.stacksets[name].delete() @@ -619,31 +631,44 @@ class CloudFormationBackend(BaseBackend): self.stacksets[stackset].delete() def create_stack_instances( - self, stackset_name, accounts, regions, parameters, operation_id=None - ): + self, + stackset_name: str, + accounts: List[str], + regions: List[str], + parameters: List[Dict[str, str]], + ) -> FakeStackSet: stackset = self.get_stack_set(stackset_name) stackset.create_stack_instances( accounts=accounts, regions=regions, parameters=parameters, - operation_id=operation_id, ) return stackset + def update_stack_instances( + self, + stackset_name: str, + accounts: List[str], + regions: List[str], + parameters: Dict[str, str], + ) -> Dict[str, Any]: + stack_set = self.get_stack_set(stackset_name) + return stack_set.update_instances(accounts, regions, parameters) + def update_stack_set( self, - stackset_name, - template=None, - description=None, - parameters=None, - tags=None, - admin_role=None, - execution_role=None, - accounts=None, - regions=None, - operation_id=None, - ): + stackset_name: str, + template: str, + description: str, + parameters: List[Dict[str, str]], + tags: Dict[str, str], + admin_role: str, + execution_role: str, + accounts: List[str], + regions: List[str], + operation_id: str, + ) -> Dict[str, Any]: stackset = self.get_stack_set(stackset_name) resolved_parameters = self._resolve_update_parameters( instance=stackset, incoming_params=parameters @@ -662,21 +687,21 @@ class CloudFormationBackend(BaseBackend): return update def delete_stack_instances( - self, stackset_name, accounts, regions, operation_id=None - ): + self, stackset_name: str, accounts: List[str], regions: List[str] + ) -> FakeStackSet: stackset = self.get_stack_set(stackset_name) - stackset.delete_stack_instances(accounts, regions, operation_id) + stackset.delete_stack_instances(accounts, regions) return stackset def create_stack( self, - name, - template, - parameters, - notification_arns=None, - tags=None, - role_arn=None, - ): + name: str, + template: str, + parameters: Dict[str, Any], + notification_arns: Optional[List[str]] = None, + tags: Optional[Dict[str, str]] = None, + role_arn: Optional[str] = None, + ) -> FakeStack: stack_id = generate_stack_id(name, self.region_name, self.account_id) new_stack = FakeStack( stack_id=stack_id, @@ -702,16 +727,16 @@ class CloudFormationBackend(BaseBackend): def create_change_set( self, - stack_name, - change_set_name, - template, - parameters, - description, - change_set_type, - notification_arns=None, - tags=None, - role_arn=None, - ): + stack_name: str, + change_set_name: str, + template: str, + parameters: Dict[str, str], + description: str, + change_set_type: str, + notification_arns: Optional[List[str]] = None, + tags: Optional[Dict[str, str]] = None, + role_arn: Optional[str] = None, + ) -> Tuple[str, str]: if change_set_type == "UPDATE": for stack in self.stacks.values(): if stack.name == stack_name: @@ -768,7 +793,7 @@ class CloudFormationBackend(BaseBackend): self.change_sets[change_set_id] = new_change_set return change_set_id, stack.stack_id - def delete_change_set(self, change_set_name): + def delete_change_set(self, change_set_name: str) -> None: if change_set_name in self.change_sets: # This means arn was passed in del self.change_sets[change_set_name] @@ -779,7 +804,7 @@ class CloudFormationBackend(BaseBackend): break del self.change_sets[to_delete] - def describe_change_set(self, change_set_name): + def describe_change_set(self, change_set_name: str) -> Optional[FakeChangeSet]: change_set = None if change_set_name in self.change_sets: # This means arn was passed in @@ -792,7 +817,9 @@ class CloudFormationBackend(BaseBackend): raise ValidationError(change_set_name) return change_set - def execute_change_set(self, change_set_name, stack_name=None): + def execute_change_set( + self, change_set_name: str, stack_name: Optional[str] = None + ) -> None: if change_set_name in self.change_sets: # This means arn was passed in change_set = self.change_sets[change_set_name] @@ -823,9 +850,8 @@ class CloudFormationBackend(BaseBackend): # set the status of the stack stack.status = f"{change_set.change_set_type}_COMPLETE" stack.template = change_set.template - return True - def describe_stacks(self, name_or_stack_id): + def describe_stacks(self, name_or_stack_id: str) -> List[FakeStack]: stacks = self.stacks.values() if name_or_stack_id: for stack in stacks: @@ -840,16 +866,16 @@ class CloudFormationBackend(BaseBackend): else: return list(stacks) - def list_change_sets(self): + def list_change_sets(self) -> Iterable[FakeChangeSet]: return self.change_sets.values() - def list_stacks(self, status_filter=None): + def list_stacks(self, status_filter: Optional[List[str]] = None) -> List[FakeStack]: total_stacks = [v for v in self.stacks.values()] + [ v for v in self.deleted_stacks.values() ] return filter_stacks(total_stacks, status_filter) - def get_stack(self, name_or_stack_id): + def get_stack(self, name_or_stack_id: str) -> FakeStack: all_stacks = dict(self.deleted_stacks, **self.stacks) if name_or_stack_id in all_stacks: # Lookup by stack id - deleted stacks incldued @@ -861,7 +887,14 @@ class CloudFormationBackend(BaseBackend): return stack raise ValidationError(name_or_stack_id) - def update_stack(self, name, template, role_arn=None, parameters=None, tags=None): + def update_stack( + self, + name: str, + template: str, + role_arn: Optional[str], + parameters: List[Dict[str, Any]], + tags: Optional[Dict[str, str]], + ) -> FakeStack: stack = self.get_stack(name) resolved_parameters = self._resolve_update_parameters( instance=stack, incoming_params=parameters @@ -869,14 +902,14 @@ class CloudFormationBackend(BaseBackend): stack.update(template, role_arn, parameters=resolved_parameters, tags=tags) return stack - def get_stack_policy(self, stack_name): + def get_stack_policy(self, stack_name: str) -> str: try: stack = self.get_stack(stack_name) except ValidationError: raise ValidationError(message=f"Stack: {stack_name} does not exist") return stack.policy - def set_stack_policy(self, stack_name, policy_body): + def set_stack_policy(self, stack_name: str, policy_body: str) -> None: """ Note that Moto does no validation/parsing/enforcement of this policy - we simply persist it. """ @@ -886,41 +919,45 @@ class CloudFormationBackend(BaseBackend): raise ValidationError(message=f"Stack: {stack_name} does not exist") stack.policy = policy_body - def list_stack_resources(self, stack_name_or_id): + def list_stack_resources( + self, stack_name_or_id: str + ) -> Iterable[Type[CloudFormationModel]]: stack = self.get_stack(stack_name_or_id) return stack.stack_resources - def delete_stack(self, name_or_stack_id): + def delete_stack(self, name_or_stack_id: str) -> None: if name_or_stack_id in self.stacks: # Delete by stack id - stack = self.stacks.pop(name_or_stack_id, None) + stack = self.stacks.pop(name_or_stack_id) export_names = [export.name for export in stack.exports] stack.delete() self.deleted_stacks[stack.stack_id] = stack for export_name in export_names: self.exports.pop(export_name) - return self.stacks.pop(name_or_stack_id, None) + self.stacks.pop(name_or_stack_id, None) else: # Delete by stack name for stack in list(self.stacks.values()): if stack.name == name_or_stack_id: self.delete_stack(stack.stack_id) - def list_exports(self, token): + def list_exports( + self, tokenstr: Optional[str] + ) -> Tuple[List[Export], Optional[str]]: all_exports = list(self.exports.values()) - if token is None: + if tokenstr is None: exports = all_exports[0:100] next_token = "100" if len(all_exports) > 100 else None else: - token = int(token) + token = int(tokenstr) exports = all_exports[token : token + 100] next_token = str(token + 100) if len(all_exports) > token + 100 else None return exports, next_token - def validate_template(self, template): + def validate_template(self, template: str) -> List[Any]: return validate_template_cfn_lint(template) - def _validate_export_uniqueness(self, stack): + def _validate_export_uniqueness(self, stack: FakeStack) -> None: new_stack_export_names = [x.name for x in stack.exports] export_names = self.exports.keys() if not set(export_names).isdisjoint(new_stack_export_names): diff --git a/moto/cloudformation/parsing.py b/moto/cloudformation/parsing.py index a303b6861..22bff1b11 100644 --- a/moto/cloudformation/parsing.py +++ b/moto/cloudformation/parsing.py @@ -6,6 +6,18 @@ import warnings import re import collections.abc as collections_abc +from typing import ( + Any, + Dict, + List, + Union, + Iterable, + Iterator, + Optional, + Tuple, + TypeVar, + Type, +) # This ugly section of imports is necessary because we # build the list of CloudFormationModel subclasses using @@ -14,35 +26,34 @@ import collections.abc as collections_abc # the subclass's module hasn't been imported yet - then that subclass # doesn't exist yet, and __subclasses__ won't find it. # So we import here to populate the list of subclasses. -from moto.apigateway import models # noqa # pylint: disable=all -from moto.autoscaling import models # noqa # pylint: disable=all -from moto.awslambda import models # noqa # pylint: disable=all -from moto.batch import models # noqa # pylint: disable=all +from moto.apigateway import models as apigw_models # noqa # pylint: disable=all +from moto.autoscaling import models as as_models # noqa # pylint: disable=all +from moto.awslambda import models as lambda_models # noqa # pylint: disable=all +from moto.batch import models as batch_models # noqa # pylint: disable=all from moto.cloudformation.custom_model import CustomModel -from moto.cloudwatch import models # noqa # pylint: disable=all -from moto.datapipeline import models # noqa # pylint: disable=all -from moto.dynamodb import models # noqa # pylint: disable=all +from moto.cloudwatch import models as cw_models # noqa # pylint: disable=all +from moto.datapipeline import models as data_models # noqa # pylint: disable=all +from moto.dynamodb import models as ddb_models # noqa # pylint: disable=all from moto.ec2 import models as ec2_models from moto.ec2.models.core import TaggedEC2Resource -from moto.ecr import models # noqa # pylint: disable=all -from moto.ecs import models # noqa # pylint: disable=all -from moto.efs import models # noqa # pylint: disable=all -from moto.elb import models # noqa # pylint: disable=all -from moto.elbv2 import models # noqa # pylint: disable=all -from moto.events import models # noqa # pylint: disable=all -from moto.iam import models # noqa # pylint: disable=all -from moto.kinesis import models # noqa # pylint: disable=all -from moto.kms import models # noqa # pylint: disable=all -from moto.rds import models # noqa # pylint: disable=all -from moto.rds import models # noqa # pylint: disable=all -from moto.redshift import models # noqa # pylint: disable=all -from moto.route53 import models # noqa # pylint: disable=all -from moto.s3 import models # noqa # pylint: disable=all -from moto.sagemaker import models # noqa # pylint: disable=all -from moto.sns import models # noqa # pylint: disable=all -from moto.sqs import models # noqa # pylint: disable=all -from moto.stepfunctions import models # noqa # pylint: disable=all -from moto.ssm import models # noqa # pylint: disable=all +from moto.ecr import models as ecr_models # noqa # pylint: disable=all +from moto.ecs import models as ecs_models # noqa # pylint: disable=all +from moto.efs import models as efs_models # noqa # pylint: disable=all +from moto.elb import models as elb_models # noqa # pylint: disable=all +from moto.elbv2 import models as elbv2_models # noqa # pylint: disable=all +from moto.events import models as events_models # noqa # pylint: disable=all +from moto.iam import models as iam_models # noqa # pylint: disable=all +from moto.kinesis import models as kinesis_models # noqa # pylint: disable=all +from moto.kms import models as kms_models # noqa # pylint: disable=all +from moto.rds import models as rds_models # noqa # pylint: disable=all +from moto.redshift import models as redshift_models # noqa # pylint: disable=all +from moto.route53 import models as route53_models # noqa # pylint: disable=all +from moto.s3 import models as s3_models # noqa # pylint: disable=all +from moto.sagemaker import models as sagemaker_models # noqa # pylint: disable=all +from moto.sns import models as sns_models # noqa # pylint: disable=all +from moto.sqs import models as sqs_models # noqa # pylint: disable=all +from moto.stepfunctions import models as sfn_models # noqa # pylint: disable=all +from moto.ssm import models as ssm_models # noqa # pylint: disable=all # End ugly list of imports @@ -66,6 +77,7 @@ NAME_TYPE_MAP = { model.cloudformation_type(): model.cloudformation_name_type() for model in MODEL_LIST } +CF_MODEL = TypeVar("CF_MODEL", bound=CloudFormationModel) # Just ignore these models types for now NULL_MODELS = [ @@ -79,18 +91,17 @@ logger = logging.getLogger("moto") class Output(object): - def __init__(self, connection=None): - self.connection = connection - self.description = None - self.key = None - self.value = None + def __init__(self, key: str, value: str, description: str): + self.description = description + self.key = key + self.value = value - def __repr__(self): + def __repr__(self) -> str: return 'Output:"%s"="%s"' % (self.key, self.value) -class LazyDict(dict): - def __getitem__(self, key): +class LazyDict(Dict[str, Any]): + def __getitem__(self, key: str) -> Any: val = dict.__getitem__(self, key) if callable(val): val = val() @@ -98,7 +109,7 @@ class LazyDict(dict): return val -def clean_json(resource_json, resources_map): +def clean_json(resource_json: Any, resources_map: "ResourceMap") -> Any: """ Cleanup the a resource dict. For now, this just means replacing any Ref node with the corresponding physical_resource_id. @@ -110,7 +121,7 @@ def clean_json(resource_json, resources_map): # Parse resource reference resource = resources_map[resource_json["Ref"]] if hasattr(resource, "physical_resource_id"): - return resource.physical_resource_id + return resource.physical_resource_id # type: ignore[attr-defined] else: return resource @@ -119,10 +130,10 @@ def clean_json(resource_json, resources_map): map_path = resource_json["Fn::FindInMap"][1:] result = resources_map[map_name] for path in map_path: - if "Fn::Transform" in result: + if "Fn::Transform" in result: # type: ignore[operator] result = resources_map[clean_json(path, resources_map)] else: - result = result[clean_json(path, resources_map)] + result = result[clean_json(path, resources_map)] # type: ignore[index] return result if "Fn::GetAtt" in resource_json: @@ -201,7 +212,7 @@ def clean_json(resource_json, resources_map): cleaned_val = clean_json(resource_json["Fn::ImportValue"], resources_map) values = [ x.value - for x in resources_map.cross_stack_resources.values() + for x in resources_map.cross_stack_resources.values() # type: ignore[union-attr] if x.name == cleaned_val ] if any(values): @@ -231,26 +242,26 @@ def clean_json(resource_json, resources_map): return resource_json -def resource_class_from_type(resource_type): +def resource_class_from_type(resource_type: str) -> Type[CloudFormationModel]: if resource_type in NULL_MODELS: - return None + return None # type: ignore[return-value] if resource_type.startswith("Custom::"): return CustomModel if resource_type not in MODEL_MAP: logger.warning("No Moto CloudFormation support for %s", resource_type) - return None + return None # type: ignore[return-value] - return MODEL_MAP.get(resource_type) + return MODEL_MAP.get(resource_type) # type: ignore[return-value] -def resource_name_property_from_type(resource_type): +def resource_name_property_from_type(resource_type: str) -> Optional[str]: for model in MODEL_LIST: if model.cloudformation_type() == resource_type: return model.cloudformation_name_type() return NAME_TYPE_MAP.get(resource_type) -def generate_resource_name(resource_type, stack_name, logical_id): +def generate_resource_name(resource_type: str, stack_name: str, logical_id: str) -> str: if resource_type in [ "AWS::ElasticLoadBalancingV2::TargetGroup", "AWS::ElasticLoadBalancingV2::LoadBalancer", @@ -277,7 +288,9 @@ def generate_resource_name(resource_type, stack_name, logical_id): return "{0}-{1}-{2}".format(stack_name, logical_id, random_suffix()) -def parse_resource(resource_json, resources_map): +def parse_resource( + resource_json: Dict[str, Any], resources_map: "ResourceMap" +) -> Tuple[Type[CloudFormationModel], Any, str]: resource_type = resource_json["Type"] resource_class = resource_class_from_type(resource_type) if not resource_class: @@ -286,7 +299,7 @@ def parse_resource(resource_json, resources_map): resource_type ) ) - return None + return None # type: ignore[return-value] if "Properties" not in resource_json: resource_json["Properties"] = {} @@ -296,14 +309,18 @@ def parse_resource(resource_json, resources_map): return resource_class, resource_json, resource_type -def parse_resource_and_generate_name(logical_id, resource_json, resources_map): - resource_tuple = parse_resource(resource_json, resources_map) +def parse_resource_and_generate_name( + logical_id: str, resource_json: Dict[str, Any], resources_map: "ResourceMap" +) -> Tuple[Type[CloudFormationModel], Dict[str, Any], str]: + resource_tuple: Tuple[ + Type[CloudFormationModel], Dict[str, Any], str + ] = parse_resource(resource_json, resources_map) if not resource_tuple: return None resource_class, resource_json, resource_type = resource_tuple generated_resource_name = generate_resource_name( - resource_type, resources_map.get("AWS::StackName"), logical_id + resource_type, resources_map["AWS::StackName"], logical_id # type: ignore[arg-type] ) resource_name_property = resource_name_property_from_type(resource_type) @@ -322,17 +339,21 @@ def parse_resource_and_generate_name(logical_id, resource_json, resources_map): def parse_and_create_resource( - logical_id, resource_json, resources_map, account_id, region_name -): + logical_id: str, + resource_json: Dict[str, Any], + resources_map: "ResourceMap", + account_id: str, + region_name: str, +) -> Optional[CF_MODEL]: condition = resource_json.get("Condition") if condition and not resources_map.lazy_condition_map[condition]: # If this has a False condition, don't create the resource return None resource_type = resource_json["Type"] - resource_tuple = parse_resource_and_generate_name( - logical_id, resource_json, resources_map - ) + resource_tuple: Tuple[ + Type[CloudFormationModel], Dict[str, Any], str + ] = parse_resource_and_generate_name(logical_id, resource_json, resources_map) if not resource_tuple: return None resource_class, resource_json, resource_physical_name = resource_tuple @@ -350,11 +371,15 @@ def parse_and_create_resource( def parse_and_update_resource( - logical_id, resource_json, resources_map, account_id, region_name -): - resource_tuple = parse_resource_and_generate_name( - logical_id, resource_json, resources_map - ) + logical_id: str, + resource_json: Dict[str, Any], + resources_map: "ResourceMap", + account_id: str, + region_name: str, +) -> Optional[CF_MODEL]: + resource_tuple: Optional[ + Tuple[Type[CloudFormationModel], Dict[str, Any], str] + ] = parse_resource_and_generate_name(logical_id, resource_json, resources_map) if not resource_tuple: return None resource_class, resource_json, new_resource_name = resource_tuple @@ -376,7 +401,9 @@ def parse_and_update_resource( return None -def parse_and_delete_resource(resource_name, resource_json, account_id, region_name): +def parse_and_delete_resource( + resource_name: str, resource_json: Dict[str, Any], account_id: str, region_name: str +) -> None: resource_type = resource_json["Type"] resource_class = resource_class_from_type(resource_type) if not hasattr( @@ -387,7 +414,7 @@ def parse_and_delete_resource(resource_name, resource_json, account_id, region_n ) -def parse_condition(condition, resources_map, condition_map): +def parse_condition(condition: Union[Dict[str, Any], bool], resources_map: "ResourceMap", condition_map: Dict[str, Any]) -> bool: # type: ignore[return] if isinstance(condition, bool): return condition @@ -423,18 +450,21 @@ def parse_condition(condition, resources_map, condition_map): ) -def parse_output(output_logical_id, output_json, resources_map): +def parse_output( + output_logical_id: str, output_json: Any, resources_map: "ResourceMap" +) -> Optional[Output]: output_json = clean_json(output_json, resources_map) if "Value" not in output_json: return None - output = Output() - output.key = output_logical_id - output.value = clean_json(output_json["Value"], resources_map) - output.description = output_json.get("Description") + output = Output( + key=output_logical_id, + value=clean_json(output_json["Value"], resources_map), + description=output_json.get("Description"), + ) return output -class ResourceMap(collections_abc.Mapping): +class ResourceMap(collections_abc.Mapping): # type: ignore[type-arg] """ This is a lazy loading map for resources. This allows us to create resources without needing to create a full dependency tree. Upon creation, each @@ -443,27 +473,29 @@ class ResourceMap(collections_abc.Mapping): def __init__( self, - stack_id, - stack_name, - parameters, - tags, - region_name, - account_id, - template, - cross_stack_resources, + stack_id: str, + stack_name: str, + parameters: Dict[str, Any], + tags: Dict[str, Any], + region_name: str, + account_id: str, + template: Dict[str, Any], + cross_stack_resources: Optional[Dict[str, "Export"]], ): self._template = template - self._resource_json_map = template["Resources"] if template != {} else {} + self._resource_json_map: Dict[str, Any] = ( + template["Resources"] if template != {} else {} + ) self._account_id = account_id self._region_name = region_name self.input_parameters = parameters self.tags = copy.deepcopy(tags) - self.resolved_parameters = {} + self.resolved_parameters: Dict[str, Any] = {} self.cross_stack_resources = cross_stack_resources self.stack_id = stack_id # Create the default resources - self._parsed_resources = { + self._parsed_resources: Dict[str, Any] = { "AWS::AccountId": account_id, "AWS::Region": self._region_name, "AWS::StackId": stack_id, @@ -473,7 +505,7 @@ class ResourceMap(collections_abc.Mapping): "AWS::Partition": "aws", } - def __getitem__(self, key): + def __getitem__(self, key: str) -> Optional[CF_MODEL]: resource_logical_id = key if resource_logical_id in self._parsed_resources: @@ -494,17 +526,17 @@ class ResourceMap(collections_abc.Mapping): self._parsed_resources[resource_logical_id] = new_resource return new_resource - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self.resources) - def __len__(self): + def __len__(self) -> int: return len(self._resource_json_map) - def __get_resources_in_dependency_order(self): + def __get_resources_in_dependency_order(self) -> List[str]: resource_map = copy.deepcopy(self._resource_json_map) resources_in_dependency_order = [] - def recursively_get_dependencies(resource): + def recursively_get_dependencies(resource: str) -> None: resource_info = resource_map[resource] if "DependsOn" not in resource_info: @@ -529,13 +561,13 @@ class ResourceMap(collections_abc.Mapping): return resources_in_dependency_order @property - def resources(self): + def resources(self) -> Iterable[str]: return self._resource_json_map.keys() - def load_mapping(self): + def load_mapping(self) -> None: self._parsed_resources.update(self._template.get("Mappings", {})) - def transform_mapping(self): + def transform_mapping(self) -> None: for v in self._template.get("Mappings", {}).values(): if "Fn::Transform" in v: name = v["Fn::Transform"]["Name"] @@ -548,7 +580,7 @@ class ResourceMap(collections_abc.Mapping): ) self._parsed_resources.update(json.loads(key.value)) - def parse_ssm_parameter(self, value, value_type): + def parse_ssm_parameter(self, value: str, value_type: str) -> str: # The Value in SSM parameters is the SSM parameter path # we need to use ssm_backend to retrieve the # actual value from parameter store @@ -560,7 +592,7 @@ class ResourceMap(collections_abc.Mapping): return actual_value.split(",") return actual_value - def load_parameters(self): + def load_parameters(self) -> None: parameter_slots = self._template.get("Parameters", {}) for parameter_name, parameter in parameter_slots.items(): # Set the default values. @@ -582,7 +614,7 @@ class ResourceMap(collections_abc.Mapping): if value_type == "CommaDelimitedList" or value_type.startswith("List"): value = value.split(",") - def _parse_number_parameter(num_string): + def _parse_number_parameter(num_string: str) -> Union[int, float]: """CloudFormation NUMBER types can be an int or float. Try int first and then fall back to float if that fails """ @@ -612,7 +644,7 @@ class ResourceMap(collections_abc.Mapping): self._parsed_resources.update(self.resolved_parameters) - def load_conditions(self): + def load_conditions(self) -> None: conditions = self._template.get("Conditions", {}) self.lazy_condition_map = LazyDict() for condition_name, condition in conditions.items(): @@ -626,12 +658,12 @@ class ResourceMap(collections_abc.Mapping): for condition_name in self.lazy_condition_map: self.lazy_condition_map[condition_name] - def validate_outputs(self): + def validate_outputs(self) -> None: outputs = self._template.get("Outputs") or {} for value in outputs.values(): value = value.get("Value", {}) if "Fn::GetAtt" in value: - resource_type = self._resource_json_map.get(value["Fn::GetAtt"][0])[ + resource_type = self._resource_json_map.get(value["Fn::GetAtt"][0])[ # type: ignore[index] "Type" ] attr = value["Fn::GetAtt"][1] @@ -641,14 +673,14 @@ class ResourceMap(collections_abc.Mapping): short_type = resource_type[resource_type.rindex(":") + 1 :] raise UnsupportedAttribute(resource=short_type, attr=attr) - def load(self): + def load(self) -> None: self.load_mapping() self.transform_mapping() self.load_parameters() self.load_conditions() self.validate_outputs() - def create(self, template): + def create(self, template: Dict[str, Any]) -> bool: # Since this is a lazy map, to create every object we just need to # iterate through self. # Assumes that self.load() has been called before @@ -656,8 +688,8 @@ class ResourceMap(collections_abc.Mapping): self._resource_json_map = template["Resources"] self.tags.update( { - "aws:cloudformation:stack-name": self.get("AWS::StackName"), - "aws:cloudformation:stack-id": self.get("AWS::StackId"), + "aws:cloudformation:stack-name": self["AWS::StackName"], + "aws:cloudformation:stack-id": self["AWS::StackId"], } ) all_resources_ready = True @@ -667,12 +699,14 @@ class ResourceMap(collections_abc.Mapping): self.tags["aws:cloudformation:logical-id"] = resource ec2_models.ec2_backends[self._account_id][ self._region_name - ].create_tags([instance.physical_resource_id], self.tags) + ].create_tags( + [instance.physical_resource_id], self.tags + ) # type: ignore[attr-defined] if instance and not instance.is_created(): all_resources_ready = False return all_resources_ready - def creation_complete(self): + def creation_complete(self) -> bool: all_resources_ready = True for resource in self.__get_resources_in_dependency_order(): instance = self[resource] @@ -680,7 +714,7 @@ class ResourceMap(collections_abc.Mapping): all_resources_ready = False return all_resources_ready - def build_resource_diff(self, other_template): + def build_resource_diff(self, other_template: Dict[str, Any]) -> Dict[str, Any]: old = self._resource_json_map new = other_template["Resources"] @@ -695,11 +729,17 @@ class ResourceMap(collections_abc.Mapping): return resource_names_by_action - def build_change_set_actions(self, template, parameters): + def build_change_set_actions( + self, template: Dict[str, Any] + ) -> Dict[str, Dict[str, Dict[str, str]]]: resource_names_by_action = self.build_resource_diff(template) - resources_by_action = {"Add": {}, "Modify": {}, "Remove": {}} + resources_by_action: Dict[str, Dict[str, Dict[str, str]]] = { + "Add": {}, + "Modify": {}, + "Remove": {}, + } for resource_name in resource_names_by_action["Add"]: resources_by_action["Add"][resource_name] = { @@ -721,7 +761,9 @@ class ResourceMap(collections_abc.Mapping): return resources_by_action - def update(self, template, parameters=None): + def update( + self, template: Dict[str, Any], parameters: Optional[Dict[str, Any]] = None + ) -> None: resource_names_by_action = self.build_resource_diff(template) @@ -778,7 +820,7 @@ class ResourceMap(collections_abc.Mapping): if tries == 5: raise last_exception - def delete(self): + def delete(self) -> None: remaining_resources = set(self.resources) tries = 1 while remaining_resources and tries < 5: @@ -820,8 +862,8 @@ class ResourceMap(collections_abc.Mapping): raise last_exception -class OutputMap(collections_abc.Mapping): - def __init__(self, resources, template, stack_id): +class OutputMap(collections_abc.Mapping): # type: ignore[type-arg] + def __init__(self, resources: ResourceMap, template: Dict[str, Any], stack_id: str): self._template = template self._stack_id = stack_id @@ -831,13 +873,13 @@ class OutputMap(collections_abc.Mapping): message="[/Outputs] 'null' values are not allowed in templates", ) - self._output_json_map = template.get("Outputs") + self._output_json_map: Dict[str, Any] = template.get("Outputs") # type: ignore[assignment] # Create the default resources self._resource_map = resources - self._parsed_outputs = dict() + self._parsed_outputs: Dict[str, Output] = dict() - def __getitem__(self, key): + def __getitem__(self, key: str) -> Optional[Output]: output_logical_id = key if output_logical_id in self._parsed_outputs: @@ -851,21 +893,21 @@ class OutputMap(collections_abc.Mapping): self._parsed_outputs[output_logical_id] = new_output return new_output - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self.outputs) - def __len__(self): - return len(self._output_json_map) + def __len__(self) -> int: + return len(self._output_json_map) # type: ignore[arg-type] @property - def outputs(self): + def outputs(self) -> Iterable[str]: return self._output_json_map.keys() if self._output_json_map else [] @property - def exports(self): + def exports(self) -> List["Export"]: exports = [] if self.outputs: - for value in self._output_json_map.values(): + for value in self._output_json_map.values(): # type: ignore[union-attr] if value.get("Export"): cleaned_name = clean_json( value["Export"].get("Name"), self._resource_map @@ -876,19 +918,19 @@ class OutputMap(collections_abc.Mapping): class Export(object): - def __init__(self, exporting_stack_id, name, value): + def __init__(self, exporting_stack_id: str, name: str, value: str): self._exporting_stack_id = exporting_stack_id self._name = name self._value = value @property - def exporting_stack_id(self): + def exporting_stack_id(self) -> str: return self._exporting_stack_id @property - def name(self): + def name(self) -> str: return self._name @property - def value(self): + def value(self) -> str: return self._value diff --git a/moto/cloudformation/responses.py b/moto/cloudformation/responses.py index 9edc88e04..f0be4aeea 100644 --- a/moto/cloudformation/responses.py +++ b/moto/cloudformation/responses.py @@ -1,5 +1,6 @@ import json import yaml +from typing import Any, Dict, Tuple, List, Optional, Union from urllib.parse import urlparse from yaml.parser import ParserError # pylint:disable=c-extension-no-member from yaml.scanner import ScannerError # pylint:disable=c-extension-no-member @@ -8,13 +9,13 @@ from moto.core.responses import BaseResponse from moto.s3.models import s3_backends from moto.s3.exceptions import S3ClientError from moto.utilities.aws_headers import amzn_request_id -from .models import cloudformation_backends +from .models import cloudformation_backends, CloudFormationBackend, FakeStack from .exceptions import ValidationError, MissingParameterError from .utils import yaml_tag_constructor -def get_template_summary_response_from_template(template_body): - def get_resource_types(template_dict): +def get_template_summary_response_from_template(template_body: str) -> Dict[str, Any]: + def get_resource_types(template_dict: Dict[str, Any]) -> List[Any]: resources = {} for key, value in template_dict.items(): if key == "Resources": @@ -38,20 +39,20 @@ def get_template_summary_response_from_template(template_body): class CloudFormationResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="cloudformation") @property - def cloudformation_backend(self): + def cloudformation_backend(self) -> CloudFormationBackend: return cloudformation_backends[self.current_account][self.region] @classmethod - def cfnresponse(cls, *args, **kwargs): # pylint: disable=unused-argument + def cfnresponse(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc] # pylint: disable=unused-argument request, full_url, headers = args full_url += "&Action=ProcessCfnResponse" return cls.dispatch(request=request, full_url=full_url, headers=headers) - def _get_stack_from_s3_url(self, template_url): + def _get_stack_from_s3_url(self, template_url: str) -> str: template_url_parts = urlparse(template_url) if "localhost" in template_url: bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1) @@ -75,7 +76,9 @@ class CloudFormationResponse(BaseResponse): ) return key.value.decode("utf-8") - def _get_params_from_list(self, parameters_list): + def _get_params_from_list( + self, parameters_list: List[Dict[str, Any]] + ) -> Dict[str, Any]: # Hack dict-comprehension return dict( [ @@ -84,7 +87,9 @@ class CloudFormationResponse(BaseResponse): ] ) - def _get_param_values(self, parameters_list, existing_params): + def _get_param_values( + self, parameters_list: List[Dict[str, str]], existing_params: Dict[str, str] + ) -> Dict[str, Any]: result = {} for parameter in parameters_list: if parameter.keys() >= {"parameter_key", "parameter_value"}: @@ -100,7 +105,7 @@ class CloudFormationResponse(BaseResponse): raise MissingParameterError(parameter["parameter_key"]) return result - def process_cfn_response(self): + def process_cfn_response(self) -> Tuple[int, Dict[str, int], str]: status = self._get_param("Status") if status == "SUCCESS": stack_id = self._get_param("StackId") @@ -113,7 +118,7 @@ class CloudFormationResponse(BaseResponse): return 200, {"status": 200}, json.dumps("{}") - def create_stack(self): + def create_stack(self) -> Union[str, Tuple[int, Dict[str, int], str]]: stack_name = self._get_param("StackName") stack_body = self._get_param("TemplateBody") template_url = self._get_param("TemplateURL") @@ -156,14 +161,14 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(CREATE_STACK_RESPONSE_TEMPLATE) return template.render(stack=stack) - def stack_name_exists(self, new_stack_name): + def stack_name_exists(self, new_stack_name: str) -> bool: for stack in self.cloudformation_backend.stacks.values(): if stack.name == new_stack_name: return True return False @amzn_request_id - def create_change_set(self): + def create_change_set(self) -> str: stack_name = self._get_param("StackName") change_set_name = self._get_param("ChangeSetName") stack_body = self._get_param("TemplateBody") @@ -209,7 +214,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(CREATE_CHANGE_SET_RESPONSE_TEMPLATE) return template.render(stack_id=stack_id, change_set_id=change_set_id) - def delete_change_set(self): + def delete_change_set(self) -> str: change_set_name = self._get_param("ChangeSetName") self.cloudformation_backend.delete_change_set(change_set_name=change_set_name) @@ -221,7 +226,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(DELETE_CHANGE_SET_RESPONSE_TEMPLATE) return template.render() - def describe_change_set(self): + def describe_change_set(self) -> str: change_set_name = self._get_param("ChangeSetName") change_set = self.cloudformation_backend.describe_change_set( change_set_name=change_set_name @@ -230,7 +235,7 @@ class CloudFormationResponse(BaseResponse): return template.render(change_set=change_set) @amzn_request_id - def execute_change_set(self): + def execute_change_set(self) -> str: stack_name = self._get_param("StackName") change_set_name = self._get_param("ChangeSetName") self.cloudformation_backend.execute_change_set( @@ -244,10 +249,8 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(EXECUTE_CHANGE_SET_RESPONSE_TEMPLATE) return template.render() - def describe_stacks(self): - stack_name_or_id = None - if self._get_param("StackName"): - stack_name_or_id = self.querystring.get("StackName")[0] + def describe_stacks(self) -> str: + stack_name_or_id = self._get_param("StackName") token = self._get_param("NextToken") stacks = self.cloudformation_backend.describe_stacks(stack_name_or_id) stack_ids = [stack.stack_id for stack in stacks] @@ -263,14 +266,14 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(DESCRIBE_STACKS_TEMPLATE) return template.render(stacks=stacks_resp, next_token=next_token) - def describe_stack_resource(self): + def describe_stack_resource(self) -> str: stack_name = self._get_param("StackName") stack = self.cloudformation_backend.get_stack(stack_name) logical_resource_id = self._get_param("LogicalResourceId") resource = None for stack_resource in stack.stack_resources: - if stack_resource.logical_resource_id == logical_resource_id: + if stack_resource.logical_resource_id == logical_resource_id: # type: ignore[attr-defined] resource = stack_resource break @@ -283,40 +286,40 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(DESCRIBE_STACK_RESOURCE_RESPONSE_TEMPLATE) return template.render(stack=stack, resource=resource) - def describe_stack_resources(self): + def describe_stack_resources(self) -> str: stack_name = self._get_param("StackName") stack = self.cloudformation_backend.get_stack(stack_name) template = self.response_template(DESCRIBE_STACK_RESOURCES_RESPONSE) return template.render(stack=stack) - def describe_stack_events(self): + def describe_stack_events(self) -> str: stack_name = self._get_param("StackName") stack = self.cloudformation_backend.get_stack(stack_name) template = self.response_template(DESCRIBE_STACK_EVENTS_RESPONSE) return template.render(stack=stack) - def list_change_sets(self): + def list_change_sets(self) -> str: change_sets = self.cloudformation_backend.list_change_sets() template = self.response_template(LIST_CHANGE_SETS_RESPONSE) return template.render(change_sets=change_sets) - def list_stacks(self): + def list_stacks(self) -> str: status_filter = self._get_multi_param("StackStatusFilter.member") stacks = self.cloudformation_backend.list_stacks(status_filter) template = self.response_template(LIST_STACKS_RESPONSE) return template.render(stacks=stacks) - def list_stack_resources(self): + def list_stack_resources(self) -> str: stack_name_or_id = self._get_param("StackName") resources = self.cloudformation_backend.list_stack_resources(stack_name_or_id) template = self.response_template(LIST_STACKS_RESOURCES_RESPONSE) return template.render(resources=resources) - def get_template(self): - name_or_stack_id = self.querystring.get("StackName")[0] + def get_template(self) -> str: + name_or_stack_id = self.querystring.get("StackName")[0] # type: ignore[index] stack = self.cloudformation_backend.get_stack(name_or_stack_id) if self.request_json: @@ -336,7 +339,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(GET_TEMPLATE_RESPONSE_TEMPLATE) return template.render(stack=stack) - def get_template_summary(self): + def get_template_summary(self) -> str: stack_name = self._get_param("StackName") template_url = self._get_param("TemplateURL") stack_body = self._get_param("TemplateBody") @@ -355,7 +358,12 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(GET_TEMPLATE_SUMMARY_TEMPLATE) return template.render(template_summary=template_summary) - def _validate_different_update(self, incoming_params, stack_body, old_stack): + def _validate_different_update( + self, + incoming_params: Optional[List[Dict[str, Any]]], + stack_body: str, + old_stack: FakeStack, + ) -> None: if incoming_params and stack_body: new_params = self._get_param_values(incoming_params, old_stack.parameters) if old_stack.template == stack_body and old_stack.parameters == new_params: @@ -363,7 +371,7 @@ class CloudFormationResponse(BaseResponse): old_stack.name, message=f"Stack [{old_stack.name}] already exists" ) - def _validate_status(self, stack): + def _validate_status(self, stack: FakeStack) -> None: if stack.status == "ROLLBACK_COMPLETE": raise ValidationError( stack.stack_id, @@ -371,7 +379,7 @@ class CloudFormationResponse(BaseResponse): "be updated.".format(stack.stack_id), ) - def update_stack(self): + def update_stack(self) -> str: stack_name = self._get_param("StackName") role_arn = self._get_param("RoleARN") template_url = self._get_param("TemplateURL") @@ -386,7 +394,7 @@ class CloudFormationResponse(BaseResponse): # boto3 is supposed to let you clear the tags by passing an empty value, but the request body doesn't # end up containing anything we can use to differentiate between passing an empty value versus not # passing anything. so until that changes, moto won't be able to clear tags, only update them. - tags = dict( + tags: Optional[Dict[str, str]] = dict( (item["key"], item["value"]) for item in self._get_list_prefix("Tags.member") ) @@ -414,8 +422,8 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(UPDATE_STACK_RESPONSE_TEMPLATE) return template.render(stack=stack) - def delete_stack(self): - name_or_stack_id = self.querystring.get("StackName")[0] + def delete_stack(self) -> str: + name_or_stack_id = self.querystring.get("StackName")[0] # type: ignore[index] self.cloudformation_backend.delete_stack(name_or_stack_id) if self.request_json: @@ -424,13 +432,13 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(DELETE_STACK_RESPONSE_TEMPLATE) return template.render() - def list_exports(self): + def list_exports(self) -> str: token = self._get_param("NextToken") - exports, next_token = self.cloudformation_backend.list_exports(token=token) + exports, next_token = self.cloudformation_backend.list_exports(tokenstr=token) template = self.response_template(LIST_EXPORTS_RESPONSE) return template.render(exports=exports, next_token=next_token) - def validate_template(self): + def validate_template(self) -> str: template_body = self._get_param("TemplateBody") template_url = self._get_param("TemplateURL") if template_url: @@ -451,7 +459,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(VALIDATE_STACK_RESPONSE_TEMPLATE) return template.render(description=description) - def create_stack_set(self): + def create_stack_set(self) -> str: stackset_name = self._get_param("StackSetName") stack_body = self._get_param("TemplateBody") template_url = self._get_param("TemplateURL") @@ -486,7 +494,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(CREATE_STACK_SET_RESPONSE_TEMPLATE) return template.render(stackset=stackset) - def create_stack_instances(self): + def create_stack_instances(self) -> str: stackset_name = self._get_param("StackSetName") accounts = self._get_multi_param("Accounts.member") regions = self._get_multi_param("Regions.member") @@ -497,13 +505,13 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(CREATE_STACK_INSTANCES_TEMPLATE) return template.render() - def delete_stack_set(self): + def delete_stack_set(self) -> str: stackset_name = self._get_param("StackSetName") self.cloudformation_backend.delete_stack_set(stackset_name) template = self.response_template(DELETE_STACK_SET_RESPONSE_TEMPLATE) return template.render() - def delete_stack_instances(self): + def delete_stack_instances(self) -> str: stackset_name = self._get_param("StackSetName") accounts = self._get_multi_param("Accounts.member") regions = self._get_multi_param("Regions.member") @@ -514,7 +522,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(DELETE_STACK_INSTANCES_TEMPLATE) return template.render(operation=operation) - def describe_stack_set(self): + def describe_stack_set(self) -> str: stackset_name = self._get_param("StackSetName") stackset = self.cloudformation_backend.get_stack_set(stackset_name) @@ -526,7 +534,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(DESCRIBE_STACK_SET_RESPONSE_TEMPLATE) return template.render(stackset=stackset) - def describe_stack_instance(self): + def describe_stack_instance(self) -> str: stackset_name = self._get_param("StackSetName") account = self._get_param("StackInstanceAccount") region = self._get_param("StackInstanceRegion") @@ -538,24 +546,24 @@ class CloudFormationResponse(BaseResponse): rendered = template.render(instance=instance) return rendered - def list_stack_sets(self): + def list_stack_sets(self) -> str: stacksets = self.cloudformation_backend.stacksets template = self.response_template(LIST_STACK_SETS_TEMPLATE) return template.render(stacksets=stacksets) - def list_stack_instances(self): + def list_stack_instances(self) -> str: stackset_name = self._get_param("StackSetName") stackset = self.cloudformation_backend.get_stack_set(stackset_name) template = self.response_template(LIST_STACK_INSTANCES_TEMPLATE) return template.render(stackset=stackset) - def list_stack_set_operations(self): + def list_stack_set_operations(self) -> str: stackset_name = self._get_param("StackSetName") stackset = self.cloudformation_backend.get_stack_set(stackset_name) template = self.response_template(LIST_STACK_SET_OPERATIONS_RESPONSE_TEMPLATE) return template.render(stackset=stackset) - def stop_stack_set_operation(self): + def stop_stack_set_operation(self) -> str: stackset_name = self._get_param("StackSetName") operation_id = self._get_param("OperationId") stackset = self.cloudformation_backend.get_stack_set(stackset_name) @@ -563,7 +571,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(STOP_STACK_SET_OPERATION_RESPONSE_TEMPLATE) return template.render() - def describe_stack_set_operation(self): + def describe_stack_set_operation(self) -> str: stackset_name = self._get_param("StackSetName") operation_id = self._get_param("OperationId") stackset = self.cloudformation_backend.get_stack_set(stackset_name) @@ -571,7 +579,7 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(DESCRIBE_STACKSET_OPERATION_RESPONSE_TEMPLATE) return template.render(stackset=stackset, operation=operation) - def list_stack_set_operation_results(self): + def list_stack_set_operation_results(self) -> str: stackset_name = self._get_param("StackSetName") operation_id = self._get_param("OperationId") stackset = self.cloudformation_backend.get_stack_set(stackset_name) @@ -581,7 +589,7 @@ class CloudFormationResponse(BaseResponse): ) return template.render(operation=operation) - def update_stack_set(self): + def update_stack_set(self) -> str: stackset_name = self._get_param("StackSetName") operation_id = self._get_param("OperationId") description = self._get_param("Description") @@ -615,24 +623,24 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(UPDATE_STACK_SET_RESPONSE_TEMPLATE) return template.render(operation=operation) - def update_stack_instances(self): + def update_stack_instances(self) -> str: stackset_name = self._get_param("StackSetName") accounts = self._get_multi_param("Accounts.member") regions = self._get_multi_param("Regions.member") parameters = self._get_multi_param("ParameterOverrides.member") - operation = self.cloudformation_backend.get_stack_set( - stackset_name - ).update_instances(accounts, regions, parameters) + operation = self.cloudformation_backend.update_stack_instances( + stackset_name, accounts, regions, parameters + ) template = self.response_template(UPDATE_STACK_INSTANCES_RESPONSE_TEMPLATE) return template.render(operation=operation) - def get_stack_policy(self): + def get_stack_policy(self) -> str: stack_name = self._get_param("StackName") policy = self.cloudformation_backend.get_stack_policy(stack_name) template = self.response_template(GET_STACK_POLICY_RESPONSE) return template.render(policy=policy) - def set_stack_policy(self): + def set_stack_policy(self) -> str: stack_name = self._get_param("StackName") policy_url = self._get_param("StackPolicyURL") policy_body = self._get_param("StackPolicyBody") diff --git a/moto/cloudformation/utils.py b/moto/cloudformation/utils.py index 5ddfb073e..c37d8bef6 100644 --- a/moto/cloudformation/utils.py +++ b/moto/cloudformation/utils.py @@ -2,37 +2,40 @@ import yaml import os import string from moto.moto_api._internal import mock_random as random +from typing import Any, List -def generate_stack_id(stack_name, region, account): +def generate_stack_id(stack_name: str, region: str, account: str) -> str: random_id = random.uuid4() return f"arn:aws:cloudformation:{region}:{account}:stack/{stack_name}/{random_id}" -def generate_changeset_id(changeset_name, region_name, account_id): +def generate_changeset_id( + changeset_name: str, region_name: str, account_id: str +) -> str: random_id = random.uuid4() return f"arn:aws:cloudformation:{region_name}:{account_id}:changeSet/{changeset_name}/{random_id}" -def generate_stackset_id(stackset_name): +def generate_stackset_id(stackset_name: str) -> str: random_id = random.uuid4() return "{}:{}".format(stackset_name, random_id) -def generate_stackset_arn(stackset_id, region_name, account_id): +def generate_stackset_arn(stackset_id: str, region_name: str, account_id: str) -> str: return f"arn:aws:cloudformation:{region_name}:{account_id}:stackset/{stackset_id}" -def random_suffix(): +def random_suffix() -> str: size = 12 chars = list(range(10)) + list(string.ascii_uppercase) return "".join(str(random.choice(chars)) for x in range(size)) -def yaml_tag_constructor(loader, tag, node): +def yaml_tag_constructor(loader: Any, tag: Any, node: Any) -> Any: """convert shorthand intrinsic function to full name""" - def _f(loader, tag, node): + def _f(loader: Any, tag: Any, node: Any) -> Any: if tag == "!GetAtt": if isinstance(node.value, list): return node.value @@ -50,7 +53,7 @@ def yaml_tag_constructor(loader, tag, node): return {key: _f(loader, tag, node)} -def validate_template_cfn_lint(template): +def validate_template_cfn_lint(template: str) -> List[Any]: # Importing cfnlint adds a significant overhead, so we keep it local from cfnlint import decode, core diff --git a/moto/core/common_models.py b/moto/core/common_models.py index 0a0bb6189..950a56019 100644 --- a/moto/core/common_models.py +++ b/moto/core/common_models.py @@ -1,4 +1,5 @@ from abc import abstractmethod +from typing import Any, Dict from .base_backend import InstanceTrackerMeta @@ -14,22 +15,22 @@ class BaseModel(metaclass=InstanceTrackerMeta): class CloudFormationModel(BaseModel): @staticmethod @abstractmethod - def cloudformation_name_type(): + def cloudformation_name_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-name.html # This must be implemented as a staticmethod with no parameters - # Return None for resources that do not have a name property - pass + # Return "" for resources that do not have a name property + return "" @staticmethod @abstractmethod - def cloudformation_type(): + def cloudformation_type() -> str: # This must be implemented as a staticmethod with no parameters # See for example https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html return "AWS::SERVICE::RESOURCE" @classmethod @abstractmethod - def has_cfn_attr(cls, attr): # pylint: disable=unused-argument + def has_cfn_attr(cls, attr: str) -> bool: # pylint: disable=unused-argument # Used for validation # If a template creates an Output for an attribute that does not exist, an error should be thrown return True @@ -37,44 +38,53 @@ class CloudFormationModel(BaseModel): @classmethod @abstractmethod def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + cls, + resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + **kwargs: Any + ) -> Any: # This must be implemented as a classmethod with parameters: # cls, resource_name, cloudformation_json, account_id, region_name # Extract the resource parameters from the cloudformation json # and return an instance of the resource class - pass + ... @classmethod @abstractmethod def update_from_cloudformation_json( cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + ) -> Any: # This must be implemented as a classmethod with parameters: # cls, original_resource, new_resource_name, cloudformation_json, account_id, region_name # Extract the resource parameters from the cloudformation json, # delete the old resource and return the new one. Optionally inspect # the change in parameters and no-op when nothing has changed. - pass + ... @classmethod @abstractmethod def delete_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name - ): + cls, + resource_name: str, + cloudformation_json: Dict[str, Any], + account_id: str, + region_name: str, + ) -> None: # This must be implemented as a classmethod with parameters: # cls, resource_name, cloudformation_json, account_id, region_name # Extract the resource parameters from the cloudformation json # and delete the resource. Do not include a return statement. - pass + ... @abstractmethod - def is_created(self): + def is_created(self) -> bool: # Verify whether the resource was created successfully # Assume True after initialization # Custom resources may need time after init before they are created successfully diff --git a/moto/core/responses.py b/moto/core/responses.py index 0b6a504c7..7d4b7fe7d 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -218,7 +218,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin): self.service_name = service_name @classmethod - def dispatch(cls, *args, **kwargs): + def dispatch(cls, *args: Any, **kwargs: Any) -> Any: return cls()._dispatch(*args, **kwargs) def setup_class( diff --git a/moto/core/utils.py b/moto/core/utils.py index f1959669f..5eab94456 100644 --- a/moto/core/utils.py +++ b/moto/core/utils.py @@ -152,16 +152,18 @@ def iso_8601_datetime_with_milliseconds(value: datetime) -> str: # Even Python does not support nanoseconds, other languages like Go do (needed for Terraform) -def iso_8601_datetime_with_nanoseconds(value): +def iso_8601_datetime_with_nanoseconds(value: datetime.datetime) -> str: return value.strftime("%Y-%m-%dT%H:%M:%S.%f000Z") -def iso_8601_datetime_without_milliseconds(value): - return None if value is None else value.strftime("%Y-%m-%dT%H:%M:%SZ") +def iso_8601_datetime_without_milliseconds(value: datetime.datetime) -> Optional[str]: + return value.strftime("%Y-%m-%dT%H:%M:%SZ") if value else None -def iso_8601_datetime_without_milliseconds_s3(value): - return None if value is None else value.strftime("%Y-%m-%dT%H:%M:%S.000Z") +def iso_8601_datetime_without_milliseconds_s3( + value: datetime.datetime, +) -> Optional[str]: + return value.strftime("%Y-%m-%dT%H:%M:%S.000Z") if value else None RFC1123 = "%a, %d %b %Y %H:%M:%S GMT" diff --git a/moto/s3/utils.py b/moto/s3/utils.py index 79cf48ca5..fcf404484 100644 --- a/moto/s3/utils.py +++ b/moto/s3/utils.py @@ -3,6 +3,7 @@ import logging import re from urllib.parse import urlparse, unquote, quote from requests.structures import CaseInsensitiveDict +from typing import Union, Tuple import sys from moto.settings import S3_IGNORE_SUBDOMAIN_BUCKETNAME @@ -44,7 +45,7 @@ def bucket_name_from_url(url): # 'owi-common-cf', 'snippets/test.json' = bucket_and_name_from_url('s3://owi-common-cf/snippets/test.json') -def bucket_and_name_from_url(url): +def bucket_and_name_from_url(url: str) -> Union[Tuple[str, str], Tuple[None, None]]: prefix = "s3://" if url.startswith(prefix): bucket_name = url[len(prefix) : url.index("/", len(prefix))] diff --git a/setup.cfg b/setup.cfg index e06e9cec2..3391d9fdf 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/ce +files= moto/a*,moto/b*,moto/ce,moto/cloudformation show_column_numbers=True show_error_codes = True disable_error_code=abstract