From 2500affb1d838c4453739dc114c80a47b847b64b Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Thu, 3 Nov 2022 18:32:00 -0100 Subject: [PATCH] TechDebt: MyPy Config (#5633) --- moto/config/exceptions.py | 79 +++--- moto/config/models.py | 439 +++++++++++++++++------------- moto/config/responses.py | 76 +++--- moto/core/common_models.py | 30 +- moto/core/responses.py | 2 +- moto/ec2/models/instance_types.py | 6 +- moto/utilities/utils.py | 6 +- setup.cfg | 2 +- 8 files changed, 358 insertions(+), 282 deletions(-) diff --git a/moto/config/exceptions.py b/moto/config/exceptions.py index 5bb2c9e02..d14f1a7db 100644 --- a/moto/config/exceptions.py +++ b/moto/config/exceptions.py @@ -1,10 +1,11 @@ from moto.core.exceptions import JsonRESTError +from typing import Any, List, Optional class NameTooLongException(JsonRESTError): code = 400 - def __init__(self, name, location, max_limit=256): + def __init__(self, name: str, location: str, max_limit: int = 256): message = ( f"1 validation error detected: Value '{name}' at '{location}' " f"failed to satisfy constraint: Member must have length less " @@ -16,7 +17,7 @@ class NameTooLongException(JsonRESTError): class InvalidConfigurationRecorderNameException(JsonRESTError): code = 400 - def __init__(self, name): + def __init__(self, name: Optional[str]): message = ( f"The configuration recorder name '{name}' is not valid, blank string." ) @@ -26,7 +27,7 @@ class InvalidConfigurationRecorderNameException(JsonRESTError): class MaxNumberOfConfigurationRecordersExceededException(JsonRESTError): code = 400 - def __init__(self, name): + def __init__(self, name: str): message = ( f"Failed to put configuration recorder '{name}' because the maximum number of " "configuration recorders: 1 is reached." @@ -37,7 +38,7 @@ class MaxNumberOfConfigurationRecordersExceededException(JsonRESTError): class InvalidRecordingGroupException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: message = "The recording group provided is not valid" super().__init__("InvalidRecordingGroupException", message) @@ -45,7 +46,7 @@ class InvalidRecordingGroupException(JsonRESTError): class InvalidResourceTypeException(JsonRESTError): code = 400 - def __init__(self, bad_list, good_list): + def __init__(self, bad_list: List[str], good_list: Any): message = ( f"{len(bad_list)} validation error detected: Value '{bad_list}' at " "'configurationRecorder.recordingGroup.resourceTypes' failed to satisfy constraint: " @@ -58,7 +59,7 @@ class InvalidResourceTypeException(JsonRESTError): class NoSuchConfigurationAggregatorException(JsonRESTError): code = 400 - def __init__(self, number=1): + def __init__(self, number: int = 1): if number == 1: message = "The configuration aggregator does not exist. Check the configuration aggregator name and try again." else: @@ -72,7 +73,7 @@ class NoSuchConfigurationAggregatorException(JsonRESTError): class NoSuchConfigurationRecorderException(JsonRESTError): code = 400 - def __init__(self, name): + def __init__(self, name: str): message = ( f"Cannot find configuration recorder with the specified name '{name}'." ) @@ -82,7 +83,7 @@ class NoSuchConfigurationRecorderException(JsonRESTError): class InvalidDeliveryChannelNameException(JsonRESTError): code = 400 - def __init__(self, name): + def __init__(self, name: Optional[str]): message = f"The delivery channel name '{name}' is not valid, blank string." super().__init__("InvalidDeliveryChannelNameException", message) @@ -92,7 +93,7 @@ class NoSuchBucketException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: message = "Cannot find a S3 bucket with an empty bucket name." super().__init__("NoSuchBucketException", message) @@ -100,7 +101,7 @@ class NoSuchBucketException(JsonRESTError): class InvalidNextTokenException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: message = "The nextToken provided is invalid" super().__init__("InvalidNextTokenException", message) @@ -108,7 +109,7 @@ class InvalidNextTokenException(JsonRESTError): class InvalidS3KeyPrefixException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: message = "The s3 key prefix '' is not valid, empty s3 key prefix." super().__init__("InvalidS3KeyPrefixException", message) @@ -118,7 +119,7 @@ class InvalidSNSTopicARNException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: message = "The sns topic arn '' is not valid." super().__init__("InvalidSNSTopicARNException", message) @@ -126,7 +127,7 @@ class InvalidSNSTopicARNException(JsonRESTError): class InvalidDeliveryFrequency(JsonRESTError): code = 400 - def __init__(self, value, good_list): + def __init__(self, value: str, good_list: Any): message = ( f"1 validation error detected: Value '{value}' at " "'deliveryChannel.configSnapshotDeliveryProperties.deliveryFrequency' failed to satisfy " @@ -138,7 +139,7 @@ class InvalidDeliveryFrequency(JsonRESTError): class MaxNumberOfDeliveryChannelsExceededException(JsonRESTError): code = 400 - def __init__(self, name): + def __init__(self, name: str): message = f"Failed to put delivery channel '{name}' because the maximum number of delivery channels: 1 is reached." super().__init__("MaxNumberOfDeliveryChannelsExceededException", message) @@ -146,7 +147,7 @@ class MaxNumberOfDeliveryChannelsExceededException(JsonRESTError): class NoSuchDeliveryChannelException(JsonRESTError): code = 400 - def __init__(self, name): + def __init__(self, name: str): message = f"Cannot find delivery channel with specified name '{name}'." super().__init__("NoSuchDeliveryChannelException", message) @@ -154,7 +155,7 @@ class NoSuchDeliveryChannelException(JsonRESTError): class NoAvailableConfigurationRecorderException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: message = "Configuration recorder is not available to put delivery channel." super().__init__("NoAvailableConfigurationRecorderException", message) @@ -162,7 +163,7 @@ class NoAvailableConfigurationRecorderException(JsonRESTError): class NoAvailableDeliveryChannelException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: message = "Delivery channel is not available to start configuration recorder." super().__init__("NoAvailableDeliveryChannelException", message) @@ -170,7 +171,7 @@ class NoAvailableDeliveryChannelException(JsonRESTError): class LastDeliveryChannelDeleteFailedException(JsonRESTError): code = 400 - def __init__(self, name): + def __init__(self, name: str): message = ( f"Failed to delete last specified delivery channel with name '{name}', because there, " "because there is a running configuration recorder." @@ -181,7 +182,7 @@ class LastDeliveryChannelDeleteFailedException(JsonRESTError): class TooManyAccountSources(JsonRESTError): code = 400 - def __init__(self, length): + def __init__(self, length: int): locations = ["com.amazonaws.xyz"] * length message = ( @@ -196,7 +197,7 @@ class TooManyAccountSources(JsonRESTError): class DuplicateTags(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: super().__init__( "InvalidInput", "Duplicate tag keys found. Please note that Tag keys are case insensitive.", @@ -206,7 +207,7 @@ class DuplicateTags(JsonRESTError): class TagKeyTooBig(JsonRESTError): code = 400 - def __init__(self, tag, param="tags.X.member.key"): + def __init__(self, tag: str, param: str = "tags.X.member.key"): super().__init__( "ValidationException", f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy " @@ -217,7 +218,7 @@ class TagKeyTooBig(JsonRESTError): class TagValueTooBig(JsonRESTError): code = 400 - def __init__(self, tag, param="tags.X.member.value"): + def __init__(self, tag: str, param: str = "tags.X.member.value"): super().__init__( "ValidationException", f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy " @@ -228,14 +229,14 @@ class TagValueTooBig(JsonRESTError): class InvalidParameterValueException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("InvalidParameterValueException", message) class InvalidTagCharacters(JsonRESTError): code = 400 - def __init__(self, tag, param="tags.X.member.key"): + def __init__(self, tag: str, param: str = "tags.X.member.key"): message = f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy " message += "constraint: Member must satisfy regular expression pattern: [\\\\p{L}\\\\p{Z}\\\\p{N}_.:/=+\\\\-@]+" @@ -245,7 +246,7 @@ class InvalidTagCharacters(JsonRESTError): class TooManyTags(JsonRESTError): code = 400 - def __init__(self, tags, param="tags"): + def __init__(self, tags: Any, param: str = "tags"): super().__init__( "ValidationException", f"1 validation error detected: Value '{tags}' at '{param}' failed to satisfy " @@ -256,7 +257,7 @@ class TooManyTags(JsonRESTError): class InvalidResourceParameters(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: super().__init__( "ValidationException", "Both Resource ID and Resource Name " "cannot be specified in the request", @@ -266,7 +267,7 @@ class InvalidResourceParameters(JsonRESTError): class InvalidLimitException(JsonRESTError): code = 400 - def __init__(self, value): + def __init__(self, value: int): super().__init__( "InvalidLimitException", f"Value '{value}' at 'limit' failed to satisfy constraint: Member" @@ -277,7 +278,7 @@ class InvalidLimitException(JsonRESTError): class TooManyResourceIds(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: super().__init__( "ValidationException", "The specified list had more than 20 resource ID's. " @@ -288,7 +289,7 @@ class TooManyResourceIds(JsonRESTError): class ResourceNotDiscoveredException(JsonRESTError): code = 400 - def __init__(self, resource_type, resource): + def __init__(self, resource_type: str, resource: str): super().__init__( "ResourceNotDiscoveredException", f"Resource {resource} of resourceType:{resource_type} is unknown or has not been discovered", @@ -298,7 +299,7 @@ class ResourceNotDiscoveredException(JsonRESTError): class ResourceNotFoundException(JsonRESTError): code = 400 - def __init__(self, resource_arn): + def __init__(self, resource_arn: str): super().__init__( "ResourceNotFoundException", f"ResourceArn '{resource_arn}' does not exist" ) @@ -307,7 +308,7 @@ class ResourceNotFoundException(JsonRESTError): class TooManyResourceKeys(JsonRESTError): code = 400 - def __init__(self, bad_list): + def __init__(self, bad_list: List[str]): message = ( f"1 validation error detected: Value '{bad_list}' at " "'resourceKeys' failed to satisfy constraint: " @@ -319,7 +320,7 @@ class TooManyResourceKeys(JsonRESTError): class InvalidResultTokenException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: message = "The resultToken provided is invalid" super().__init__("InvalidResultTokenException", message) @@ -327,21 +328,21 @@ class InvalidResultTokenException(JsonRESTError): class ValidationException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ValidationException", message) class NoSuchOrganizationConformancePackException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("NoSuchOrganizationConformancePackException", message) class MaxNumberOfConfigRulesExceededException(JsonRESTError): code = 400 - def __init__(self, name, max_limit): + def __init__(self, name: str, max_limit: int): message = f"Failed to put config rule '{name}' because the maximum number of config rules: {max_limit} is reached." super().__init__("MaxNumberOfConfigRulesExceededException", message) @@ -349,21 +350,21 @@ class MaxNumberOfConfigRulesExceededException(JsonRESTError): class ResourceInUseException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ResourceInUseException", message) class InsufficientPermissionsException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("InsufficientPermissionsException", message) class NoSuchConfigRuleException(JsonRESTError): code = 400 - def __init__(self, rule_name): + def __init__(self, rule_name: str): message = f"The ConfigRule '{rule_name}' provided in the request is invalid. Please check the configRule name" super().__init__("NoSuchConfigRuleException", message) @@ -371,5 +372,5 @@ class NoSuchConfigRuleException(JsonRESTError): class MissingRequiredConfigRuleParameterException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ParamValidationError", message) diff --git a/moto/config/models.py b/moto/config/models.py index c516410c7..ac9786ce0 100644 --- a/moto/config/models.py +++ b/moto/config/models.py @@ -4,6 +4,7 @@ import re import time from datetime import datetime +from typing import Any, Dict, List, Optional, Union from moto.config.exceptions import ( InvalidResourceTypeException, @@ -48,6 +49,7 @@ from moto.config.exceptions import ( ) from moto.core import BaseBackend, BaseModel +from moto.core.common_models import ConfigQueryModel from moto.core.responses import AWSServiceSpec from moto.core.utils import BackendDict from moto.iam.config import role_config_query, policy_config_query @@ -69,7 +71,7 @@ DEFAULT_PAGE_SIZE = 100 CONFIG_RULE_PAGE_SIZE = 25 # Map the Config resource type to a backend: -RESOURCE_MAP = { +RESOURCE_MAP: Dict[str, ConfigQueryModel] = { "AWS::S3::Bucket": s3_config_query, "AWS::S3::AccountPublicAccessBlock": s3_account_public_access_block_query, "AWS::IAM::Role": role_config_query, @@ -81,14 +83,14 @@ CAMEL_TO_SNAKE_REGEX = re.compile(r"(? int: return int(time.mktime(date.timetuple())) -def snake_to_camels(original, cap_start, cap_arn): +def snake_to_camels(original: str, cap_start: bool, cap_arn: bool) -> str: parts = original.split("_") camel_cased = parts[0].lower() + "".join(p.title() for p in parts[1:]) @@ -104,12 +106,12 @@ def snake_to_camels(original, cap_start, cap_arn): return camel_cased -def random_string(): +def random_string() -> str: """Returns a random set of 8 lowercase letters for the Config Aggregator ARN""" return random.get_random_string(length=8, include_digits=False, lower_case=True) -def validate_tag_key(tag_key, exception_param="tags.X.member.key"): +def validate_tag_key(tag_key: str, exception_param: str = "tags.X.member.key") -> None: """Validates the tag key. :param tag_key: The tag key to check against. @@ -131,7 +133,7 @@ def validate_tag_key(tag_key, exception_param="tags.X.member.key"): raise InvalidTagCharacters(tag_key, param=exception_param) -def check_tag_duplicate(all_tags, tag_key): +def check_tag_duplicate(all_tags: Dict[str, str], tag_key: str) -> None: """Validates that a tag key is not a duplicate :param all_tags: Dict to check if there is a duplicate tag. @@ -142,8 +144,8 @@ def check_tag_duplicate(all_tags, tag_key): raise DuplicateTags() -def validate_tags(tags): - proper_tags = {} +def validate_tags(tags: List[Dict[str, str]]) -> Dict[str, str]: + proper_tags: Dict[str, str] = {} if len(tags) > MAX_TAGS_IN_ARG: raise TooManyTags(tags) @@ -162,7 +164,7 @@ def validate_tags(tags): return proper_tags -def convert_to_class_args(dict_arg): +def convert_to_class_args(dict_arg: Dict[str, Any]) -> Dict[str, Any]: """Return dict that can be used to instantiate it's representative class. Given a dictionary in the incoming API request, convert the keys to @@ -184,7 +186,7 @@ class ConfigEmptyDictable(BaseModel): This assumes that the sub-class will NOT return 'None's in the JSON. """ - def __init__(self, capitalize_start=False, capitalize_arn=True): + def __init__(self, capitalize_start: bool = False, capitalize_arn: bool = True): """Assists with the serialization of the config object :param capitalize_start: For some Config services, the first letter is lowercase -- for others it's capital @@ -194,8 +196,8 @@ class ConfigEmptyDictable(BaseModel): self.capitalize_start = capitalize_start self.capitalize_arn = capitalize_arn - def to_dict(self): - data = {} + def to_dict(self) -> Dict[str, Any]: + data: Dict[str, Any] = {} for item, value in self.__dict__.items(): # ignore private attributes if not item.startswith("_") and value is not None: @@ -220,32 +222,32 @@ class ConfigEmptyDictable(BaseModel): class ConfigRecorderStatus(ConfigEmptyDictable): - def __init__(self, name): + def __init__(self, name: str): super().__init__() self.name = name self.recording = False - self.last_start_time = None - self.last_stop_time = None - self.last_status = None - self.last_error_code = None - self.last_error_message = None - self.last_status_change_time = None + self.last_start_time: Optional[int] = None + self.last_stop_time: Optional[int] = None + self.last_status: Optional[str] = None + self.last_error_code: Optional[str] = None + self.last_error_message: Optional[str] = None + self.last_status_change_time: Optional[int] = None - def start(self): + def start(self) -> None: self.recording = True self.last_status = "PENDING" self.last_start_time = datetime2int(datetime.utcnow()) self.last_status_change_time = datetime2int(datetime.utcnow()) - def stop(self): + def stop(self) -> None: self.recording = False self.last_stop_time = datetime2int(datetime.utcnow()) self.last_status_change_time = datetime2int(datetime.utcnow()) class ConfigDeliverySnapshotProperties(ConfigEmptyDictable): - def __init__(self, delivery_frequency): + def __init__(self, delivery_frequency: str): super().__init__() self.delivery_frequency = delivery_frequency @@ -253,7 +255,12 @@ class ConfigDeliverySnapshotProperties(ConfigEmptyDictable): class ConfigDeliveryChannel(ConfigEmptyDictable): def __init__( - self, name, s3_bucket_name, prefix=None, sns_arn=None, snapshot_properties=None + self, + name: str, + s3_bucket_name: str, + prefix: Optional[str] = None, + sns_arn: Optional[str] = None, + snapshot_properties: Optional[ConfigDeliverySnapshotProperties] = None, ): super().__init__() @@ -267,9 +274,9 @@ class ConfigDeliveryChannel(ConfigEmptyDictable): class RecordingGroup(ConfigEmptyDictable): def __init__( self, - all_supported=True, - include_global_resource_types=False, - resource_types=None, + all_supported: bool = True, + include_global_resource_types: bool = False, + resource_types: Optional[List[str]] = None, ): super().__init__() @@ -279,7 +286,13 @@ class RecordingGroup(ConfigEmptyDictable): class ConfigRecorder(ConfigEmptyDictable): - def __init__(self, role_arn, recording_group, name="default", status=None): + def __init__( + self, + role_arn: str, + recording_group: RecordingGroup, + name: str = "default", + status: Optional[ConfigRecorderStatus] = None, + ): super().__init__() self.name = name @@ -293,7 +306,12 @@ class ConfigRecorder(ConfigEmptyDictable): class AccountAggregatorSource(ConfigEmptyDictable): - def __init__(self, account_ids, aws_regions=None, all_aws_regions=None): + def __init__( + self, + account_ids: List[str], + aws_regions: Optional[List[str]] = None, + all_aws_regions: Optional[bool] = None, + ): super().__init__(capitalize_start=True) # Can't have both the regions and all_regions flag present -- also @@ -321,7 +339,12 @@ class AccountAggregatorSource(ConfigEmptyDictable): class OrganizationAggregationSource(ConfigEmptyDictable): - def __init__(self, role_arn, aws_regions=None, all_aws_regions=None): + def __init__( + self, + role_arn: str, + aws_regions: Optional[List[str]] = None, + all_aws_regions: Optional[bool] = None, + ): super().__init__(capitalize_start=True, capitalize_arn=False) # Can't have both the regions and all_regions flag present -- also @@ -349,7 +372,13 @@ class OrganizationAggregationSource(ConfigEmptyDictable): class ConfigAggregator(ConfigEmptyDictable): def __init__( - self, name, account_id, region, account_sources=None, org_source=None, tags=None + self, + name: str, + account_id: str, + region: str, + account_sources: Optional[List[AccountAggregatorSource]] = None, + org_source: Optional[OrganizationAggregationSource] = None, + tags: Optional[Dict[str, str]] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) @@ -364,7 +393,7 @@ class ConfigAggregator(ConfigEmptyDictable): self.tags = tags or {} # Override the to_dict so that we can format the tags properly... - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: result = super().to_dict() # Override the account aggregation sources if present: @@ -384,11 +413,11 @@ class ConfigAggregator(ConfigEmptyDictable): class ConfigAggregationAuthorization(ConfigEmptyDictable): def __init__( self, - account_id, - current_region, - authorized_account_id, - authorized_aws_region, - tags=None, + account_id: str, + current_region: str, + authorized_account_id: str, + authorized_aws_region: str, + tags: Dict[str, str], ): super().__init__(capitalize_start=True, capitalize_arn=False) @@ -412,13 +441,13 @@ class ConfigAggregationAuthorization(ConfigEmptyDictable): class OrganizationConformancePack(ConfigEmptyDictable): def __init__( self, - account_id, - region, - name, - delivery_s3_bucket, - delivery_s3_key_prefix=None, - input_parameters=None, - excluded_accounts=None, + account_id: str, + region: str, + name: str, + delivery_s3_bucket: str, + delivery_s3_key_prefix: Optional[str] = None, + input_parameters: Optional[List[Dict[str, Any]]] = None, + excluded_accounts: Optional[List[str]] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) @@ -435,11 +464,11 @@ class OrganizationConformancePack(ConfigEmptyDictable): def update( self, - delivery_s3_bucket, - delivery_s3_key_prefix, - input_parameters, - excluded_accounts, - ): + delivery_s3_bucket: str, + delivery_s3_key_prefix: str, + input_parameters: List[Dict[str, Any]], + excluded_accounts: List[str], + ) -> None: self._status = "UPDATE_SUCCESSFUL" self.conformance_pack_input_parameters = input_parameters @@ -464,10 +493,10 @@ class Scope(ConfigEmptyDictable): def __init__( self, - compliance_resource_types=None, - tag_key=None, - tag_value=None, - compliance_resource_id=None, + compliance_resource_types: Optional[List[str]] = None, + tag_key: Optional[str] = None, + tag_value: Optional[str] = None, + compliance_resource_id: Optional[str] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) self.tags = None @@ -489,7 +518,7 @@ class Scope(ConfigEmptyDictable): "Scope cannot be applied to both resource and tag" ) - if compliance_resource_id and len(compliance_resource_types) != 1: + if compliance_resource_id and len(compliance_resource_types) != 1: # type: ignore[arg-type] raise InvalidParameterValueException( "A single resourceType should be provided when resourceId " "is provided in scope" @@ -522,7 +551,10 @@ class SourceDetail(ConfigEmptyDictable): EVENT_SOURCES = ["aws.config"] def __init__( - self, event_source=None, message_type=None, maximum_execution_frequency=None + self, + event_source: Optional[str] = None, + message_type: Optional[str] = None, + maximum_execution_frequency: Optional[str] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) @@ -599,7 +631,12 @@ class Source(ConfigEmptyDictable): OWNERS = {"AWS", "CUSTOM_LAMBDA"} def __init__( - self, account_id, region, owner, source_identifier, source_details=None + self, + account_id: str, + region: str, + owner: str, + source_identifier: str, + source_details: Optional[SourceDetail] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) if owner not in Source.OWNERS: @@ -651,7 +688,7 @@ class Source(ConfigEmptyDictable): ) details = [] - for detail in source_details: + for detail in source_details: # type: ignore[attr-defined] detail_dict = convert_to_class_args(detail) details.append(SourceDetail(**detail_dict)) @@ -659,7 +696,7 @@ class Source(ConfigEmptyDictable): self.owner = owner self.source_identifier = source_identifier - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: """Format the SourceDetails properly.""" result = super().to_dict() if self.source_details: @@ -678,7 +715,13 @@ class ConfigRule(ConfigEmptyDictable): MAX_RULES = 150 RULE_STATES = {"ACTIVE", "DELETING", "DELETING_RESULTS", "EVALUATING"} - def __init__(self, account_id, region, config_rule, tags): + def __init__( + self, + account_id: str, + region: str, + config_rule: Dict[str, Any], + tags: Dict[str, str], + ): super().__init__(capitalize_start=True, capitalize_arn=False) self.account_id = account_id self.config_rule_name = config_rule.get("ConfigRuleName") @@ -697,7 +740,9 @@ class ConfigRule(ConfigEmptyDictable): f"arn:aws:config:{region}:{account_id}:config-rule/{self.config_rule_id}" ) - def modify_fields(self, region, config_rule, tags): + def modify_fields( + self, region: str, config_rule: Dict[str, Any], tags: Dict[str, str] + ) -> None: """Initialize or update ConfigRule fields.""" self.config_rule_state = config_rule.get("ConfigRuleState", "ACTIVE") if self.config_rule_state not in ConfigRule.RULE_STATES: @@ -787,23 +832,22 @@ class ConfigRule(ConfigEmptyDictable): self.last_updated_time = datetime2int(datetime.utcnow()) self.tags = tags - def validate_managed_rule(self): + def validate_managed_rule(self) -> None: """Validate parameters specific to managed rules.""" - rule_info = MANAGED_RULES_CONSTRAINTS[self.source.source_identifier] + rule_info = MANAGED_RULES_CONSTRAINTS[self.source.source_identifier] # type: ignore[index] param_names = self.input_parameters_dict.keys() # Verify input parameter names are actual parameters for the rule ID. if param_names: - allowed_names = {x["Name"] for x in rule_info["Parameters"]} + allowed_names = {x["Name"] for x in rule_info["Parameters"]} # type: ignore[index] if not set(param_names).issubset(allowed_names): raise InvalidParameterValueException( - "Unknown parameters provided in the inputParameters: " - + self.input_parameters + f"Unknown parameters provided in the inputParameters: {self.input_parameters}" ) # Verify all the required parameters are specified. required_names = { - x["Name"] for x in rule_info["Parameters"] if not x["Optional"] + x["Name"] for x in rule_info["Parameters"] if not x["Optional"] # type: ignore[index] } diffs = required_names.difference(set(param_names)) if diffs: @@ -849,24 +893,24 @@ class ConfigRule(ConfigEmptyDictable): class ConfigBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.recorders = {} - self.delivery_channels = {} - self.config_aggregators = {} - self.aggregation_authorizations = {} - self.organization_conformance_packs = {} - self.config_rules = {} - self.config_schema = None + self.recorders: Dict[str, ConfigRecorder] = {} + self.delivery_channels: Dict[str, ConfigDeliveryChannel] = {} + self.config_aggregators: Dict[str, ConfigAggregator] = {} + self.aggregation_authorizations: Dict[str, ConfigAggregationAuthorization] = {} + self.organization_conformance_packs: Dict[str, OrganizationConformancePack] = {} + self.config_rules: Dict[str, ConfigRule] = {} + self.config_schema: Optional[AWSServiceSpec] = None @staticmethod - def default_vpc_endpoint_service(service_region, zones): + def default_vpc_endpoint_service(service_region: str, zones: List[str]) -> List[Dict[str, Any]]: # type: ignore[misc] """List of dicts representing default VPC endpoints for this service.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "config" ) - def _validate_resource_types(self, resource_list): + def _validate_resource_types(self, resource_list: List[str]) -> None: if not self.config_schema: self.config_schema = AWSServiceSpec( path="data/config/2014-11-12/service-2.json" @@ -875,15 +919,17 @@ class ConfigBackend(BaseBackend): # Verify that each entry exists in the supported list: bad_list = [] for resource in resource_list: - if resource not in self.config_schema.shapes["ResourceType"]["enum"]: + if resource not in self.config_schema.shapes["ResourceType"]["enum"]: # type: ignore[index] bad_list.append(resource) if bad_list: raise InvalidResourceTypeException( - bad_list, self.config_schema.shapes["ResourceType"]["enum"] + bad_list, self.config_schema.shapes["ResourceType"]["enum"] # type: ignore[index] ) - def _validate_delivery_snapshot_properties(self, properties): + def _validate_delivery_snapshot_properties( + self, properties: Dict[str, Any] + ) -> None: if not self.config_schema: self.config_schema = AWSServiceSpec( path="data/config/2014-11-12/service-2.json" @@ -892,23 +938,23 @@ class ConfigBackend(BaseBackend): # Verify that the deliveryFrequency is set to an acceptable value: if ( properties.get("deliveryFrequency", None) - not in self.config_schema.shapes["MaximumExecutionFrequency"]["enum"] + not in self.config_schema.shapes["MaximumExecutionFrequency"]["enum"] # type: ignore[index] ): raise InvalidDeliveryFrequency( properties.get("deliveryFrequency", None), - self.config_schema.shapes["MaximumExecutionFrequency"]["enum"], + self.config_schema.shapes["MaximumExecutionFrequency"]["enum"], # type: ignore[index] ) - def put_configuration_aggregator(self, config_aggregator): + def put_configuration_aggregator( + self, config_aggregator: Dict[str, Any] + ) -> Dict[str, Any]: # Validate the name: - if len(config_aggregator["ConfigurationAggregatorName"]) > 256: - raise NameTooLongException( - config_aggregator["ConfigurationAggregatorName"], - "configurationAggregatorName", - ) + config_aggr_name = config_aggregator["ConfigurationAggregatorName"] + if len(config_aggr_name) > 256: + raise NameTooLongException(config_aggr_name, "configurationAggregatorName") - account_sources = None - org_source = None + account_sources: Optional[List[AccountAggregatorSource]] = None + org_source: Optional[OrganizationAggregationSource] = None # Tag validation: tags = validate_tags(config_aggregator.get("Tags", [])) @@ -965,25 +1011,19 @@ class ConfigBackend(BaseBackend): ) # Grab the existing one if it exists and update it: - if not self.config_aggregators.get( - config_aggregator["ConfigurationAggregatorName"] - ): + if not self.config_aggregators.get(config_aggr_name): aggregator = ConfigAggregator( - config_aggregator["ConfigurationAggregatorName"], + config_aggr_name, account_id=self.account_id, region=self.region_name, account_sources=account_sources, org_source=org_source, tags=tags, ) - self.config_aggregators[ - config_aggregator["ConfigurationAggregatorName"] - ] = aggregator + self.config_aggregators[config_aggr_name] = aggregator else: - aggregator = self.config_aggregators[ - config_aggregator["ConfigurationAggregatorName"] - ] + aggregator = self.config_aggregators[config_aggr_name] aggregator.tags = tags aggregator.account_aggregation_sources = account_sources aggregator.organization_aggregation_source = org_source @@ -991,10 +1031,12 @@ class ConfigBackend(BaseBackend): return aggregator.to_dict() - def describe_configuration_aggregators(self, names, token, limit): + def describe_configuration_aggregators( + self, names: List[str], token: str, limit: Optional[int] + ) -> Dict[str, Any]: limit = DEFAULT_PAGE_SIZE if not limit or limit < 0 else limit agg_list = [] - result = {"ConfigurationAggregators": []} + result: Dict[str, Any] = {"ConfigurationAggregators": []} if names: for name in names: @@ -1034,17 +1076,20 @@ class ConfigBackend(BaseBackend): return result - def delete_configuration_aggregator(self, config_aggregator): + def delete_configuration_aggregator(self, config_aggregator: str) -> None: if not self.config_aggregators.get(config_aggregator): raise NoSuchConfigurationAggregatorException() del self.config_aggregators[config_aggregator] def put_aggregation_authorization( - self, authorized_account, authorized_region, tags - ): + self, + authorized_account: str, + authorized_region: str, + tags: List[Dict[str, str]], + ) -> Dict[str, Any]: # Tag validation: - tags = validate_tags(tags or []) + tag_dict = validate_tags(tags or []) # Does this already exist? key = "{}/{}".format(authorized_account, authorized_region) @@ -1055,20 +1100,22 @@ class ConfigBackend(BaseBackend): self.region_name, authorized_account, authorized_region, - tags=tags, + tags=tag_dict, ) self.aggregation_authorizations[ "{}/{}".format(authorized_account, authorized_region) ] = agg_auth else: # Only update the tags: - agg_auth.tags = tags + agg_auth.tags = tag_dict return agg_auth.to_dict() - def describe_aggregation_authorizations(self, token, limit): + def describe_aggregation_authorizations( + self, token: Optional[str], limit: Optional[int] + ) -> Dict[str, Any]: limit = DEFAULT_PAGE_SIZE if not limit or limit < 0 else limit - result = {"AggregationAuthorizations": []} + result: Dict[str, Any] = {"AggregationAuthorizations": []} if not self.aggregation_authorizations: return result @@ -1097,19 +1144,21 @@ class ConfigBackend(BaseBackend): return result - def delete_aggregation_authorization(self, authorized_account, authorized_region): + def delete_aggregation_authorization( + self, authorized_account: str, authorized_region: str + ) -> None: # This will always return a 200 -- regardless if there is or isn't an existing # aggregation authorization. key = "{}/{}".format(authorized_account, authorized_region) self.aggregation_authorizations.pop(key, None) - def put_configuration_recorder(self, config_recorder): + def put_configuration_recorder(self, config_recorder: Dict[str, Any]) -> None: # Validate the name: if not config_recorder.get("name"): raise InvalidConfigurationRecorderNameException(config_recorder.get("name")) - if len(config_recorder.get("name")) > 256: + if len(config_recorder["name"]) > 256: raise NameTooLongException( - config_recorder.get("name"), "configurationRecorder.name" + config_recorder["name"], "configurationRecorder.name" ) # We're going to assume that the passed in Role ARN is correct. @@ -1164,8 +1213,10 @@ class ConfigBackend(BaseBackend): status=recorder_status, ) - def describe_configuration_recorders(self, recorder_names): - recorders = [] + def describe_configuration_recorders( + self, recorder_names: Optional[List[str]] + ) -> List[Dict[str, Any]]: + recorders: List[Dict[str, Any]] = [] if recorder_names: for rname in recorder_names: @@ -1181,8 +1232,10 @@ class ConfigBackend(BaseBackend): return recorders - def describe_configuration_recorder_status(self, recorder_names): - recorders = [] + def describe_configuration_recorder_status( + self, recorder_names: List[str] + ) -> List[Dict[str, Any]]: + recorders: List[Dict[str, Any]] = [] if recorder_names: for rname in recorder_names: @@ -1198,7 +1251,7 @@ class ConfigBackend(BaseBackend): return recorders - def put_delivery_channel(self, delivery_channel): + def put_delivery_channel(self, delivery_channel: Dict[str, Any]) -> None: # Must have a configuration recorder: if not self.recorders: raise NoAvailableConfigurationRecorderException() @@ -1206,10 +1259,8 @@ class ConfigBackend(BaseBackend): # Validate the name: if not delivery_channel.get("name"): raise InvalidDeliveryChannelNameException(delivery_channel.get("name")) - if len(delivery_channel.get("name")) > 256: - raise NameTooLongException( - delivery_channel.get("name"), "deliveryChannel.name" - ) + if len(delivery_channel["name"]) > 256: + raise NameTooLongException(delivery_channel["name"], "deliveryChannel.name") # We are going to assume that the bucket exists -- but will verify if # the bucket provided is blank: @@ -1256,8 +1307,10 @@ class ConfigBackend(BaseBackend): snapshot_properties=dprop, ) - def describe_delivery_channels(self, channel_names): - channels = [] + def describe_delivery_channels( + self, channel_names: List[str] + ) -> List[Dict[str, Any]]: + channels: List[Dict[str, Any]] = [] if channel_names: for cname in channel_names: @@ -1273,7 +1326,7 @@ class ConfigBackend(BaseBackend): return channels - def start_configuration_recorder(self, recorder_name): + def start_configuration_recorder(self, recorder_name: str) -> None: if not self.recorders.get(recorder_name): raise NoSuchConfigurationRecorderException(recorder_name) @@ -1284,20 +1337,20 @@ class ConfigBackend(BaseBackend): # Start recording: self.recorders[recorder_name].status.start() - def stop_configuration_recorder(self, recorder_name): + def stop_configuration_recorder(self, recorder_name: str) -> None: if not self.recorders.get(recorder_name): raise NoSuchConfigurationRecorderException(recorder_name) # Stop recording: self.recorders[recorder_name].status.stop() - def delete_configuration_recorder(self, recorder_name): + def delete_configuration_recorder(self, recorder_name: str) -> None: if not self.recorders.get(recorder_name): raise NoSuchConfigurationRecorderException(recorder_name) del self.recorders[recorder_name] - def delete_delivery_channel(self, channel_name): + def delete_delivery_channel(self, channel_name: str) -> None: if not self.delivery_channels.get(channel_name): raise NoSuchDeliveryChannelException(channel_name) @@ -1310,13 +1363,13 @@ class ConfigBackend(BaseBackend): def list_discovered_resources( self, - resource_type, - backend_region, - resource_ids, - resource_name, - limit, - next_token, - ): + resource_type: str, + backend_region: str, + resource_ids: List[str], + resource_name: str, + limit: int, + next_token: str, + ) -> Dict[str, Any]: """Queries against AWS Config (non-aggregated) listing function. The listing function must exist for the resource backend. @@ -1392,8 +1445,13 @@ class ConfigBackend(BaseBackend): return result def list_aggregate_discovered_resources( - self, aggregator_name, resource_type, filters, limit, next_token - ): + self, + aggregator_name: str, + resource_type: str, + filters: Dict[str, str], + limit: Optional[int], + next_token: Optional[str], + ) -> Dict[str, Any]: """Queries AWS Config listing function that must exist for resource backend. As far a moto goes -- the only real difference between this function @@ -1453,14 +1511,16 @@ class ConfigBackend(BaseBackend): resource_identifiers.append(item) - result = {"ResourceIdentifiers": resource_identifiers} + result: Dict[str, Any] = {"ResourceIdentifiers": resource_identifiers} if new_token: result["NextToken"] = new_token return result - def get_resource_config_history(self, resource_type, resource_id, backend_region): + def get_resource_config_history( + self, resource_type: str, resource_id: str, backend_region: str + ) -> Dict[str, Any]: """Returns configuration of resource for the current regional backend. Item returned in AWS Config format. @@ -1502,7 +1562,9 @@ class ConfigBackend(BaseBackend): return {"configurationItems": [item]} - def batch_get_resource_config(self, resource_keys, backend_region): + def batch_get_resource_config( + self, resource_keys: List[Dict[str, str]], backend_region: str + ) -> Dict[str, Any]: """Returns configuration of resource for the current regional backend. Item is returned in AWS Config format. @@ -1562,8 +1624,8 @@ class ConfigBackend(BaseBackend): } # At this time, moto is not adding unprocessed items. def batch_get_aggregate_resource_config( - self, aggregator_name, resource_identifiers - ): + self, aggregator_name: str, resource_identifiers: List[Dict[str, str]] + ) -> Dict[str, Any]: """Returns configuration of resource for current regional backend. Item is returned in AWS Config format. @@ -1621,7 +1683,12 @@ class ConfigBackend(BaseBackend): "UnprocessedResourceIdentifiers": not_found, } - def put_evaluations(self, evaluations=None, result_token=None, test_mode=False): + def put_evaluations( + self, + evaluations: Optional[List[Dict[str, Any]]] = None, + result_token: Optional[str] = None, + test_mode: Optional[bool] = False, + ) -> Dict[str, List[Any]]: if not evaluations: raise InvalidParameterValueException( "The Evaluations object in your request cannot be null." @@ -1644,14 +1711,14 @@ class ConfigBackend(BaseBackend): def put_organization_conformance_pack( self, - name, - template_s3_uri, - template_body, - delivery_s3_bucket, - delivery_s3_key_prefix, - input_parameters, - excluded_accounts, - ): + name: str, + template_s3_uri: str, + template_body: str, + delivery_s3_bucket: str, + delivery_s3_key_prefix: str, + input_parameters: List[Dict[str, str]], + excluded_accounts: List[str], + ) -> Dict[str, Any]: # a real validation of the content of the template is missing at the moment if not template_s3_uri and not template_body: raise ValidationException("Template body is invalid") @@ -1690,7 +1757,9 @@ class ConfigBackend(BaseBackend): "OrganizationConformancePackArn": pack.organization_conformance_pack_arn } - def describe_organization_conformance_packs(self, names): + def describe_organization_conformance_packs( + self, names: List[str] + ) -> Dict[str, Any]: packs = [] for name in names: @@ -1707,7 +1776,9 @@ class ConfigBackend(BaseBackend): return {"OrganizationConformancePacks": packs} - def describe_organization_conformance_pack_statuses(self, names): + def describe_organization_conformance_pack_statuses( + self, names: List[str] + ) -> Dict[str, Any]: packs = [] statuses = [] @@ -1737,7 +1808,9 @@ class ConfigBackend(BaseBackend): return {"OrganizationConformancePackStatuses": statuses} - def get_organization_conformance_pack_detailed_status(self, name): + def get_organization_conformance_pack_detailed_status( + self, name: str + ) -> Dict[str, Any]: pack = self.organization_conformance_packs.get(name) if not pack: @@ -1760,7 +1833,7 @@ class ConfigBackend(BaseBackend): return {"OrganizationConformancePackDetailedStatuses": statuses} - def delete_organization_conformance_pack(self, name): + def delete_organization_conformance_pack(self, name: str) -> None: pack = self.organization_conformance_packs.get(name) if not pack: @@ -1771,28 +1844,24 @@ class ConfigBackend(BaseBackend): self.organization_conformance_packs.pop(name) - def _match_arn(self, resource_arn): + def _match_arn( + self, resource_arn: str + ) -> Union[ConfigRule, ConfigAggregator, ConfigAggregationAuthorization]: """Return config instance that has a matching ARN.""" # The allowed resources are ConfigRule, ConfigurationAggregator, # and AggregatorAuthorization. - allowed_resources = [ - { - "configs": self.config_aggregators, - "arn_attribute": "configuration_aggregator_arn", - }, - { - "configs": self.aggregation_authorizations, - "arn_attribute": "aggregation_authorization_arn", - }, - {"configs": self.config_rules, "arn_attribute": "config_rule_arn"}, - ] + allowed_resources = { + "configuration_aggregator_arn": self.config_aggregators, + "aggregation_authorization_arn": self.aggregation_authorizations, + "config_rule_arn": self.config_rules, + } # Find matching config for given resource_arn among all the # allowed config resources. matched_config = None - for resource in allowed_resources: - for config in resource["configs"].values(): - if resource_arn == getattr(config, resource["arn_attribute"]): + for arn_attribute, configs in allowed_resources.items(): + for config in configs.values(): # type: ignore[attr-defined] + if resource_arn == getattr(config, arn_attribute): matched_config = config break @@ -1800,18 +1869,18 @@ class ConfigBackend(BaseBackend): raise ResourceNotFoundException(resource_arn) return matched_config - def tag_resource(self, resource_arn, tags): + def tag_resource(self, resource_arn: str, tags: List[Dict[str, str]]) -> None: """Add tags in config with a matching ARN.""" # Tag validation: - tags = validate_tags(tags) + tag_dict = validate_tags(tags) # Find config with a matching ARN. matched_config = self._match_arn(resource_arn) # Merge the new tags with the existing tags. - matched_config.tags.update(tags) + matched_config.tags.update(tag_dict) - def untag_resource(self, resource_arn, tag_keys): + def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: """Remove tags in config with a matching ARN. If the tags in the tag_keys don't match any keys for that @@ -1827,8 +1896,8 @@ class ConfigBackend(BaseBackend): matched_config.tags.pop(tag_key, None) def list_tags_for_resource( - self, resource_arn, limit, next_token - ): # pylint: disable=unused-argument + self, resource_arn: str, limit: int + ) -> Dict[str, List[Dict[str, str]]]: """Return list of tags for AWS Config resource.""" # The limit argument is essentially ignored as a config instance # can only have 50 tags, but we'll check the argument anyway. @@ -1845,7 +1914,9 @@ class ConfigBackend(BaseBackend): ] } - def put_config_rule(self, config_rule, tags=None): + def put_config_rule( + self, config_rule: Dict[str, Any], tags: Optional[List[Dict[str, str]]] = None + ) -> str: """Add/Update config rule for evaluating resource compliance. TBD - Only the "accounting" of config rules are handled at the @@ -1880,7 +1951,7 @@ class ConfigBackend(BaseBackend): "Name or Id or Arn" ) - tags = validate_tags(tags or []) + tag_dict = validate_tags(tags or []) # With the rule_name, determine whether it's for an existing rule # or whether a new rule should be created. @@ -1896,20 +1967,22 @@ class ConfigBackend(BaseBackend): ) # Update the current rule. - rule.modify_fields(self.region_name, config_rule, tags) + rule.modify_fields(self.region_name, config_rule, tag_dict) else: # Create a new ConfigRule if the limit hasn't been reached. if len(self.config_rules) == ConfigRule.MAX_RULES: raise MaxNumberOfConfigRulesExceededException( rule_name, ConfigRule.MAX_RULES ) - rule = ConfigRule(self.account_id, self.region_name, config_rule, tags) + rule = ConfigRule(self.account_id, self.region_name, config_rule, tag_dict) self.config_rules[rule_name] = rule return "" - def describe_config_rules(self, config_rule_names, next_token): + def describe_config_rules( + self, config_rule_names: Optional[List[str]], next_token: Optional[str] + ) -> Dict[str, Any]: """Return details for the given ConfigRule names or for all rules.""" - result = {"ConfigRules": []} + result: Dict[str, Any] = {"ConfigRules": []} if not self.config_rules: return result @@ -1937,7 +2010,7 @@ class ConfigBackend(BaseBackend): result["NextToken"] = sorted_rules[start + CONFIG_RULE_PAGE_SIZE] return result - def delete_config_rule(self, rule_name): + def delete_config_rule(self, rule_name: str) -> None: """Delete config rule used for evaluating resource compliance.""" rule = self.config_rules.get(rule_name) if not rule: diff --git a/moto/config/responses.py b/moto/config/responses.py index cbe6a97ee..12119d26f 100644 --- a/moto/config/responses.py +++ b/moto/config/responses.py @@ -1,30 +1,30 @@ import json from moto.core.responses import BaseResponse -from .models import config_backends +from .models import config_backends, ConfigBackend class ConfigResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="config") @property - def config_backend(self): + def config_backend(self) -> ConfigBackend: return config_backends[self.current_account][self.region] - def put_configuration_recorder(self): + def put_configuration_recorder(self) -> str: self.config_backend.put_configuration_recorder( self._get_param("ConfigurationRecorder") ) return "" - def put_configuration_aggregator(self): + def put_configuration_aggregator(self) -> str: aggregator = self.config_backend.put_configuration_aggregator( json.loads(self.body) ) schema = {"ConfigurationAggregator": aggregator} return json.dumps(schema) - def describe_configuration_aggregators(self): + def describe_configuration_aggregators(self) -> str: aggregators = self.config_backend.describe_configuration_aggregators( self._get_param("ConfigurationAggregatorNames"), self._get_param("NextToken"), @@ -32,13 +32,13 @@ class ConfigResponse(BaseResponse): ) return json.dumps(aggregators) - def delete_configuration_aggregator(self): + def delete_configuration_aggregator(self) -> str: self.config_backend.delete_configuration_aggregator( self._get_param("ConfigurationAggregatorName") ) return "" - def put_aggregation_authorization(self): + def put_aggregation_authorization(self) -> str: agg_auth = self.config_backend.put_aggregation_authorization( self._get_param("AuthorizedAccountId"), self._get_param("AuthorizedAwsRegion"), @@ -47,14 +47,14 @@ class ConfigResponse(BaseResponse): schema = {"AggregationAuthorization": agg_auth} return json.dumps(schema) - def describe_aggregation_authorizations(self): + def describe_aggregation_authorizations(self) -> str: authorizations = self.config_backend.describe_aggregation_authorizations( self._get_param("NextToken"), self._get_param("Limit") ) return json.dumps(authorizations) - def delete_aggregation_authorization(self): + def delete_aggregation_authorization(self) -> str: self.config_backend.delete_aggregation_authorization( self._get_param("AuthorizedAccountId"), self._get_param("AuthorizedAwsRegion"), @@ -62,59 +62,59 @@ class ConfigResponse(BaseResponse): return "" - def describe_configuration_recorders(self): + def describe_configuration_recorders(self) -> str: recorders = self.config_backend.describe_configuration_recorders( self._get_param("ConfigurationRecorderNames") ) schema = {"ConfigurationRecorders": recorders} return json.dumps(schema) - def describe_configuration_recorder_status(self): + def describe_configuration_recorder_status(self) -> str: recorder_statuses = self.config_backend.describe_configuration_recorder_status( self._get_param("ConfigurationRecorderNames") ) schema = {"ConfigurationRecordersStatus": recorder_statuses} return json.dumps(schema) - def put_delivery_channel(self): + def put_delivery_channel(self) -> str: self.config_backend.put_delivery_channel(self._get_param("DeliveryChannel")) return "" - def describe_delivery_channels(self): + def describe_delivery_channels(self) -> str: delivery_channels = self.config_backend.describe_delivery_channels( self._get_param("DeliveryChannelNames") ) schema = {"DeliveryChannels": delivery_channels} return json.dumps(schema) - def describe_delivery_channel_status(self): + def describe_delivery_channel_status(self) -> str: raise NotImplementedError() - def delete_delivery_channel(self): + def delete_delivery_channel(self) -> str: self.config_backend.delete_delivery_channel( self._get_param("DeliveryChannelName") ) return "" - def delete_configuration_recorder(self): + def delete_configuration_recorder(self) -> str: self.config_backend.delete_configuration_recorder( self._get_param("ConfigurationRecorderName") ) return "" - def start_configuration_recorder(self): + def start_configuration_recorder(self) -> str: self.config_backend.start_configuration_recorder( self._get_param("ConfigurationRecorderName") ) return "" - def stop_configuration_recorder(self): + def stop_configuration_recorder(self) -> str: self.config_backend.stop_configuration_recorder( self._get_param("ConfigurationRecorderName") ) return "" - def list_discovered_resources(self): + def list_discovered_resources(self) -> str: schema = self.config_backend.list_discovered_resources( self._get_param("resourceType"), self.region, @@ -125,7 +125,7 @@ class ConfigResponse(BaseResponse): ) return json.dumps(schema) - def list_aggregate_discovered_resources(self): + def list_aggregate_discovered_resources(self) -> str: schema = self.config_backend.list_aggregate_discovered_resources( self._get_param("ConfigurationAggregatorName"), self._get_param("ResourceType"), @@ -135,34 +135,32 @@ class ConfigResponse(BaseResponse): ) return json.dumps(schema) - def list_tags_for_resource(self): + def list_tags_for_resource(self) -> str: schema = self.config_backend.list_tags_for_resource( - self._get_param("ResourceArn"), - self._get_param("Limit"), - self._get_param("NextToken"), + self._get_param("ResourceArn"), self._get_param("Limit") ) return json.dumps(schema) - def get_resource_config_history(self): + def get_resource_config_history(self) -> str: schema = self.config_backend.get_resource_config_history( self._get_param("resourceType"), self._get_param("resourceId"), self.region ) return json.dumps(schema) - def batch_get_resource_config(self): + def batch_get_resource_config(self) -> str: schema = self.config_backend.batch_get_resource_config( self._get_param("resourceKeys"), self.region ) return json.dumps(schema) - def batch_get_aggregate_resource_config(self): + def batch_get_aggregate_resource_config(self) -> str: schema = self.config_backend.batch_get_aggregate_resource_config( self._get_param("ConfigurationAggregatorName"), self._get_param("ResourceIdentifiers"), ) return json.dumps(schema) - def put_evaluations(self): + def put_evaluations(self) -> str: evaluations = self.config_backend.put_evaluations( self._get_param("Evaluations"), self._get_param("ResultToken"), @@ -170,7 +168,7 @@ class ConfigResponse(BaseResponse): ) return json.dumps(evaluations) - def put_organization_conformance_pack(self): + def put_organization_conformance_pack(self) -> str: conformance_pack = self.config_backend.put_organization_conformance_pack( name=self._get_param("OrganizationConformancePackName"), template_s3_uri=self._get_param("TemplateS3Uri"), @@ -183,19 +181,19 @@ class ConfigResponse(BaseResponse): return json.dumps(conformance_pack) - def describe_organization_conformance_packs(self): + def describe_organization_conformance_packs(self) -> str: conformance_packs = self.config_backend.describe_organization_conformance_packs( self._get_param("OrganizationConformancePackNames") ) return json.dumps(conformance_packs) - def describe_organization_conformance_pack_statuses(self): + def describe_organization_conformance_pack_statuses(self) -> str: statuses = self.config_backend.describe_organization_conformance_pack_statuses( self._get_param("OrganizationConformancePackNames") ) return json.dumps(statuses) - def get_organization_conformance_pack_detailed_status(self): + def get_organization_conformance_pack_detailed_status(self) -> str: # 'Filters' parameter is not implemented yet statuses = ( self.config_backend.get_organization_conformance_pack_detailed_status( @@ -204,36 +202,36 @@ class ConfigResponse(BaseResponse): ) return json.dumps(statuses) - def delete_organization_conformance_pack(self): + def delete_organization_conformance_pack(self) -> str: self.config_backend.delete_organization_conformance_pack( self._get_param("OrganizationConformancePackName") ) return "" - def tag_resource(self): + def tag_resource(self) -> str: self.config_backend.tag_resource( self._get_param("ResourceArn"), self._get_param("Tags") ) return "" - def untag_resource(self): + def untag_resource(self) -> str: self.config_backend.untag_resource( self._get_param("ResourceArn"), self._get_param("TagKeys") ) return "" - def put_config_rule(self): + def put_config_rule(self) -> str: self.config_backend.put_config_rule( self._get_param("ConfigRule"), self._get_param("Tags") ) return "" - def describe_config_rules(self): + def describe_config_rules(self) -> str: rules = self.config_backend.describe_config_rules( self._get_param("ConfigRuleNames"), self._get_param("NextToken") ) return json.dumps(rules) - def delete_config_rule(self): + def delete_config_rule(self) -> str: self.config_backend.delete_config_rule(self._get_param("ConfigRuleName")) return "" diff --git a/moto/core/common_models.py b/moto/core/common_models.py index 6d85627dd..8c875aad7 100644 --- a/moto/core/common_models.py +++ b/moto/core/common_models.py @@ -1,5 +1,5 @@ from abc import abstractmethod -from typing import Any, Dict +from typing import Any, Dict, List, Optional from .base_backend import InstanceTrackerMeta @@ -98,14 +98,14 @@ class ConfigQueryModel: def list_config_service_resources( self, - account_id, - resource_ids, - resource_name, - limit, - next_token, - backend_region=None, - resource_region=None, - aggregator=None, + account_id: str, + resource_ids: Optional[List[str]], + resource_name: Optional[str], + limit: int, + next_token: Optional[str], + backend_region: Optional[str] = None, + resource_region: Optional[str] = None, + aggregator: Optional[Dict[str, Any]] = None, ): """For AWS Config. This will list all of the resources of the given type and optional resource name and region. @@ -158,12 +158,12 @@ class ConfigQueryModel: def get_config_resource( self, - account_id, - resource_id, - resource_name=None, - backend_region=None, - resource_region=None, - ): + account_id: str, + resource_id: str, + resource_name: Optional[str] = None, + backend_region: Optional[str] = None, + resource_region: Optional[str] = None, + ) -> Dict[str, Any]: """For AWS Config. This will query the backend for the specific resource type configuration. This supports both aggregated, and non-aggregated fetching -- for batched fetching -- the Config batching requests diff --git a/moto/core/responses.py b/moto/core/responses.py index 7d4b7fe7d..76c42908e 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -870,7 +870,7 @@ class AWSServiceSpec(object): """ - def __init__(self, path): + def __init__(self, path: str): spec = load_resource("botocore", path) self.metadata = spec["metadata"] diff --git a/moto/ec2/models/instance_types.py b/moto/ec2/models/instance_types.py index 90b9e8e2d..fd081a15d 100644 --- a/moto/ec2/models/instance_types.py +++ b/moto/ec2/models/instance_types.py @@ -1,12 +1,14 @@ import pathlib - +from typing import Any, Dict from os import listdir from ..utils import generic_filter from moto.utilities.utils import load_resource from ..exceptions import InvalidFilter, InvalidInstanceTypeError -INSTANCE_TYPES = load_resource(__name__, "../resources/instance_types.json") +INSTANCE_TYPES: Dict[str, Any] = load_resource( + __name__, "../resources/instance_types.json" +) INSTANCE_FAMILIES = list(set([i.split(".")[0] for i in INSTANCE_TYPES.keys()])) root = pathlib.Path(__file__).parent diff --git a/moto/utilities/utils.py b/moto/utilities/utils.py index d010daa5e..f0d5118b0 100644 --- a/moto/utilities/utils.py +++ b/moto/utilities/utils.py @@ -3,7 +3,7 @@ import hashlib import pkgutil from collections.abc import MutableMapping -from typing import Any, Dict +from typing import Any, Dict, Union def str2bool(v): @@ -13,7 +13,9 @@ def str2bool(v): return False -def load_resource(package, resource, as_json=True): +def load_resource( + package: str, resource: str, as_json: bool = True +) -> Union[Dict[str, Any], str]: """ Open a file, and return the contents as JSON. Usage: diff --git a/setup.cfg b/setup.cfg index e8ecd1c41..a2a8fa254 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit,moto/cognito*,moto/comprehend +files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit,moto/cognito*,moto/comprehend,moto/config show_column_numbers=True show_error_codes = True disable_error_code=abstract