TechDebt: MyPy Config (#5633)

This commit is contained in:
Bert Blommers 2022-11-03 18:32:00 -01:00 committed by GitHub
parent 79aaf2bc03
commit 2500affb1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 358 additions and 282 deletions

View File

@ -1,10 +1,11 @@
from moto.core.exceptions import JsonRESTError
from typing import Any, List, Optional
class NameTooLongException(JsonRESTError):
code = 400
def __init__(self, name, location, max_limit=256):
def __init__(self, name: str, location: str, max_limit: int = 256):
message = (
f"1 validation error detected: Value '{name}' at '{location}' "
f"failed to satisfy constraint: Member must have length less "
@ -16,7 +17,7 @@ class NameTooLongException(JsonRESTError):
class InvalidConfigurationRecorderNameException(JsonRESTError):
code = 400
def __init__(self, name):
def __init__(self, name: Optional[str]):
message = (
f"The configuration recorder name '{name}' is not valid, blank string."
)
@ -26,7 +27,7 @@ class InvalidConfigurationRecorderNameException(JsonRESTError):
class MaxNumberOfConfigurationRecordersExceededException(JsonRESTError):
code = 400
def __init__(self, name):
def __init__(self, name: str):
message = (
f"Failed to put configuration recorder '{name}' because the maximum number of "
"configuration recorders: 1 is reached."
@ -37,7 +38,7 @@ class MaxNumberOfConfigurationRecordersExceededException(JsonRESTError):
class InvalidRecordingGroupException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
message = "The recording group provided is not valid"
super().__init__("InvalidRecordingGroupException", message)
@ -45,7 +46,7 @@ class InvalidRecordingGroupException(JsonRESTError):
class InvalidResourceTypeException(JsonRESTError):
code = 400
def __init__(self, bad_list, good_list):
def __init__(self, bad_list: List[str], good_list: Any):
message = (
f"{len(bad_list)} validation error detected: Value '{bad_list}' at "
"'configurationRecorder.recordingGroup.resourceTypes' failed to satisfy constraint: "
@ -58,7 +59,7 @@ class InvalidResourceTypeException(JsonRESTError):
class NoSuchConfigurationAggregatorException(JsonRESTError):
code = 400
def __init__(self, number=1):
def __init__(self, number: int = 1):
if number == 1:
message = "The configuration aggregator does not exist. Check the configuration aggregator name and try again."
else:
@ -72,7 +73,7 @@ class NoSuchConfigurationAggregatorException(JsonRESTError):
class NoSuchConfigurationRecorderException(JsonRESTError):
code = 400
def __init__(self, name):
def __init__(self, name: str):
message = (
f"Cannot find configuration recorder with the specified name '{name}'."
)
@ -82,7 +83,7 @@ class NoSuchConfigurationRecorderException(JsonRESTError):
class InvalidDeliveryChannelNameException(JsonRESTError):
code = 400
def __init__(self, name):
def __init__(self, name: Optional[str]):
message = f"The delivery channel name '{name}' is not valid, blank string."
super().__init__("InvalidDeliveryChannelNameException", message)
@ -92,7 +93,7 @@ class NoSuchBucketException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
message = "Cannot find a S3 bucket with an empty bucket name."
super().__init__("NoSuchBucketException", message)
@ -100,7 +101,7 @@ class NoSuchBucketException(JsonRESTError):
class InvalidNextTokenException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
message = "The nextToken provided is invalid"
super().__init__("InvalidNextTokenException", message)
@ -108,7 +109,7 @@ class InvalidNextTokenException(JsonRESTError):
class InvalidS3KeyPrefixException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
message = "The s3 key prefix '' is not valid, empty s3 key prefix."
super().__init__("InvalidS3KeyPrefixException", message)
@ -118,7 +119,7 @@ class InvalidSNSTopicARNException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
message = "The sns topic arn '' is not valid."
super().__init__("InvalidSNSTopicARNException", message)
@ -126,7 +127,7 @@ class InvalidSNSTopicARNException(JsonRESTError):
class InvalidDeliveryFrequency(JsonRESTError):
code = 400
def __init__(self, value, good_list):
def __init__(self, value: str, good_list: Any):
message = (
f"1 validation error detected: Value '{value}' at "
"'deliveryChannel.configSnapshotDeliveryProperties.deliveryFrequency' failed to satisfy "
@ -138,7 +139,7 @@ class InvalidDeliveryFrequency(JsonRESTError):
class MaxNumberOfDeliveryChannelsExceededException(JsonRESTError):
code = 400
def __init__(self, name):
def __init__(self, name: str):
message = f"Failed to put delivery channel '{name}' because the maximum number of delivery channels: 1 is reached."
super().__init__("MaxNumberOfDeliveryChannelsExceededException", message)
@ -146,7 +147,7 @@ class MaxNumberOfDeliveryChannelsExceededException(JsonRESTError):
class NoSuchDeliveryChannelException(JsonRESTError):
code = 400
def __init__(self, name):
def __init__(self, name: str):
message = f"Cannot find delivery channel with specified name '{name}'."
super().__init__("NoSuchDeliveryChannelException", message)
@ -154,7 +155,7 @@ class NoSuchDeliveryChannelException(JsonRESTError):
class NoAvailableConfigurationRecorderException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
message = "Configuration recorder is not available to put delivery channel."
super().__init__("NoAvailableConfigurationRecorderException", message)
@ -162,7 +163,7 @@ class NoAvailableConfigurationRecorderException(JsonRESTError):
class NoAvailableDeliveryChannelException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
message = "Delivery channel is not available to start configuration recorder."
super().__init__("NoAvailableDeliveryChannelException", message)
@ -170,7 +171,7 @@ class NoAvailableDeliveryChannelException(JsonRESTError):
class LastDeliveryChannelDeleteFailedException(JsonRESTError):
code = 400
def __init__(self, name):
def __init__(self, name: str):
message = (
f"Failed to delete last specified delivery channel with name '{name}', because there, "
"because there is a running configuration recorder."
@ -181,7 +182,7 @@ class LastDeliveryChannelDeleteFailedException(JsonRESTError):
class TooManyAccountSources(JsonRESTError):
code = 400
def __init__(self, length):
def __init__(self, length: int):
locations = ["com.amazonaws.xyz"] * length
message = (
@ -196,7 +197,7 @@ class TooManyAccountSources(JsonRESTError):
class DuplicateTags(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"InvalidInput",
"Duplicate tag keys found. Please note that Tag keys are case insensitive.",
@ -206,7 +207,7 @@ class DuplicateTags(JsonRESTError):
class TagKeyTooBig(JsonRESTError):
code = 400
def __init__(self, tag, param="tags.X.member.key"):
def __init__(self, tag: str, param: str = "tags.X.member.key"):
super().__init__(
"ValidationException",
f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy "
@ -217,7 +218,7 @@ class TagKeyTooBig(JsonRESTError):
class TagValueTooBig(JsonRESTError):
code = 400
def __init__(self, tag, param="tags.X.member.value"):
def __init__(self, tag: str, param: str = "tags.X.member.value"):
super().__init__(
"ValidationException",
f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy "
@ -228,14 +229,14 @@ class TagValueTooBig(JsonRESTError):
class InvalidParameterValueException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("InvalidParameterValueException", message)
class InvalidTagCharacters(JsonRESTError):
code = 400
def __init__(self, tag, param="tags.X.member.key"):
def __init__(self, tag: str, param: str = "tags.X.member.key"):
message = f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy "
message += "constraint: Member must satisfy regular expression pattern: [\\\\p{L}\\\\p{Z}\\\\p{N}_.:/=+\\\\-@]+"
@ -245,7 +246,7 @@ class InvalidTagCharacters(JsonRESTError):
class TooManyTags(JsonRESTError):
code = 400
def __init__(self, tags, param="tags"):
def __init__(self, tags: Any, param: str = "tags"):
super().__init__(
"ValidationException",
f"1 validation error detected: Value '{tags}' at '{param}' failed to satisfy "
@ -256,7 +257,7 @@ class TooManyTags(JsonRESTError):
class InvalidResourceParameters(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"ValidationException",
"Both Resource ID and Resource Name " "cannot be specified in the request",
@ -266,7 +267,7 @@ class InvalidResourceParameters(JsonRESTError):
class InvalidLimitException(JsonRESTError):
code = 400
def __init__(self, value):
def __init__(self, value: int):
super().__init__(
"InvalidLimitException",
f"Value '{value}' at 'limit' failed to satisfy constraint: Member"
@ -277,7 +278,7 @@ class InvalidLimitException(JsonRESTError):
class TooManyResourceIds(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"ValidationException",
"The specified list had more than 20 resource ID's. "
@ -288,7 +289,7 @@ class TooManyResourceIds(JsonRESTError):
class ResourceNotDiscoveredException(JsonRESTError):
code = 400
def __init__(self, resource_type, resource):
def __init__(self, resource_type: str, resource: str):
super().__init__(
"ResourceNotDiscoveredException",
f"Resource {resource} of resourceType:{resource_type} is unknown or has not been discovered",
@ -298,7 +299,7 @@ class ResourceNotDiscoveredException(JsonRESTError):
class ResourceNotFoundException(JsonRESTError):
code = 400
def __init__(self, resource_arn):
def __init__(self, resource_arn: str):
super().__init__(
"ResourceNotFoundException", f"ResourceArn '{resource_arn}' does not exist"
)
@ -307,7 +308,7 @@ class ResourceNotFoundException(JsonRESTError):
class TooManyResourceKeys(JsonRESTError):
code = 400
def __init__(self, bad_list):
def __init__(self, bad_list: List[str]):
message = (
f"1 validation error detected: Value '{bad_list}' at "
"'resourceKeys' failed to satisfy constraint: "
@ -319,7 +320,7 @@ class TooManyResourceKeys(JsonRESTError):
class InvalidResultTokenException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
message = "The resultToken provided is invalid"
super().__init__("InvalidResultTokenException", message)
@ -327,21 +328,21 @@ class InvalidResultTokenException(JsonRESTError):
class ValidationException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ValidationException", message)
class NoSuchOrganizationConformancePackException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("NoSuchOrganizationConformancePackException", message)
class MaxNumberOfConfigRulesExceededException(JsonRESTError):
code = 400
def __init__(self, name, max_limit):
def __init__(self, name: str, max_limit: int):
message = f"Failed to put config rule '{name}' because the maximum number of config rules: {max_limit} is reached."
super().__init__("MaxNumberOfConfigRulesExceededException", message)
@ -349,21 +350,21 @@ class MaxNumberOfConfigRulesExceededException(JsonRESTError):
class ResourceInUseException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ResourceInUseException", message)
class InsufficientPermissionsException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("InsufficientPermissionsException", message)
class NoSuchConfigRuleException(JsonRESTError):
code = 400
def __init__(self, rule_name):
def __init__(self, rule_name: str):
message = f"The ConfigRule '{rule_name}' provided in the request is invalid. Please check the configRule name"
super().__init__("NoSuchConfigRuleException", message)
@ -371,5 +372,5 @@ class NoSuchConfigRuleException(JsonRESTError):
class MissingRequiredConfigRuleParameterException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ParamValidationError", message)

View File

@ -4,6 +4,7 @@ import re
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from moto.config.exceptions import (
InvalidResourceTypeException,
@ -48,6 +49,7 @@ from moto.config.exceptions import (
)
from moto.core import BaseBackend, BaseModel
from moto.core.common_models import ConfigQueryModel
from moto.core.responses import AWSServiceSpec
from moto.core.utils import BackendDict
from moto.iam.config import role_config_query, policy_config_query
@ -69,7 +71,7 @@ DEFAULT_PAGE_SIZE = 100
CONFIG_RULE_PAGE_SIZE = 25
# Map the Config resource type to a backend:
RESOURCE_MAP = {
RESOURCE_MAP: Dict[str, ConfigQueryModel] = {
"AWS::S3::Bucket": s3_config_query,
"AWS::S3::AccountPublicAccessBlock": s3_account_public_access_block_query,
"AWS::IAM::Role": role_config_query,
@ -81,14 +83,14 @@ CAMEL_TO_SNAKE_REGEX = re.compile(r"(?<!^)(?=[A-Z])")
MAX_TAGS_IN_ARG = 50
MANAGED_RULES = load_resource(__name__, "resources/aws_managed_rules.json")
MANAGED_RULES_CONSTRAINTS = MANAGED_RULES["ManagedRules"]
MANAGED_RULES_CONSTRAINTS = MANAGED_RULES["ManagedRules"] # type: ignore[index]
def datetime2int(date):
def datetime2int(date: datetime) -> int:
return int(time.mktime(date.timetuple()))
def snake_to_camels(original, cap_start, cap_arn):
def snake_to_camels(original: str, cap_start: bool, cap_arn: bool) -> str:
parts = original.split("_")
camel_cased = parts[0].lower() + "".join(p.title() for p in parts[1:])
@ -104,12 +106,12 @@ def snake_to_camels(original, cap_start, cap_arn):
return camel_cased
def random_string():
def random_string() -> str:
"""Returns a random set of 8 lowercase letters for the Config Aggregator ARN"""
return random.get_random_string(length=8, include_digits=False, lower_case=True)
def validate_tag_key(tag_key, exception_param="tags.X.member.key"):
def validate_tag_key(tag_key: str, exception_param: str = "tags.X.member.key") -> None:
"""Validates the tag key.
:param tag_key: The tag key to check against.
@ -131,7 +133,7 @@ def validate_tag_key(tag_key, exception_param="tags.X.member.key"):
raise InvalidTagCharacters(tag_key, param=exception_param)
def check_tag_duplicate(all_tags, tag_key):
def check_tag_duplicate(all_tags: Dict[str, str], tag_key: str) -> None:
"""Validates that a tag key is not a duplicate
:param all_tags: Dict to check if there is a duplicate tag.
@ -142,8 +144,8 @@ def check_tag_duplicate(all_tags, tag_key):
raise DuplicateTags()
def validate_tags(tags):
proper_tags = {}
def validate_tags(tags: List[Dict[str, str]]) -> Dict[str, str]:
proper_tags: Dict[str, str] = {}
if len(tags) > MAX_TAGS_IN_ARG:
raise TooManyTags(tags)
@ -162,7 +164,7 @@ def validate_tags(tags):
return proper_tags
def convert_to_class_args(dict_arg):
def convert_to_class_args(dict_arg: Dict[str, Any]) -> Dict[str, Any]:
"""Return dict that can be used to instantiate it's representative class.
Given a dictionary in the incoming API request, convert the keys to
@ -184,7 +186,7 @@ class ConfigEmptyDictable(BaseModel):
This assumes that the sub-class will NOT return 'None's in the JSON.
"""
def __init__(self, capitalize_start=False, capitalize_arn=True):
def __init__(self, capitalize_start: bool = False, capitalize_arn: bool = True):
"""Assists with the serialization of the config object
:param capitalize_start: For some Config services, the first letter
is lowercase -- for others it's capital
@ -194,8 +196,8 @@ class ConfigEmptyDictable(BaseModel):
self.capitalize_start = capitalize_start
self.capitalize_arn = capitalize_arn
def to_dict(self):
data = {}
def to_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {}
for item, value in self.__dict__.items():
# ignore private attributes
if not item.startswith("_") and value is not None:
@ -220,32 +222,32 @@ class ConfigEmptyDictable(BaseModel):
class ConfigRecorderStatus(ConfigEmptyDictable):
def __init__(self, name):
def __init__(self, name: str):
super().__init__()
self.name = name
self.recording = False
self.last_start_time = None
self.last_stop_time = None
self.last_status = None
self.last_error_code = None
self.last_error_message = None
self.last_status_change_time = None
self.last_start_time: Optional[int] = None
self.last_stop_time: Optional[int] = None
self.last_status: Optional[str] = None
self.last_error_code: Optional[str] = None
self.last_error_message: Optional[str] = None
self.last_status_change_time: Optional[int] = None
def start(self):
def start(self) -> None:
self.recording = True
self.last_status = "PENDING"
self.last_start_time = datetime2int(datetime.utcnow())
self.last_status_change_time = datetime2int(datetime.utcnow())
def stop(self):
def stop(self) -> None:
self.recording = False
self.last_stop_time = datetime2int(datetime.utcnow())
self.last_status_change_time = datetime2int(datetime.utcnow())
class ConfigDeliverySnapshotProperties(ConfigEmptyDictable):
def __init__(self, delivery_frequency):
def __init__(self, delivery_frequency: str):
super().__init__()
self.delivery_frequency = delivery_frequency
@ -253,7 +255,12 @@ class ConfigDeliverySnapshotProperties(ConfigEmptyDictable):
class ConfigDeliveryChannel(ConfigEmptyDictable):
def __init__(
self, name, s3_bucket_name, prefix=None, sns_arn=None, snapshot_properties=None
self,
name: str,
s3_bucket_name: str,
prefix: Optional[str] = None,
sns_arn: Optional[str] = None,
snapshot_properties: Optional[ConfigDeliverySnapshotProperties] = None,
):
super().__init__()
@ -267,9 +274,9 @@ class ConfigDeliveryChannel(ConfigEmptyDictable):
class RecordingGroup(ConfigEmptyDictable):
def __init__(
self,
all_supported=True,
include_global_resource_types=False,
resource_types=None,
all_supported: bool = True,
include_global_resource_types: bool = False,
resource_types: Optional[List[str]] = None,
):
super().__init__()
@ -279,7 +286,13 @@ class RecordingGroup(ConfigEmptyDictable):
class ConfigRecorder(ConfigEmptyDictable):
def __init__(self, role_arn, recording_group, name="default", status=None):
def __init__(
self,
role_arn: str,
recording_group: RecordingGroup,
name: str = "default",
status: Optional[ConfigRecorderStatus] = None,
):
super().__init__()
self.name = name
@ -293,7 +306,12 @@ class ConfigRecorder(ConfigEmptyDictable):
class AccountAggregatorSource(ConfigEmptyDictable):
def __init__(self, account_ids, aws_regions=None, all_aws_regions=None):
def __init__(
self,
account_ids: List[str],
aws_regions: Optional[List[str]] = None,
all_aws_regions: Optional[bool] = None,
):
super().__init__(capitalize_start=True)
# Can't have both the regions and all_regions flag present -- also
@ -321,7 +339,12 @@ class AccountAggregatorSource(ConfigEmptyDictable):
class OrganizationAggregationSource(ConfigEmptyDictable):
def __init__(self, role_arn, aws_regions=None, all_aws_regions=None):
def __init__(
self,
role_arn: str,
aws_regions: Optional[List[str]] = None,
all_aws_regions: Optional[bool] = None,
):
super().__init__(capitalize_start=True, capitalize_arn=False)
# Can't have both the regions and all_regions flag present -- also
@ -349,7 +372,13 @@ class OrganizationAggregationSource(ConfigEmptyDictable):
class ConfigAggregator(ConfigEmptyDictable):
def __init__(
self, name, account_id, region, account_sources=None, org_source=None, tags=None
self,
name: str,
account_id: str,
region: str,
account_sources: Optional[List[AccountAggregatorSource]] = None,
org_source: Optional[OrganizationAggregationSource] = None,
tags: Optional[Dict[str, str]] = None,
):
super().__init__(capitalize_start=True, capitalize_arn=False)
@ -364,7 +393,7 @@ class ConfigAggregator(ConfigEmptyDictable):
self.tags = tags or {}
# Override the to_dict so that we can format the tags properly...
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
result = super().to_dict()
# Override the account aggregation sources if present:
@ -384,11 +413,11 @@ class ConfigAggregator(ConfigEmptyDictable):
class ConfigAggregationAuthorization(ConfigEmptyDictable):
def __init__(
self,
account_id,
current_region,
authorized_account_id,
authorized_aws_region,
tags=None,
account_id: str,
current_region: str,
authorized_account_id: str,
authorized_aws_region: str,
tags: Dict[str, str],
):
super().__init__(capitalize_start=True, capitalize_arn=False)
@ -412,13 +441,13 @@ class ConfigAggregationAuthorization(ConfigEmptyDictable):
class OrganizationConformancePack(ConfigEmptyDictable):
def __init__(
self,
account_id,
region,
name,
delivery_s3_bucket,
delivery_s3_key_prefix=None,
input_parameters=None,
excluded_accounts=None,
account_id: str,
region: str,
name: str,
delivery_s3_bucket: str,
delivery_s3_key_prefix: Optional[str] = None,
input_parameters: Optional[List[Dict[str, Any]]] = None,
excluded_accounts: Optional[List[str]] = None,
):
super().__init__(capitalize_start=True, capitalize_arn=False)
@ -435,11 +464,11 @@ class OrganizationConformancePack(ConfigEmptyDictable):
def update(
self,
delivery_s3_bucket,
delivery_s3_key_prefix,
input_parameters,
excluded_accounts,
):
delivery_s3_bucket: str,
delivery_s3_key_prefix: str,
input_parameters: List[Dict[str, Any]],
excluded_accounts: List[str],
) -> None:
self._status = "UPDATE_SUCCESSFUL"
self.conformance_pack_input_parameters = input_parameters
@ -464,10 +493,10 @@ class Scope(ConfigEmptyDictable):
def __init__(
self,
compliance_resource_types=None,
tag_key=None,
tag_value=None,
compliance_resource_id=None,
compliance_resource_types: Optional[List[str]] = None,
tag_key: Optional[str] = None,
tag_value: Optional[str] = None,
compliance_resource_id: Optional[str] = None,
):
super().__init__(capitalize_start=True, capitalize_arn=False)
self.tags = None
@ -489,7 +518,7 @@ class Scope(ConfigEmptyDictable):
"Scope cannot be applied to both resource and tag"
)
if compliance_resource_id and len(compliance_resource_types) != 1:
if compliance_resource_id and len(compliance_resource_types) != 1: # type: ignore[arg-type]
raise InvalidParameterValueException(
"A single resourceType should be provided when resourceId "
"is provided in scope"
@ -522,7 +551,10 @@ class SourceDetail(ConfigEmptyDictable):
EVENT_SOURCES = ["aws.config"]
def __init__(
self, event_source=None, message_type=None, maximum_execution_frequency=None
self,
event_source: Optional[str] = None,
message_type: Optional[str] = None,
maximum_execution_frequency: Optional[str] = None,
):
super().__init__(capitalize_start=True, capitalize_arn=False)
@ -599,7 +631,12 @@ class Source(ConfigEmptyDictable):
OWNERS = {"AWS", "CUSTOM_LAMBDA"}
def __init__(
self, account_id, region, owner, source_identifier, source_details=None
self,
account_id: str,
region: str,
owner: str,
source_identifier: str,
source_details: Optional[SourceDetail] = None,
):
super().__init__(capitalize_start=True, capitalize_arn=False)
if owner not in Source.OWNERS:
@ -651,7 +688,7 @@ class Source(ConfigEmptyDictable):
)
details = []
for detail in source_details:
for detail in source_details: # type: ignore[attr-defined]
detail_dict = convert_to_class_args(detail)
details.append(SourceDetail(**detail_dict))
@ -659,7 +696,7 @@ class Source(ConfigEmptyDictable):
self.owner = owner
self.source_identifier = source_identifier
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
"""Format the SourceDetails properly."""
result = super().to_dict()
if self.source_details:
@ -678,7 +715,13 @@ class ConfigRule(ConfigEmptyDictable):
MAX_RULES = 150
RULE_STATES = {"ACTIVE", "DELETING", "DELETING_RESULTS", "EVALUATING"}
def __init__(self, account_id, region, config_rule, tags):
def __init__(
self,
account_id: str,
region: str,
config_rule: Dict[str, Any],
tags: Dict[str, str],
):
super().__init__(capitalize_start=True, capitalize_arn=False)
self.account_id = account_id
self.config_rule_name = config_rule.get("ConfigRuleName")
@ -697,7 +740,9 @@ class ConfigRule(ConfigEmptyDictable):
f"arn:aws:config:{region}:{account_id}:config-rule/{self.config_rule_id}"
)
def modify_fields(self, region, config_rule, tags):
def modify_fields(
self, region: str, config_rule: Dict[str, Any], tags: Dict[str, str]
) -> None:
"""Initialize or update ConfigRule fields."""
self.config_rule_state = config_rule.get("ConfigRuleState", "ACTIVE")
if self.config_rule_state not in ConfigRule.RULE_STATES:
@ -787,23 +832,22 @@ class ConfigRule(ConfigEmptyDictable):
self.last_updated_time = datetime2int(datetime.utcnow())
self.tags = tags
def validate_managed_rule(self):
def validate_managed_rule(self) -> None:
"""Validate parameters specific to managed rules."""
rule_info = MANAGED_RULES_CONSTRAINTS[self.source.source_identifier]
rule_info = MANAGED_RULES_CONSTRAINTS[self.source.source_identifier] # type: ignore[index]
param_names = self.input_parameters_dict.keys()
# Verify input parameter names are actual parameters for the rule ID.
if param_names:
allowed_names = {x["Name"] for x in rule_info["Parameters"]}
allowed_names = {x["Name"] for x in rule_info["Parameters"]} # type: ignore[index]
if not set(param_names).issubset(allowed_names):
raise InvalidParameterValueException(
"Unknown parameters provided in the inputParameters: "
+ self.input_parameters
f"Unknown parameters provided in the inputParameters: {self.input_parameters}"
)
# Verify all the required parameters are specified.
required_names = {
x["Name"] for x in rule_info["Parameters"] if not x["Optional"]
x["Name"] for x in rule_info["Parameters"] if not x["Optional"] # type: ignore[index]
}
diffs = required_names.difference(set(param_names))
if diffs:
@ -849,24 +893,24 @@ class ConfigRule(ConfigEmptyDictable):
class ConfigBackend(BaseBackend):
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.recorders = {}
self.delivery_channels = {}
self.config_aggregators = {}
self.aggregation_authorizations = {}
self.organization_conformance_packs = {}
self.config_rules = {}
self.config_schema = None
self.recorders: Dict[str, ConfigRecorder] = {}
self.delivery_channels: Dict[str, ConfigDeliveryChannel] = {}
self.config_aggregators: Dict[str, ConfigAggregator] = {}
self.aggregation_authorizations: Dict[str, ConfigAggregationAuthorization] = {}
self.organization_conformance_packs: Dict[str, OrganizationConformancePack] = {}
self.config_rules: Dict[str, ConfigRule] = {}
self.config_schema: Optional[AWSServiceSpec] = None
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
def default_vpc_endpoint_service(service_region: str, zones: List[str]) -> List[Dict[str, Any]]: # type: ignore[misc]
"""List of dicts representing default VPC endpoints for this service."""
return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "config"
)
def _validate_resource_types(self, resource_list):
def _validate_resource_types(self, resource_list: List[str]) -> None:
if not self.config_schema:
self.config_schema = AWSServiceSpec(
path="data/config/2014-11-12/service-2.json"
@ -875,15 +919,17 @@ class ConfigBackend(BaseBackend):
# Verify that each entry exists in the supported list:
bad_list = []
for resource in resource_list:
if resource not in self.config_schema.shapes["ResourceType"]["enum"]:
if resource not in self.config_schema.shapes["ResourceType"]["enum"]: # type: ignore[index]
bad_list.append(resource)
if bad_list:
raise InvalidResourceTypeException(
bad_list, self.config_schema.shapes["ResourceType"]["enum"]
bad_list, self.config_schema.shapes["ResourceType"]["enum"] # type: ignore[index]
)
def _validate_delivery_snapshot_properties(self, properties):
def _validate_delivery_snapshot_properties(
self, properties: Dict[str, Any]
) -> None:
if not self.config_schema:
self.config_schema = AWSServiceSpec(
path="data/config/2014-11-12/service-2.json"
@ -892,23 +938,23 @@ class ConfigBackend(BaseBackend):
# Verify that the deliveryFrequency is set to an acceptable value:
if (
properties.get("deliveryFrequency", None)
not in self.config_schema.shapes["MaximumExecutionFrequency"]["enum"]
not in self.config_schema.shapes["MaximumExecutionFrequency"]["enum"] # type: ignore[index]
):
raise InvalidDeliveryFrequency(
properties.get("deliveryFrequency", None),
self.config_schema.shapes["MaximumExecutionFrequency"]["enum"],
self.config_schema.shapes["MaximumExecutionFrequency"]["enum"], # type: ignore[index]
)
def put_configuration_aggregator(self, config_aggregator):
def put_configuration_aggregator(
self, config_aggregator: Dict[str, Any]
) -> Dict[str, Any]:
# Validate the name:
if len(config_aggregator["ConfigurationAggregatorName"]) > 256:
raise NameTooLongException(
config_aggregator["ConfigurationAggregatorName"],
"configurationAggregatorName",
)
config_aggr_name = config_aggregator["ConfigurationAggregatorName"]
if len(config_aggr_name) > 256:
raise NameTooLongException(config_aggr_name, "configurationAggregatorName")
account_sources = None
org_source = None
account_sources: Optional[List[AccountAggregatorSource]] = None
org_source: Optional[OrganizationAggregationSource] = None
# Tag validation:
tags = validate_tags(config_aggregator.get("Tags", []))
@ -965,25 +1011,19 @@ class ConfigBackend(BaseBackend):
)
# Grab the existing one if it exists and update it:
if not self.config_aggregators.get(
config_aggregator["ConfigurationAggregatorName"]
):
if not self.config_aggregators.get(config_aggr_name):
aggregator = ConfigAggregator(
config_aggregator["ConfigurationAggregatorName"],
config_aggr_name,
account_id=self.account_id,
region=self.region_name,
account_sources=account_sources,
org_source=org_source,
tags=tags,
)
self.config_aggregators[
config_aggregator["ConfigurationAggregatorName"]
] = aggregator
self.config_aggregators[config_aggr_name] = aggregator
else:
aggregator = self.config_aggregators[
config_aggregator["ConfigurationAggregatorName"]
]
aggregator = self.config_aggregators[config_aggr_name]
aggregator.tags = tags
aggregator.account_aggregation_sources = account_sources
aggregator.organization_aggregation_source = org_source
@ -991,10 +1031,12 @@ class ConfigBackend(BaseBackend):
return aggregator.to_dict()
def describe_configuration_aggregators(self, names, token, limit):
def describe_configuration_aggregators(
self, names: List[str], token: str, limit: Optional[int]
) -> Dict[str, Any]:
limit = DEFAULT_PAGE_SIZE if not limit or limit < 0 else limit
agg_list = []
result = {"ConfigurationAggregators": []}
result: Dict[str, Any] = {"ConfigurationAggregators": []}
if names:
for name in names:
@ -1034,17 +1076,20 @@ class ConfigBackend(BaseBackend):
return result
def delete_configuration_aggregator(self, config_aggregator):
def delete_configuration_aggregator(self, config_aggregator: str) -> None:
if not self.config_aggregators.get(config_aggregator):
raise NoSuchConfigurationAggregatorException()
del self.config_aggregators[config_aggregator]
def put_aggregation_authorization(
self, authorized_account, authorized_region, tags
):
self,
authorized_account: str,
authorized_region: str,
tags: List[Dict[str, str]],
) -> Dict[str, Any]:
# Tag validation:
tags = validate_tags(tags or [])
tag_dict = validate_tags(tags or [])
# Does this already exist?
key = "{}/{}".format(authorized_account, authorized_region)
@ -1055,20 +1100,22 @@ class ConfigBackend(BaseBackend):
self.region_name,
authorized_account,
authorized_region,
tags=tags,
tags=tag_dict,
)
self.aggregation_authorizations[
"{}/{}".format(authorized_account, authorized_region)
] = agg_auth
else:
# Only update the tags:
agg_auth.tags = tags
agg_auth.tags = tag_dict
return agg_auth.to_dict()
def describe_aggregation_authorizations(self, token, limit):
def describe_aggregation_authorizations(
self, token: Optional[str], limit: Optional[int]
) -> Dict[str, Any]:
limit = DEFAULT_PAGE_SIZE if not limit or limit < 0 else limit
result = {"AggregationAuthorizations": []}
result: Dict[str, Any] = {"AggregationAuthorizations": []}
if not self.aggregation_authorizations:
return result
@ -1097,19 +1144,21 @@ class ConfigBackend(BaseBackend):
return result
def delete_aggregation_authorization(self, authorized_account, authorized_region):
def delete_aggregation_authorization(
self, authorized_account: str, authorized_region: str
) -> None:
# This will always return a 200 -- regardless if there is or isn't an existing
# aggregation authorization.
key = "{}/{}".format(authorized_account, authorized_region)
self.aggregation_authorizations.pop(key, None)
def put_configuration_recorder(self, config_recorder):
def put_configuration_recorder(self, config_recorder: Dict[str, Any]) -> None:
# Validate the name:
if not config_recorder.get("name"):
raise InvalidConfigurationRecorderNameException(config_recorder.get("name"))
if len(config_recorder.get("name")) > 256:
if len(config_recorder["name"]) > 256:
raise NameTooLongException(
config_recorder.get("name"), "configurationRecorder.name"
config_recorder["name"], "configurationRecorder.name"
)
# We're going to assume that the passed in Role ARN is correct.
@ -1164,8 +1213,10 @@ class ConfigBackend(BaseBackend):
status=recorder_status,
)
def describe_configuration_recorders(self, recorder_names):
recorders = []
def describe_configuration_recorders(
self, recorder_names: Optional[List[str]]
) -> List[Dict[str, Any]]:
recorders: List[Dict[str, Any]] = []
if recorder_names:
for rname in recorder_names:
@ -1181,8 +1232,10 @@ class ConfigBackend(BaseBackend):
return recorders
def describe_configuration_recorder_status(self, recorder_names):
recorders = []
def describe_configuration_recorder_status(
self, recorder_names: List[str]
) -> List[Dict[str, Any]]:
recorders: List[Dict[str, Any]] = []
if recorder_names:
for rname in recorder_names:
@ -1198,7 +1251,7 @@ class ConfigBackend(BaseBackend):
return recorders
def put_delivery_channel(self, delivery_channel):
def put_delivery_channel(self, delivery_channel: Dict[str, Any]) -> None:
# Must have a configuration recorder:
if not self.recorders:
raise NoAvailableConfigurationRecorderException()
@ -1206,10 +1259,8 @@ class ConfigBackend(BaseBackend):
# Validate the name:
if not delivery_channel.get("name"):
raise InvalidDeliveryChannelNameException(delivery_channel.get("name"))
if len(delivery_channel.get("name")) > 256:
raise NameTooLongException(
delivery_channel.get("name"), "deliveryChannel.name"
)
if len(delivery_channel["name"]) > 256:
raise NameTooLongException(delivery_channel["name"], "deliveryChannel.name")
# We are going to assume that the bucket exists -- but will verify if
# the bucket provided is blank:
@ -1256,8 +1307,10 @@ class ConfigBackend(BaseBackend):
snapshot_properties=dprop,
)
def describe_delivery_channels(self, channel_names):
channels = []
def describe_delivery_channels(
self, channel_names: List[str]
) -> List[Dict[str, Any]]:
channels: List[Dict[str, Any]] = []
if channel_names:
for cname in channel_names:
@ -1273,7 +1326,7 @@ class ConfigBackend(BaseBackend):
return channels
def start_configuration_recorder(self, recorder_name):
def start_configuration_recorder(self, recorder_name: str) -> None:
if not self.recorders.get(recorder_name):
raise NoSuchConfigurationRecorderException(recorder_name)
@ -1284,20 +1337,20 @@ class ConfigBackend(BaseBackend):
# Start recording:
self.recorders[recorder_name].status.start()
def stop_configuration_recorder(self, recorder_name):
def stop_configuration_recorder(self, recorder_name: str) -> None:
if not self.recorders.get(recorder_name):
raise NoSuchConfigurationRecorderException(recorder_name)
# Stop recording:
self.recorders[recorder_name].status.stop()
def delete_configuration_recorder(self, recorder_name):
def delete_configuration_recorder(self, recorder_name: str) -> None:
if not self.recorders.get(recorder_name):
raise NoSuchConfigurationRecorderException(recorder_name)
del self.recorders[recorder_name]
def delete_delivery_channel(self, channel_name):
def delete_delivery_channel(self, channel_name: str) -> None:
if not self.delivery_channels.get(channel_name):
raise NoSuchDeliveryChannelException(channel_name)
@ -1310,13 +1363,13 @@ class ConfigBackend(BaseBackend):
def list_discovered_resources(
self,
resource_type,
backend_region,
resource_ids,
resource_name,
limit,
next_token,
):
resource_type: str,
backend_region: str,
resource_ids: List[str],
resource_name: str,
limit: int,
next_token: str,
) -> Dict[str, Any]:
"""Queries against AWS Config (non-aggregated) listing function.
The listing function must exist for the resource backend.
@ -1392,8 +1445,13 @@ class ConfigBackend(BaseBackend):
return result
def list_aggregate_discovered_resources(
self, aggregator_name, resource_type, filters, limit, next_token
):
self,
aggregator_name: str,
resource_type: str,
filters: Dict[str, str],
limit: Optional[int],
next_token: Optional[str],
) -> Dict[str, Any]:
"""Queries AWS Config listing function that must exist for resource backend.
As far a moto goes -- the only real difference between this function
@ -1453,14 +1511,16 @@ class ConfigBackend(BaseBackend):
resource_identifiers.append(item)
result = {"ResourceIdentifiers": resource_identifiers}
result: Dict[str, Any] = {"ResourceIdentifiers": resource_identifiers}
if new_token:
result["NextToken"] = new_token
return result
def get_resource_config_history(self, resource_type, resource_id, backend_region):
def get_resource_config_history(
self, resource_type: str, resource_id: str, backend_region: str
) -> Dict[str, Any]:
"""Returns configuration of resource for the current regional backend.
Item returned in AWS Config format.
@ -1502,7 +1562,9 @@ class ConfigBackend(BaseBackend):
return {"configurationItems": [item]}
def batch_get_resource_config(self, resource_keys, backend_region):
def batch_get_resource_config(
self, resource_keys: List[Dict[str, str]], backend_region: str
) -> Dict[str, Any]:
"""Returns configuration of resource for the current regional backend.
Item is returned in AWS Config format.
@ -1562,8 +1624,8 @@ class ConfigBackend(BaseBackend):
} # At this time, moto is not adding unprocessed items.
def batch_get_aggregate_resource_config(
self, aggregator_name, resource_identifiers
):
self, aggregator_name: str, resource_identifiers: List[Dict[str, str]]
) -> Dict[str, Any]:
"""Returns configuration of resource for current regional backend.
Item is returned in AWS Config format.
@ -1621,7 +1683,12 @@ class ConfigBackend(BaseBackend):
"UnprocessedResourceIdentifiers": not_found,
}
def put_evaluations(self, evaluations=None, result_token=None, test_mode=False):
def put_evaluations(
self,
evaluations: Optional[List[Dict[str, Any]]] = None,
result_token: Optional[str] = None,
test_mode: Optional[bool] = False,
) -> Dict[str, List[Any]]:
if not evaluations:
raise InvalidParameterValueException(
"The Evaluations object in your request cannot be null."
@ -1644,14 +1711,14 @@ class ConfigBackend(BaseBackend):
def put_organization_conformance_pack(
self,
name,
template_s3_uri,
template_body,
delivery_s3_bucket,
delivery_s3_key_prefix,
input_parameters,
excluded_accounts,
):
name: str,
template_s3_uri: str,
template_body: str,
delivery_s3_bucket: str,
delivery_s3_key_prefix: str,
input_parameters: List[Dict[str, str]],
excluded_accounts: List[str],
) -> Dict[str, Any]:
# a real validation of the content of the template is missing at the moment
if not template_s3_uri and not template_body:
raise ValidationException("Template body is invalid")
@ -1690,7 +1757,9 @@ class ConfigBackend(BaseBackend):
"OrganizationConformancePackArn": pack.organization_conformance_pack_arn
}
def describe_organization_conformance_packs(self, names):
def describe_organization_conformance_packs(
self, names: List[str]
) -> Dict[str, Any]:
packs = []
for name in names:
@ -1707,7 +1776,9 @@ class ConfigBackend(BaseBackend):
return {"OrganizationConformancePacks": packs}
def describe_organization_conformance_pack_statuses(self, names):
def describe_organization_conformance_pack_statuses(
self, names: List[str]
) -> Dict[str, Any]:
packs = []
statuses = []
@ -1737,7 +1808,9 @@ class ConfigBackend(BaseBackend):
return {"OrganizationConformancePackStatuses": statuses}
def get_organization_conformance_pack_detailed_status(self, name):
def get_organization_conformance_pack_detailed_status(
self, name: str
) -> Dict[str, Any]:
pack = self.organization_conformance_packs.get(name)
if not pack:
@ -1760,7 +1833,7 @@ class ConfigBackend(BaseBackend):
return {"OrganizationConformancePackDetailedStatuses": statuses}
def delete_organization_conformance_pack(self, name):
def delete_organization_conformance_pack(self, name: str) -> None:
pack = self.organization_conformance_packs.get(name)
if not pack:
@ -1771,28 +1844,24 @@ class ConfigBackend(BaseBackend):
self.organization_conformance_packs.pop(name)
def _match_arn(self, resource_arn):
def _match_arn(
self, resource_arn: str
) -> Union[ConfigRule, ConfigAggregator, ConfigAggregationAuthorization]:
"""Return config instance that has a matching ARN."""
# The allowed resources are ConfigRule, ConfigurationAggregator,
# and AggregatorAuthorization.
allowed_resources = [
{
"configs": self.config_aggregators,
"arn_attribute": "configuration_aggregator_arn",
},
{
"configs": self.aggregation_authorizations,
"arn_attribute": "aggregation_authorization_arn",
},
{"configs": self.config_rules, "arn_attribute": "config_rule_arn"},
]
allowed_resources = {
"configuration_aggregator_arn": self.config_aggregators,
"aggregation_authorization_arn": self.aggregation_authorizations,
"config_rule_arn": self.config_rules,
}
# Find matching config for given resource_arn among all the
# allowed config resources.
matched_config = None
for resource in allowed_resources:
for config in resource["configs"].values():
if resource_arn == getattr(config, resource["arn_attribute"]):
for arn_attribute, configs in allowed_resources.items():
for config in configs.values(): # type: ignore[attr-defined]
if resource_arn == getattr(config, arn_attribute):
matched_config = config
break
@ -1800,18 +1869,18 @@ class ConfigBackend(BaseBackend):
raise ResourceNotFoundException(resource_arn)
return matched_config
def tag_resource(self, resource_arn, tags):
def tag_resource(self, resource_arn: str, tags: List[Dict[str, str]]) -> None:
"""Add tags in config with a matching ARN."""
# Tag validation:
tags = validate_tags(tags)
tag_dict = validate_tags(tags)
# Find config with a matching ARN.
matched_config = self._match_arn(resource_arn)
# Merge the new tags with the existing tags.
matched_config.tags.update(tags)
matched_config.tags.update(tag_dict)
def untag_resource(self, resource_arn, tag_keys):
def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
"""Remove tags in config with a matching ARN.
If the tags in the tag_keys don't match any keys for that
@ -1827,8 +1896,8 @@ class ConfigBackend(BaseBackend):
matched_config.tags.pop(tag_key, None)
def list_tags_for_resource(
self, resource_arn, limit, next_token
): # pylint: disable=unused-argument
self, resource_arn: str, limit: int
) -> Dict[str, List[Dict[str, str]]]:
"""Return list of tags for AWS Config resource."""
# The limit argument is essentially ignored as a config instance
# can only have 50 tags, but we'll check the argument anyway.
@ -1845,7 +1914,9 @@ class ConfigBackend(BaseBackend):
]
}
def put_config_rule(self, config_rule, tags=None):
def put_config_rule(
self, config_rule: Dict[str, Any], tags: Optional[List[Dict[str, str]]] = None
) -> str:
"""Add/Update config rule for evaluating resource compliance.
TBD - Only the "accounting" of config rules are handled at the
@ -1880,7 +1951,7 @@ class ConfigBackend(BaseBackend):
"Name or Id or Arn"
)
tags = validate_tags(tags or [])
tag_dict = validate_tags(tags or [])
# With the rule_name, determine whether it's for an existing rule
# or whether a new rule should be created.
@ -1896,20 +1967,22 @@ class ConfigBackend(BaseBackend):
)
# Update the current rule.
rule.modify_fields(self.region_name, config_rule, tags)
rule.modify_fields(self.region_name, config_rule, tag_dict)
else:
# Create a new ConfigRule if the limit hasn't been reached.
if len(self.config_rules) == ConfigRule.MAX_RULES:
raise MaxNumberOfConfigRulesExceededException(
rule_name, ConfigRule.MAX_RULES
)
rule = ConfigRule(self.account_id, self.region_name, config_rule, tags)
rule = ConfigRule(self.account_id, self.region_name, config_rule, tag_dict)
self.config_rules[rule_name] = rule
return ""
def describe_config_rules(self, config_rule_names, next_token):
def describe_config_rules(
self, config_rule_names: Optional[List[str]], next_token: Optional[str]
) -> Dict[str, Any]:
"""Return details for the given ConfigRule names or for all rules."""
result = {"ConfigRules": []}
result: Dict[str, Any] = {"ConfigRules": []}
if not self.config_rules:
return result
@ -1937,7 +2010,7 @@ class ConfigBackend(BaseBackend):
result["NextToken"] = sorted_rules[start + CONFIG_RULE_PAGE_SIZE]
return result
def delete_config_rule(self, rule_name):
def delete_config_rule(self, rule_name: str) -> None:
"""Delete config rule used for evaluating resource compliance."""
rule = self.config_rules.get(rule_name)
if not rule:

View File

@ -1,30 +1,30 @@
import json
from moto.core.responses import BaseResponse
from .models import config_backends
from .models import config_backends, ConfigBackend
class ConfigResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="config")
@property
def config_backend(self):
def config_backend(self) -> ConfigBackend:
return config_backends[self.current_account][self.region]
def put_configuration_recorder(self):
def put_configuration_recorder(self) -> str:
self.config_backend.put_configuration_recorder(
self._get_param("ConfigurationRecorder")
)
return ""
def put_configuration_aggregator(self):
def put_configuration_aggregator(self) -> str:
aggregator = self.config_backend.put_configuration_aggregator(
json.loads(self.body)
)
schema = {"ConfigurationAggregator": aggregator}
return json.dumps(schema)
def describe_configuration_aggregators(self):
def describe_configuration_aggregators(self) -> str:
aggregators = self.config_backend.describe_configuration_aggregators(
self._get_param("ConfigurationAggregatorNames"),
self._get_param("NextToken"),
@ -32,13 +32,13 @@ class ConfigResponse(BaseResponse):
)
return json.dumps(aggregators)
def delete_configuration_aggregator(self):
def delete_configuration_aggregator(self) -> str:
self.config_backend.delete_configuration_aggregator(
self._get_param("ConfigurationAggregatorName")
)
return ""
def put_aggregation_authorization(self):
def put_aggregation_authorization(self) -> str:
agg_auth = self.config_backend.put_aggregation_authorization(
self._get_param("AuthorizedAccountId"),
self._get_param("AuthorizedAwsRegion"),
@ -47,14 +47,14 @@ class ConfigResponse(BaseResponse):
schema = {"AggregationAuthorization": agg_auth}
return json.dumps(schema)
def describe_aggregation_authorizations(self):
def describe_aggregation_authorizations(self) -> str:
authorizations = self.config_backend.describe_aggregation_authorizations(
self._get_param("NextToken"), self._get_param("Limit")
)
return json.dumps(authorizations)
def delete_aggregation_authorization(self):
def delete_aggregation_authorization(self) -> str:
self.config_backend.delete_aggregation_authorization(
self._get_param("AuthorizedAccountId"),
self._get_param("AuthorizedAwsRegion"),
@ -62,59 +62,59 @@ class ConfigResponse(BaseResponse):
return ""
def describe_configuration_recorders(self):
def describe_configuration_recorders(self) -> str:
recorders = self.config_backend.describe_configuration_recorders(
self._get_param("ConfigurationRecorderNames")
)
schema = {"ConfigurationRecorders": recorders}
return json.dumps(schema)
def describe_configuration_recorder_status(self):
def describe_configuration_recorder_status(self) -> str:
recorder_statuses = self.config_backend.describe_configuration_recorder_status(
self._get_param("ConfigurationRecorderNames")
)
schema = {"ConfigurationRecordersStatus": recorder_statuses}
return json.dumps(schema)
def put_delivery_channel(self):
def put_delivery_channel(self) -> str:
self.config_backend.put_delivery_channel(self._get_param("DeliveryChannel"))
return ""
def describe_delivery_channels(self):
def describe_delivery_channels(self) -> str:
delivery_channels = self.config_backend.describe_delivery_channels(
self._get_param("DeliveryChannelNames")
)
schema = {"DeliveryChannels": delivery_channels}
return json.dumps(schema)
def describe_delivery_channel_status(self):
def describe_delivery_channel_status(self) -> str:
raise NotImplementedError()
def delete_delivery_channel(self):
def delete_delivery_channel(self) -> str:
self.config_backend.delete_delivery_channel(
self._get_param("DeliveryChannelName")
)
return ""
def delete_configuration_recorder(self):
def delete_configuration_recorder(self) -> str:
self.config_backend.delete_configuration_recorder(
self._get_param("ConfigurationRecorderName")
)
return ""
def start_configuration_recorder(self):
def start_configuration_recorder(self) -> str:
self.config_backend.start_configuration_recorder(
self._get_param("ConfigurationRecorderName")
)
return ""
def stop_configuration_recorder(self):
def stop_configuration_recorder(self) -> str:
self.config_backend.stop_configuration_recorder(
self._get_param("ConfigurationRecorderName")
)
return ""
def list_discovered_resources(self):
def list_discovered_resources(self) -> str:
schema = self.config_backend.list_discovered_resources(
self._get_param("resourceType"),
self.region,
@ -125,7 +125,7 @@ class ConfigResponse(BaseResponse):
)
return json.dumps(schema)
def list_aggregate_discovered_resources(self):
def list_aggregate_discovered_resources(self) -> str:
schema = self.config_backend.list_aggregate_discovered_resources(
self._get_param("ConfigurationAggregatorName"),
self._get_param("ResourceType"),
@ -135,34 +135,32 @@ class ConfigResponse(BaseResponse):
)
return json.dumps(schema)
def list_tags_for_resource(self):
def list_tags_for_resource(self) -> str:
schema = self.config_backend.list_tags_for_resource(
self._get_param("ResourceArn"),
self._get_param("Limit"),
self._get_param("NextToken"),
self._get_param("ResourceArn"), self._get_param("Limit")
)
return json.dumps(schema)
def get_resource_config_history(self):
def get_resource_config_history(self) -> str:
schema = self.config_backend.get_resource_config_history(
self._get_param("resourceType"), self._get_param("resourceId"), self.region
)
return json.dumps(schema)
def batch_get_resource_config(self):
def batch_get_resource_config(self) -> str:
schema = self.config_backend.batch_get_resource_config(
self._get_param("resourceKeys"), self.region
)
return json.dumps(schema)
def batch_get_aggregate_resource_config(self):
def batch_get_aggregate_resource_config(self) -> str:
schema = self.config_backend.batch_get_aggregate_resource_config(
self._get_param("ConfigurationAggregatorName"),
self._get_param("ResourceIdentifiers"),
)
return json.dumps(schema)
def put_evaluations(self):
def put_evaluations(self) -> str:
evaluations = self.config_backend.put_evaluations(
self._get_param("Evaluations"),
self._get_param("ResultToken"),
@ -170,7 +168,7 @@ class ConfigResponse(BaseResponse):
)
return json.dumps(evaluations)
def put_organization_conformance_pack(self):
def put_organization_conformance_pack(self) -> str:
conformance_pack = self.config_backend.put_organization_conformance_pack(
name=self._get_param("OrganizationConformancePackName"),
template_s3_uri=self._get_param("TemplateS3Uri"),
@ -183,19 +181,19 @@ class ConfigResponse(BaseResponse):
return json.dumps(conformance_pack)
def describe_organization_conformance_packs(self):
def describe_organization_conformance_packs(self) -> str:
conformance_packs = self.config_backend.describe_organization_conformance_packs(
self._get_param("OrganizationConformancePackNames")
)
return json.dumps(conformance_packs)
def describe_organization_conformance_pack_statuses(self):
def describe_organization_conformance_pack_statuses(self) -> str:
statuses = self.config_backend.describe_organization_conformance_pack_statuses(
self._get_param("OrganizationConformancePackNames")
)
return json.dumps(statuses)
def get_organization_conformance_pack_detailed_status(self):
def get_organization_conformance_pack_detailed_status(self) -> str:
# 'Filters' parameter is not implemented yet
statuses = (
self.config_backend.get_organization_conformance_pack_detailed_status(
@ -204,36 +202,36 @@ class ConfigResponse(BaseResponse):
)
return json.dumps(statuses)
def delete_organization_conformance_pack(self):
def delete_organization_conformance_pack(self) -> str:
self.config_backend.delete_organization_conformance_pack(
self._get_param("OrganizationConformancePackName")
)
return ""
def tag_resource(self):
def tag_resource(self) -> str:
self.config_backend.tag_resource(
self._get_param("ResourceArn"), self._get_param("Tags")
)
return ""
def untag_resource(self):
def untag_resource(self) -> str:
self.config_backend.untag_resource(
self._get_param("ResourceArn"), self._get_param("TagKeys")
)
return ""
def put_config_rule(self):
def put_config_rule(self) -> str:
self.config_backend.put_config_rule(
self._get_param("ConfigRule"), self._get_param("Tags")
)
return ""
def describe_config_rules(self):
def describe_config_rules(self) -> str:
rules = self.config_backend.describe_config_rules(
self._get_param("ConfigRuleNames"), self._get_param("NextToken")
)
return json.dumps(rules)
def delete_config_rule(self):
def delete_config_rule(self) -> str:
self.config_backend.delete_config_rule(self._get_param("ConfigRuleName"))
return ""

View File

@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import Any, Dict
from typing import Any, Dict, List, Optional
from .base_backend import InstanceTrackerMeta
@ -98,14 +98,14 @@ class ConfigQueryModel:
def list_config_service_resources(
self,
account_id,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
account_id: str,
resource_ids: Optional[List[str]],
resource_name: Optional[str],
limit: int,
next_token: Optional[str],
backend_region: Optional[str] = None,
resource_region: Optional[str] = None,
aggregator: Optional[Dict[str, Any]] = None,
):
"""For AWS Config. This will list all of the resources of the given type and optional resource name and region.
@ -158,12 +158,12 @@ class ConfigQueryModel:
def get_config_resource(
self,
account_id,
resource_id,
resource_name=None,
backend_region=None,
resource_region=None,
):
account_id: str,
resource_id: str,
resource_name: Optional[str] = None,
backend_region: Optional[str] = None,
resource_region: Optional[str] = None,
) -> Dict[str, Any]:
"""For AWS Config. This will query the backend for the specific resource type configuration.
This supports both aggregated, and non-aggregated fetching -- for batched fetching -- the Config batching requests

View File

@ -870,7 +870,7 @@ class AWSServiceSpec(object):
"""
def __init__(self, path):
def __init__(self, path: str):
spec = load_resource("botocore", path)
self.metadata = spec["metadata"]

View File

@ -1,12 +1,14 @@
import pathlib
from typing import Any, Dict
from os import listdir
from ..utils import generic_filter
from moto.utilities.utils import load_resource
from ..exceptions import InvalidFilter, InvalidInstanceTypeError
INSTANCE_TYPES = load_resource(__name__, "../resources/instance_types.json")
INSTANCE_TYPES: Dict[str, Any] = load_resource(
__name__, "../resources/instance_types.json"
)
INSTANCE_FAMILIES = list(set([i.split(".")[0] for i in INSTANCE_TYPES.keys()]))
root = pathlib.Path(__file__).parent

View File

@ -3,7 +3,7 @@ import hashlib
import pkgutil
from collections.abc import MutableMapping
from typing import Any, Dict
from typing import Any, Dict, Union
def str2bool(v):
@ -13,7 +13,9 @@ def str2bool(v):
return False
def load_resource(package, resource, as_json=True):
def load_resource(
package: str, resource: str, as_json: bool = True
) -> Union[Dict[str, Any], str]:
"""
Open a file, and return the contents as JSON.
Usage:

View File

@ -18,7 +18,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit,moto/cognito*,moto/comprehend
files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit,moto/cognito*,moto/comprehend,moto/config
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract