diff --git a/moto/ds/exceptions.py b/moto/ds/exceptions.py index b5d1ff05f..ac1dbbc9c 100644 --- a/moto/ds/exceptions.py +++ b/moto/ds/exceptions.py @@ -1,5 +1,6 @@ """Exceptions raised by the Directory Service service.""" from moto.core.exceptions import JsonRESTError +from typing import List, Tuple class DsValidationException(JsonRESTError): @@ -7,7 +8,7 @@ class DsValidationException(JsonRESTError): code = 400 - def __init__(self, error_tuples): + def __init__(self, error_tuples: List[Tuple[str, str, str]]): """Validation errors are concatenated into one exception message. error_tuples is a list of tuples. Each tuple contains: @@ -35,7 +36,7 @@ class ClientException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ClientException", message) @@ -44,7 +45,7 @@ class DirectoryLimitExceededException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("DirectoryLimitExceededException", message) @@ -53,7 +54,7 @@ class EntityDoesNotExistException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("EntityDoesNotExistException", message) @@ -62,7 +63,7 @@ class EntityAlreadyExistsException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("EntityAlreadyExistsException", message) @@ -71,7 +72,7 @@ class InvalidNextTokenException(JsonRESTError): code = 400 - def __init__(self): + def __init__(self) -> None: super().__init__( "InvalidNextTokenException", "Invalid value passed for the NextToken parameter", @@ -83,7 +84,7 @@ class InvalidParameterException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("InvalidParameterException", message) @@ -92,7 +93,7 @@ class TagLimitExceededException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("TagLimitExceededException", message) @@ -101,5 +102,5 @@ class ValidationException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ValidationException", message) diff --git a/moto/ds/models.py b/moto/ds/models.py index 13126cb69..e2e987115 100644 --- a/moto/ds/models.py +++ b/moto/ds/models.py @@ -1,5 +1,6 @@ """DirectoryServiceBackend class with methods for supported APIs.""" from datetime import datetime, timezone +from typing import Any, Dict, Optional, Tuple, List from moto.core import BaseBackend, BackendDict, BaseModel from moto.ds.exceptions import ( @@ -46,17 +47,17 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes def __init__( self, - account_id, - region, - name, - password, - directory_type, - size=None, - vpc_settings=None, - connect_settings=None, - short_name=None, - description=None, - edition=None, + account_id: str, + region: str, + name: str, + password: str, + directory_type: str, + size: Optional[str] = None, + vpc_settings: Optional[Dict[str, Any]] = None, + connect_settings: Optional[Dict[str, Any]] = None, + short_name: Optional[str] = None, + description: Optional[str] = None, + edition: Optional[str] = None, ): # pylint: disable=too-many-arguments self.account_id = account_id self.region = region @@ -82,26 +83,26 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes if self.directory_type == "ADConnector": self.security_group_id = self.create_security_group( - self.connect_settings["VpcId"] + self.connect_settings["VpcId"] # type: ignore[index] ) self.eni_ids, self.subnet_ips = self.create_eni( - self.security_group_id, self.connect_settings["SubnetIds"] + self.security_group_id, self.connect_settings["SubnetIds"] # type: ignore[index] ) - self.connect_settings["SecurityGroupId"] = self.security_group_id - self.connect_settings["ConnectIps"] = self.subnet_ips - self.dns_ip_addrs = self.connect_settings["CustomerDnsIps"] + self.connect_settings["SecurityGroupId"] = self.security_group_id # type: ignore[index] + self.connect_settings["ConnectIps"] = self.subnet_ips # type: ignore[index] + self.dns_ip_addrs = self.connect_settings["CustomerDnsIps"] # type: ignore[index] else: self.security_group_id = self.create_security_group( - self.vpc_settings["VpcId"] + self.vpc_settings["VpcId"] # type: ignore[index] ) self.eni_ids, self.subnet_ips = self.create_eni( - self.security_group_id, self.vpc_settings["SubnetIds"] + self.security_group_id, self.vpc_settings["SubnetIds"] # type: ignore[index] ) - self.vpc_settings["SecurityGroupId"] = self.security_group_id + self.vpc_settings["SecurityGroupId"] = self.security_group_id # type: ignore[index] self.dns_ip_addrs = self.subnet_ips - def create_security_group(self, vpc_id): + def create_security_group(self, vpc_id: str) -> str: """Create security group for the network interface.""" security_group_info = ec2_backends[self.account_id][ self.region @@ -115,13 +116,15 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes ) return security_group_info.id - def delete_security_group(self): + def delete_security_group(self) -> None: """Delete the given security group.""" ec2_backends[self.account_id][self.region].delete_security_group( group_id=self.security_group_id ) - def create_eni(self, security_group_id, subnet_ids): + def create_eni( + self, security_group_id: str, subnet_ids: List[str] + ) -> Tuple[List[str], List[str]]: """Return ENI ids and primary addresses created for each subnet.""" eni_ids = [] subnet_ips = [] @@ -138,21 +141,21 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes subnet_ips.append(eni_info.private_ip_address) return eni_ids, subnet_ips - def delete_eni(self): + def delete_eni(self) -> None: """Delete ENI for each subnet and the security group.""" for eni_id in self.eni_ids: ec2_backends[self.account_id][self.region].delete_network_interface(eni_id) - def update_alias(self, alias): + def update_alias(self, alias: str) -> None: """Change default alias to given alias.""" self.alias = alias self.access_url = f"{alias}.awsapps.com" - def enable_sso(self, new_state): + def enable_sso(self, new_state: bool) -> None: """Enable/disable sso based on whether new_state is True or False.""" self.sso_enabled = new_state - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: """Create a dictionary of attributes for Directory.""" attributes = { "AccessUrl": self.access_url, @@ -188,19 +191,21 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes class DirectoryServiceBackend(BaseBackend): """Implementation of DirectoryService APIs.""" - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.directories = {} + self.directories: Dict[str, Directory] = {} self.tagger = TaggingService() @staticmethod - def default_vpc_endpoint_service(service_region, zones): + def default_vpc_endpoint_service( + service_region: str, zones: List[str] + ) -> List[Dict[str, str]]: """List of dicts representing default VPC endpoints for this service.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "ds" ) - def _verify_subnets(self, region, vpc_settings): + def _verify_subnets(self, region: str, vpc_settings: Dict[str, Any]) -> None: """Verify subnets are valid, else raise an exception. If settings are valid, add AvailabilityZones to vpc_settings. @@ -237,15 +242,15 @@ class DirectoryServiceBackend(BaseBackend): def connect_directory( self, - region, - name, - short_name, - password, - description, - size, - connect_settings, - tags, - ): # pylint: disable=too-many-arguments + region: str, + name: str, + short_name: str, + password: str, + description: str, + size: str, + connect_settings: Dict[str, Any], + tags: List[Dict[str, str]], + ) -> str: # pylint: disable=too-many-arguments """Create a fake AD Connector.""" if len(self.directories) > Directory.CONNECTED_DIRECTORIES_LIMIT: raise DirectoryLimitExceededException( @@ -296,8 +301,16 @@ class DirectoryServiceBackend(BaseBackend): return directory.directory_id def create_directory( - self, region, name, short_name, password, description, size, vpc_settings, tags - ): # pylint: disable=too-many-arguments + self, + region: str, + name: str, + short_name: str, + password: str, + description: str, + size: str, + vpc_settings: Dict[str, Any], + tags: List[Dict[str, str]], + ) -> str: # pylint: disable=too-many-arguments """Create a fake Simple Ad Directory.""" if len(self.directories) > Directory.CLOUDONLY_DIRECTORIES_LIMIT: raise DirectoryLimitExceededException( @@ -341,7 +354,7 @@ class DirectoryServiceBackend(BaseBackend): self.tagger.tag_resource(directory.directory_id, tags or []) return directory.directory_id - def _validate_directory_id(self, directory_id): + def _validate_directory_id(self, directory_id: str) -> None: """Raise an exception if the directory id is invalid or unknown.""" # Validation of ID takes precedence over a check for its existence. validate_args([("directoryId", directory_id)]) @@ -350,7 +363,7 @@ class DirectoryServiceBackend(BaseBackend): f"Directory {directory_id} does not exist" ) - def create_alias(self, directory_id, alias): + def create_alias(self, directory_id: str, alias: str) -> Dict[str, str]: """Create and assign an alias to a directory.""" self._validate_directory_id(directory_id) @@ -373,15 +386,15 @@ class DirectoryServiceBackend(BaseBackend): def create_microsoft_ad( self, - region, - name, - short_name, - password, - description, - vpc_settings, - edition, - tags, - ): # pylint: disable=too-many-arguments + region: str, + name: str, + short_name: str, + password: str, + description: str, + vpc_settings: Dict[str, Any], + edition: str, + tags: List[Dict[str, str]], + ) -> str: # pylint: disable=too-many-arguments """Create a fake Microsoft Ad Directory.""" if len(self.directories) > Directory.CLOUDONLY_MICROSOFT_AD_LIMIT: raise DirectoryLimitExceededException( @@ -423,7 +436,7 @@ class DirectoryServiceBackend(BaseBackend): self.tagger.tag_resource(directory.directory_id, tags or []) return directory.directory_id - def delete_directory(self, directory_id): + def delete_directory(self, directory_id: str) -> str: """Delete directory with the matching ID.""" self._validate_directory_id(directory_id) self.directories[directory_id].delete_eni() @@ -432,14 +445,24 @@ class DirectoryServiceBackend(BaseBackend): self.directories.pop(directory_id) return directory_id - def disable_sso(self, directory_id, username=None, password=None): + def disable_sso( + self, + directory_id: str, + username: Optional[str] = None, + password: Optional[str] = None, + ) -> None: """Disable single-sign on for a directory.""" self._validate_directory_id(directory_id) validate_args([("ssoPassword", password), ("userName", username)]) directory = self.directories[directory_id] directory.enable_sso(False) - def enable_sso(self, directory_id, username=None, password=None): + def enable_sso( + self, + directory_id: str, + username: Optional[str] = None, + password: Optional[str] = None, + ) -> None: """Enable single-sign on for a directory.""" self._validate_directory_id(directory_id) validate_args([("ssoPassword", password), ("userName", username)]) @@ -454,7 +477,7 @@ class DirectoryServiceBackend(BaseBackend): directory.enable_sso(True) @paginate(pagination_model=PAGINATION_MODEL) - def describe_directories(self, directory_ids=None): + def describe_directories(self, directory_ids: Optional[List[str]] = None) -> List[Directory]: # type: ignore[misc] """Return info on all directories or directories with matching IDs.""" for directory_id in directory_ids or self.directories: self._validate_directory_id(directory_id) @@ -464,7 +487,7 @@ class DirectoryServiceBackend(BaseBackend): directories = [x for x in directories if x.directory_id in directory_ids] return sorted(directories, key=lambda x: x.launch_time) - def get_directory_limits(self): + def get_directory_limits(self) -> Dict[str, Any]: """Return hard-coded limits for the directories.""" counts = {"SimpleAD": 0, "MicrosoftAD": 0, "ConnectedAD": 0} for directory in self.directories.values(): @@ -490,7 +513,9 @@ class DirectoryServiceBackend(BaseBackend): == Directory.CONNECTED_DIRECTORIES_LIMIT, } - def add_tags_to_resource(self, resource_id, tags): + def add_tags_to_resource( + self, resource_id: str, tags: List[Dict[str, str]] + ) -> None: """Add or overwrite one or more tags for specified directory.""" self._validate_directory_id(resource_id) errmsg = self.tagger.validate_tags(tags) @@ -500,16 +525,16 @@ class DirectoryServiceBackend(BaseBackend): raise TagLimitExceededException("Tag limit exceeded") self.tagger.tag_resource(resource_id, tags) - def remove_tags_from_resource(self, resource_id, tag_keys): + def remove_tags_from_resource(self, resource_id: str, tag_keys: List[str]) -> None: """Removes tags from a directory.""" self._validate_directory_id(resource_id) self.tagger.untag_resource_using_names(resource_id, tag_keys) @paginate(pagination_model=PAGINATION_MODEL) - def list_tags_for_resource(self, resource_id): + def list_tags_for_resource(self, resource_id: str) -> List[Dict[str, str]]: # type: ignore[misc] """List all tags on a directory.""" self._validate_directory_id(resource_id) - return self.tagger.list_tags_for_resource(resource_id).get("Tags") + return self.tagger.list_tags_for_resource(resource_id).get("Tags") # type: ignore[return-value] ds_backends = BackendDict(DirectoryServiceBackend, service_name="ds") diff --git a/moto/ds/responses.py b/moto/ds/responses.py index 0516a4d97..088f94ddb 100644 --- a/moto/ds/responses.py +++ b/moto/ds/responses.py @@ -4,21 +4,21 @@ import json from moto.core.exceptions import InvalidToken from moto.core.responses import BaseResponse from moto.ds.exceptions import InvalidNextTokenException -from moto.ds.models import ds_backends +from moto.ds.models import ds_backends, DirectoryServiceBackend class DirectoryServiceResponse(BaseResponse): """Handler for DirectoryService requests and responses.""" - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="ds") @property - def ds_backend(self): + def ds_backend(self) -> DirectoryServiceBackend: """Return backend instance specific for this region.""" return ds_backends[self.current_account][self.region] - def connect_directory(self): + def connect_directory(self) -> str: """Create an AD Connector to connect to a self-managed directory.""" name = self._get_param("Name") short_name = self._get_param("ShortName") @@ -39,7 +39,7 @@ class DirectoryServiceResponse(BaseResponse): ) return json.dumps({"DirectoryId": directory_id}) - def create_directory(self): + def create_directory(self) -> str: """Create a Simple AD directory.""" name = self._get_param("Name") short_name = self._get_param("ShortName") @@ -60,14 +60,14 @@ class DirectoryServiceResponse(BaseResponse): ) return json.dumps({"DirectoryId": directory_id}) - def create_alias(self): + def create_alias(self) -> str: """Create an alias and assign the alias to the directory.""" directory_id = self._get_param("DirectoryId") alias = self._get_param("Alias") response = self.ds_backend.create_alias(directory_id, alias) return json.dumps(response) - def create_microsoft_ad(self): + def create_microsoft_ad(self) -> str: """Create a Microsoft AD directory.""" name = self._get_param("Name") short_name = self._get_param("ShortName") @@ -88,13 +88,13 @@ class DirectoryServiceResponse(BaseResponse): ) return json.dumps({"DirectoryId": directory_id}) - def delete_directory(self): + def delete_directory(self) -> str: """Delete a Directory Service directory.""" directory_id_arg = self._get_param("DirectoryId") directory_id = self.ds_backend.delete_directory(directory_id_arg) return json.dumps({"DirectoryId": directory_id}) - def describe_directories(self): + def describe_directories(self) -> str: """Return directory info for the given IDs or all IDs.""" directory_ids = self._get_param("DirectoryIds") next_token = self._get_param("NextToken") @@ -111,7 +111,7 @@ class DirectoryServiceResponse(BaseResponse): response["NextToken"] = next_token return json.dumps(response) - def disable_sso(self): + def disable_sso(self) -> str: """Disable single-sign on for a directory.""" directory_id = self._get_param("DirectoryId") username = self._get_param("UserName") @@ -119,7 +119,7 @@ class DirectoryServiceResponse(BaseResponse): self.ds_backend.disable_sso(directory_id, username, password) return "" - def enable_sso(self): + def enable_sso(self) -> str: """Enable single-sign on for a directory.""" directory_id = self._get_param("DirectoryId") username = self._get_param("UserName") @@ -127,19 +127,19 @@ class DirectoryServiceResponse(BaseResponse): self.ds_backend.enable_sso(directory_id, username, password) return "" - def get_directory_limits(self): + def get_directory_limits(self) -> str: """Return directory limit information for the current region.""" limits = self.ds_backend.get_directory_limits() return json.dumps({"DirectoryLimits": limits}) - def add_tags_to_resource(self): + def add_tags_to_resource(self) -> str: """Add or overwrite on or more tags for specified directory.""" resource_id = self._get_param("ResourceId") tags = self._get_param("Tags") self.ds_backend.add_tags_to_resource(resource_id=resource_id, tags=tags) return "" - def remove_tags_from_resource(self): + def remove_tags_from_resource(self) -> str: """Removes tags from a directory.""" resource_id = self._get_param("ResourceId") tag_keys = self._get_param("TagKeys") @@ -148,7 +148,7 @@ class DirectoryServiceResponse(BaseResponse): ) return "" - def list_tags_for_resource(self): + def list_tags_for_resource(self) -> str: """Lists all tags on a directory.""" resource_id = self._get_param("ResourceId") next_token = self._get_param("NextToken") diff --git a/moto/ds/validations.py b/moto/ds/validations.py index 03d6b4216..e44fc3f8e 100644 --- a/moto/ds/validations.py +++ b/moto/ds/validations.py @@ -3,11 +3,12 @@ Note that ValidationExceptions are accumulative. """ import re +from typing import Any from moto.ds.exceptions import DsValidationException -def validate_args(validators): +def validate_args(validators: Any) -> None: """Raise exception if any of the validations fails. validators is a list of tuples each containing the following: @@ -42,7 +43,7 @@ def validate_args(validators): raise DsValidationException(err_msgs) -def validate_alias(value): +def validate_alias(value: str) -> str: """Raise exception if alias fails to conform to length and constraints.""" if len(value) > 62: return "have length less than or equal to 62" @@ -53,14 +54,14 @@ def validate_alias(value): return "" -def validate_description(value): +def validate_description(value: str) -> str: """Raise exception if description exceeds length.""" if value and len(value) > 128: return "have length less than or equal to 128" return "" -def validate_directory_id(value): +def validate_directory_id(value: str) -> str: """Raise exception if the directory id is invalid.""" id_pattern = r"^d-[0-9a-f]{10}$" if not re.match(id_pattern, value): @@ -68,7 +69,7 @@ def validate_directory_id(value): return "" -def validate_dns_ips(value): +def validate_dns_ips(value: str) -> str: """Raise exception if DNS IPs fail to match constraints.""" dnsip_pattern = ( r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}" @@ -80,14 +81,14 @@ def validate_dns_ips(value): return "" -def validate_edition(value): +def validate_edition(value: str) -> str: """Raise exception if edition not one of the allowed values.""" if value and value not in ["Enterprise", "Standard"]: return "satisfy enum value set: [Enterprise, Standard]" return "" -def validate_name(value): +def validate_name(value: str) -> str: """Raise exception if name fails to match constraints.""" name_pattern = r"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$" if not re.match(name_pattern, value): @@ -95,7 +96,7 @@ def validate_name(value): return "" -def validate_password(value): +def validate_password(value: str) -> str: """Raise exception if password fails to match constraints.""" passwd_pattern = ( r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|" @@ -108,7 +109,7 @@ def validate_password(value): return "" -def validate_short_name(value): +def validate_short_name(value: str) -> str: """Raise exception if short name fails to match constraints.""" short_name_pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$' if value and not re.match(short_name_pattern, value): @@ -116,21 +117,21 @@ def validate_short_name(value): return "" -def validate_size(value): +def validate_size(value: str) -> str: """Raise exception if size fails to match constraints.""" if value.lower() not in ["small", "large"]: return "satisfy enum value set: [Small, Large]" return "" -def validate_sso_password(value): +def validate_sso_password(value: str) -> str: """Raise exception is SSO password exceeds length.""" if value and len(value) > 128: return "have length less than or equal to 128" return "" -def validate_subnet_ids(value): +def validate_subnet_ids(value: str) -> str: """Raise exception is subnet IDs fail to match constraints.""" subnet_id_pattern = r"^(subnet-[0-9a-f]{8}|subnet-[0-9a-f]{17})$" for subnet in value: @@ -139,7 +140,7 @@ def validate_subnet_ids(value): return "" -def validate_user_name(value): +def validate_user_name(value: str) -> str: """Raise exception is username fails to match constraints.""" username_pattern = r"^[a-zA-Z0-9._-]+$" if value and not re.match(username_pattern, value):