Techdebt: MyPy DS (#5792)

This commit is contained in:
Bert Blommers 2022-12-20 18:25:25 -01:00 committed by GitHub
parent 008d5b958e
commit 027572177d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 125 additions and 98 deletions

View File

@ -1,5 +1,6 @@
"""Exceptions raised by the Directory Service service.""" """Exceptions raised by the Directory Service service."""
from moto.core.exceptions import JsonRESTError from moto.core.exceptions import JsonRESTError
from typing import List, Tuple
class DsValidationException(JsonRESTError): class DsValidationException(JsonRESTError):
@ -7,7 +8,7 @@ class DsValidationException(JsonRESTError):
code = 400 code = 400
def __init__(self, error_tuples): def __init__(self, error_tuples: List[Tuple[str, str, str]]):
"""Validation errors are concatenated into one exception message. """Validation errors are concatenated into one exception message.
error_tuples is a list of tuples. Each tuple contains: error_tuples is a list of tuples. Each tuple contains:
@ -35,7 +36,7 @@ class ClientException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("ClientException", message) super().__init__("ClientException", message)
@ -44,7 +45,7 @@ class DirectoryLimitExceededException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("DirectoryLimitExceededException", message) super().__init__("DirectoryLimitExceededException", message)
@ -53,7 +54,7 @@ class EntityDoesNotExistException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("EntityDoesNotExistException", message) super().__init__("EntityDoesNotExistException", message)
@ -62,7 +63,7 @@ class EntityAlreadyExistsException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("EntityAlreadyExistsException", message) super().__init__("EntityAlreadyExistsException", message)
@ -71,7 +72,7 @@ class InvalidNextTokenException(JsonRESTError):
code = 400 code = 400
def __init__(self): def __init__(self) -> None:
super().__init__( super().__init__(
"InvalidNextTokenException", "InvalidNextTokenException",
"Invalid value passed for the NextToken parameter", "Invalid value passed for the NextToken parameter",
@ -83,7 +84,7 @@ class InvalidParameterException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("InvalidParameterException", message) super().__init__("InvalidParameterException", message)
@ -92,7 +93,7 @@ class TagLimitExceededException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("TagLimitExceededException", message) super().__init__("TagLimitExceededException", message)
@ -101,5 +102,5 @@ class ValidationException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("ValidationException", message) super().__init__("ValidationException", message)

View File

@ -1,5 +1,6 @@
"""DirectoryServiceBackend class with methods for supported APIs.""" """DirectoryServiceBackend class with methods for supported APIs."""
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple, List
from moto.core import BaseBackend, BackendDict, BaseModel from moto.core import BaseBackend, BackendDict, BaseModel
from moto.ds.exceptions import ( from moto.ds.exceptions import (
@ -46,17 +47,17 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
def __init__( def __init__(
self, self,
account_id, account_id: str,
region, region: str,
name, name: str,
password, password: str,
directory_type, directory_type: str,
size=None, size: Optional[str] = None,
vpc_settings=None, vpc_settings: Optional[Dict[str, Any]] = None,
connect_settings=None, connect_settings: Optional[Dict[str, Any]] = None,
short_name=None, short_name: Optional[str] = None,
description=None, description: Optional[str] = None,
edition=None, edition: Optional[str] = None,
): # pylint: disable=too-many-arguments ): # pylint: disable=too-many-arguments
self.account_id = account_id self.account_id = account_id
self.region = region self.region = region
@ -82,26 +83,26 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
if self.directory_type == "ADConnector": if self.directory_type == "ADConnector":
self.security_group_id = self.create_security_group( self.security_group_id = self.create_security_group(
self.connect_settings["VpcId"] self.connect_settings["VpcId"] # type: ignore[index]
) )
self.eni_ids, self.subnet_ips = self.create_eni( self.eni_ids, self.subnet_ips = self.create_eni(
self.security_group_id, self.connect_settings["SubnetIds"] self.security_group_id, self.connect_settings["SubnetIds"] # type: ignore[index]
) )
self.connect_settings["SecurityGroupId"] = self.security_group_id self.connect_settings["SecurityGroupId"] = self.security_group_id # type: ignore[index]
self.connect_settings["ConnectIps"] = self.subnet_ips self.connect_settings["ConnectIps"] = self.subnet_ips # type: ignore[index]
self.dns_ip_addrs = self.connect_settings["CustomerDnsIps"] self.dns_ip_addrs = self.connect_settings["CustomerDnsIps"] # type: ignore[index]
else: else:
self.security_group_id = self.create_security_group( self.security_group_id = self.create_security_group(
self.vpc_settings["VpcId"] self.vpc_settings["VpcId"] # type: ignore[index]
) )
self.eni_ids, self.subnet_ips = self.create_eni( self.eni_ids, self.subnet_ips = self.create_eni(
self.security_group_id, self.vpc_settings["SubnetIds"] self.security_group_id, self.vpc_settings["SubnetIds"] # type: ignore[index]
) )
self.vpc_settings["SecurityGroupId"] = self.security_group_id self.vpc_settings["SecurityGroupId"] = self.security_group_id # type: ignore[index]
self.dns_ip_addrs = self.subnet_ips self.dns_ip_addrs = self.subnet_ips
def create_security_group(self, vpc_id): def create_security_group(self, vpc_id: str) -> str:
"""Create security group for the network interface.""" """Create security group for the network interface."""
security_group_info = ec2_backends[self.account_id][ security_group_info = ec2_backends[self.account_id][
self.region self.region
@ -115,13 +116,15 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
) )
return security_group_info.id return security_group_info.id
def delete_security_group(self): def delete_security_group(self) -> None:
"""Delete the given security group.""" """Delete the given security group."""
ec2_backends[self.account_id][self.region].delete_security_group( ec2_backends[self.account_id][self.region].delete_security_group(
group_id=self.security_group_id group_id=self.security_group_id
) )
def create_eni(self, security_group_id, subnet_ids): def create_eni(
self, security_group_id: str, subnet_ids: List[str]
) -> Tuple[List[str], List[str]]:
"""Return ENI ids and primary addresses created for each subnet.""" """Return ENI ids and primary addresses created for each subnet."""
eni_ids = [] eni_ids = []
subnet_ips = [] subnet_ips = []
@ -138,21 +141,21 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
subnet_ips.append(eni_info.private_ip_address) subnet_ips.append(eni_info.private_ip_address)
return eni_ids, subnet_ips return eni_ids, subnet_ips
def delete_eni(self): def delete_eni(self) -> None:
"""Delete ENI for each subnet and the security group.""" """Delete ENI for each subnet and the security group."""
for eni_id in self.eni_ids: for eni_id in self.eni_ids:
ec2_backends[self.account_id][self.region].delete_network_interface(eni_id) ec2_backends[self.account_id][self.region].delete_network_interface(eni_id)
def update_alias(self, alias): def update_alias(self, alias: str) -> None:
"""Change default alias to given alias.""" """Change default alias to given alias."""
self.alias = alias self.alias = alias
self.access_url = f"{alias}.awsapps.com" self.access_url = f"{alias}.awsapps.com"
def enable_sso(self, new_state): def enable_sso(self, new_state: bool) -> None:
"""Enable/disable sso based on whether new_state is True or False.""" """Enable/disable sso based on whether new_state is True or False."""
self.sso_enabled = new_state self.sso_enabled = new_state
def to_dict(self): def to_dict(self) -> Dict[str, Any]:
"""Create a dictionary of attributes for Directory.""" """Create a dictionary of attributes for Directory."""
attributes = { attributes = {
"AccessUrl": self.access_url, "AccessUrl": self.access_url,
@ -188,19 +191,21 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
class DirectoryServiceBackend(BaseBackend): class DirectoryServiceBackend(BaseBackend):
"""Implementation of DirectoryService APIs.""" """Implementation of DirectoryService APIs."""
def __init__(self, region_name, account_id): def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id) super().__init__(region_name, account_id)
self.directories = {} self.directories: Dict[str, Directory] = {}
self.tagger = TaggingService() self.tagger = TaggingService()
@staticmethod @staticmethod
def default_vpc_endpoint_service(service_region, zones): def default_vpc_endpoint_service(
service_region: str, zones: List[str]
) -> List[Dict[str, str]]:
"""List of dicts representing default VPC endpoints for this service.""" """List of dicts representing default VPC endpoints for this service."""
return BaseBackend.default_vpc_endpoint_service_factory( return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "ds" service_region, zones, "ds"
) )
def _verify_subnets(self, region, vpc_settings): def _verify_subnets(self, region: str, vpc_settings: Dict[str, Any]) -> None:
"""Verify subnets are valid, else raise an exception. """Verify subnets are valid, else raise an exception.
If settings are valid, add AvailabilityZones to vpc_settings. If settings are valid, add AvailabilityZones to vpc_settings.
@ -237,15 +242,15 @@ class DirectoryServiceBackend(BaseBackend):
def connect_directory( def connect_directory(
self, self,
region, region: str,
name, name: str,
short_name, short_name: str,
password, password: str,
description, description: str,
size, size: str,
connect_settings, connect_settings: Dict[str, Any],
tags, tags: List[Dict[str, str]],
): # pylint: disable=too-many-arguments ) -> str: # pylint: disable=too-many-arguments
"""Create a fake AD Connector.""" """Create a fake AD Connector."""
if len(self.directories) > Directory.CONNECTED_DIRECTORIES_LIMIT: if len(self.directories) > Directory.CONNECTED_DIRECTORIES_LIMIT:
raise DirectoryLimitExceededException( raise DirectoryLimitExceededException(
@ -296,8 +301,16 @@ class DirectoryServiceBackend(BaseBackend):
return directory.directory_id return directory.directory_id
def create_directory( def create_directory(
self, region, name, short_name, password, description, size, vpc_settings, tags self,
): # pylint: disable=too-many-arguments region: str,
name: str,
short_name: str,
password: str,
description: str,
size: str,
vpc_settings: Dict[str, Any],
tags: List[Dict[str, str]],
) -> str: # pylint: disable=too-many-arguments
"""Create a fake Simple Ad Directory.""" """Create a fake Simple Ad Directory."""
if len(self.directories) > Directory.CLOUDONLY_DIRECTORIES_LIMIT: if len(self.directories) > Directory.CLOUDONLY_DIRECTORIES_LIMIT:
raise DirectoryLimitExceededException( raise DirectoryLimitExceededException(
@ -341,7 +354,7 @@ class DirectoryServiceBackend(BaseBackend):
self.tagger.tag_resource(directory.directory_id, tags or []) self.tagger.tag_resource(directory.directory_id, tags or [])
return directory.directory_id return directory.directory_id
def _validate_directory_id(self, directory_id): def _validate_directory_id(self, directory_id: str) -> None:
"""Raise an exception if the directory id is invalid or unknown.""" """Raise an exception if the directory id is invalid or unknown."""
# Validation of ID takes precedence over a check for its existence. # Validation of ID takes precedence over a check for its existence.
validate_args([("directoryId", directory_id)]) validate_args([("directoryId", directory_id)])
@ -350,7 +363,7 @@ class DirectoryServiceBackend(BaseBackend):
f"Directory {directory_id} does not exist" f"Directory {directory_id} does not exist"
) )
def create_alias(self, directory_id, alias): def create_alias(self, directory_id: str, alias: str) -> Dict[str, str]:
"""Create and assign an alias to a directory.""" """Create and assign an alias to a directory."""
self._validate_directory_id(directory_id) self._validate_directory_id(directory_id)
@ -373,15 +386,15 @@ class DirectoryServiceBackend(BaseBackend):
def create_microsoft_ad( def create_microsoft_ad(
self, self,
region, region: str,
name, name: str,
short_name, short_name: str,
password, password: str,
description, description: str,
vpc_settings, vpc_settings: Dict[str, Any],
edition, edition: str,
tags, tags: List[Dict[str, str]],
): # pylint: disable=too-many-arguments ) -> str: # pylint: disable=too-many-arguments
"""Create a fake Microsoft Ad Directory.""" """Create a fake Microsoft Ad Directory."""
if len(self.directories) > Directory.CLOUDONLY_MICROSOFT_AD_LIMIT: if len(self.directories) > Directory.CLOUDONLY_MICROSOFT_AD_LIMIT:
raise DirectoryLimitExceededException( raise DirectoryLimitExceededException(
@ -423,7 +436,7 @@ class DirectoryServiceBackend(BaseBackend):
self.tagger.tag_resource(directory.directory_id, tags or []) self.tagger.tag_resource(directory.directory_id, tags or [])
return directory.directory_id return directory.directory_id
def delete_directory(self, directory_id): def delete_directory(self, directory_id: str) -> str:
"""Delete directory with the matching ID.""" """Delete directory with the matching ID."""
self._validate_directory_id(directory_id) self._validate_directory_id(directory_id)
self.directories[directory_id].delete_eni() self.directories[directory_id].delete_eni()
@ -432,14 +445,24 @@ class DirectoryServiceBackend(BaseBackend):
self.directories.pop(directory_id) self.directories.pop(directory_id)
return directory_id return directory_id
def disable_sso(self, directory_id, username=None, password=None): def disable_sso(
self,
directory_id: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
"""Disable single-sign on for a directory.""" """Disable single-sign on for a directory."""
self._validate_directory_id(directory_id) self._validate_directory_id(directory_id)
validate_args([("ssoPassword", password), ("userName", username)]) validate_args([("ssoPassword", password), ("userName", username)])
directory = self.directories[directory_id] directory = self.directories[directory_id]
directory.enable_sso(False) directory.enable_sso(False)
def enable_sso(self, directory_id, username=None, password=None): def enable_sso(
self,
directory_id: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
"""Enable single-sign on for a directory.""" """Enable single-sign on for a directory."""
self._validate_directory_id(directory_id) self._validate_directory_id(directory_id)
validate_args([("ssoPassword", password), ("userName", username)]) validate_args([("ssoPassword", password), ("userName", username)])
@ -454,7 +477,7 @@ class DirectoryServiceBackend(BaseBackend):
directory.enable_sso(True) directory.enable_sso(True)
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def describe_directories(self, directory_ids=None): def describe_directories(self, directory_ids: Optional[List[str]] = None) -> List[Directory]: # type: ignore[misc]
"""Return info on all directories or directories with matching IDs.""" """Return info on all directories or directories with matching IDs."""
for directory_id in directory_ids or self.directories: for directory_id in directory_ids or self.directories:
self._validate_directory_id(directory_id) self._validate_directory_id(directory_id)
@ -464,7 +487,7 @@ class DirectoryServiceBackend(BaseBackend):
directories = [x for x in directories if x.directory_id in directory_ids] directories = [x for x in directories if x.directory_id in directory_ids]
return sorted(directories, key=lambda x: x.launch_time) return sorted(directories, key=lambda x: x.launch_time)
def get_directory_limits(self): def get_directory_limits(self) -> Dict[str, Any]:
"""Return hard-coded limits for the directories.""" """Return hard-coded limits for the directories."""
counts = {"SimpleAD": 0, "MicrosoftAD": 0, "ConnectedAD": 0} counts = {"SimpleAD": 0, "MicrosoftAD": 0, "ConnectedAD": 0}
for directory in self.directories.values(): for directory in self.directories.values():
@ -490,7 +513,9 @@ class DirectoryServiceBackend(BaseBackend):
== Directory.CONNECTED_DIRECTORIES_LIMIT, == Directory.CONNECTED_DIRECTORIES_LIMIT,
} }
def add_tags_to_resource(self, resource_id, tags): def add_tags_to_resource(
self, resource_id: str, tags: List[Dict[str, str]]
) -> None:
"""Add or overwrite one or more tags for specified directory.""" """Add or overwrite one or more tags for specified directory."""
self._validate_directory_id(resource_id) self._validate_directory_id(resource_id)
errmsg = self.tagger.validate_tags(tags) errmsg = self.tagger.validate_tags(tags)
@ -500,16 +525,16 @@ class DirectoryServiceBackend(BaseBackend):
raise TagLimitExceededException("Tag limit exceeded") raise TagLimitExceededException("Tag limit exceeded")
self.tagger.tag_resource(resource_id, tags) self.tagger.tag_resource(resource_id, tags)
def remove_tags_from_resource(self, resource_id, tag_keys): def remove_tags_from_resource(self, resource_id: str, tag_keys: List[str]) -> None:
"""Removes tags from a directory.""" """Removes tags from a directory."""
self._validate_directory_id(resource_id) self._validate_directory_id(resource_id)
self.tagger.untag_resource_using_names(resource_id, tag_keys) self.tagger.untag_resource_using_names(resource_id, tag_keys)
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def list_tags_for_resource(self, resource_id): def list_tags_for_resource(self, resource_id: str) -> List[Dict[str, str]]: # type: ignore[misc]
"""List all tags on a directory.""" """List all tags on a directory."""
self._validate_directory_id(resource_id) self._validate_directory_id(resource_id)
return self.tagger.list_tags_for_resource(resource_id).get("Tags") return self.tagger.list_tags_for_resource(resource_id).get("Tags") # type: ignore[return-value]
ds_backends = BackendDict(DirectoryServiceBackend, service_name="ds") ds_backends = BackendDict(DirectoryServiceBackend, service_name="ds")

View File

@ -4,21 +4,21 @@ import json
from moto.core.exceptions import InvalidToken from moto.core.exceptions import InvalidToken
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.ds.exceptions import InvalidNextTokenException from moto.ds.exceptions import InvalidNextTokenException
from moto.ds.models import ds_backends from moto.ds.models import ds_backends, DirectoryServiceBackend
class DirectoryServiceResponse(BaseResponse): class DirectoryServiceResponse(BaseResponse):
"""Handler for DirectoryService requests and responses.""" """Handler for DirectoryService requests and responses."""
def __init__(self): def __init__(self) -> None:
super().__init__(service_name="ds") super().__init__(service_name="ds")
@property @property
def ds_backend(self): def ds_backend(self) -> DirectoryServiceBackend:
"""Return backend instance specific for this region.""" """Return backend instance specific for this region."""
return ds_backends[self.current_account][self.region] return ds_backends[self.current_account][self.region]
def connect_directory(self): def connect_directory(self) -> str:
"""Create an AD Connector to connect to a self-managed directory.""" """Create an AD Connector to connect to a self-managed directory."""
name = self._get_param("Name") name = self._get_param("Name")
short_name = self._get_param("ShortName") short_name = self._get_param("ShortName")
@ -39,7 +39,7 @@ class DirectoryServiceResponse(BaseResponse):
) )
return json.dumps({"DirectoryId": directory_id}) return json.dumps({"DirectoryId": directory_id})
def create_directory(self): def create_directory(self) -> str:
"""Create a Simple AD directory.""" """Create a Simple AD directory."""
name = self._get_param("Name") name = self._get_param("Name")
short_name = self._get_param("ShortName") short_name = self._get_param("ShortName")
@ -60,14 +60,14 @@ class DirectoryServiceResponse(BaseResponse):
) )
return json.dumps({"DirectoryId": directory_id}) return json.dumps({"DirectoryId": directory_id})
def create_alias(self): def create_alias(self) -> str:
"""Create an alias and assign the alias to the directory.""" """Create an alias and assign the alias to the directory."""
directory_id = self._get_param("DirectoryId") directory_id = self._get_param("DirectoryId")
alias = self._get_param("Alias") alias = self._get_param("Alias")
response = self.ds_backend.create_alias(directory_id, alias) response = self.ds_backend.create_alias(directory_id, alias)
return json.dumps(response) return json.dumps(response)
def create_microsoft_ad(self): def create_microsoft_ad(self) -> str:
"""Create a Microsoft AD directory.""" """Create a Microsoft AD directory."""
name = self._get_param("Name") name = self._get_param("Name")
short_name = self._get_param("ShortName") short_name = self._get_param("ShortName")
@ -88,13 +88,13 @@ class DirectoryServiceResponse(BaseResponse):
) )
return json.dumps({"DirectoryId": directory_id}) return json.dumps({"DirectoryId": directory_id})
def delete_directory(self): def delete_directory(self) -> str:
"""Delete a Directory Service directory.""" """Delete a Directory Service directory."""
directory_id_arg = self._get_param("DirectoryId") directory_id_arg = self._get_param("DirectoryId")
directory_id = self.ds_backend.delete_directory(directory_id_arg) directory_id = self.ds_backend.delete_directory(directory_id_arg)
return json.dumps({"DirectoryId": directory_id}) return json.dumps({"DirectoryId": directory_id})
def describe_directories(self): def describe_directories(self) -> str:
"""Return directory info for the given IDs or all IDs.""" """Return directory info for the given IDs or all IDs."""
directory_ids = self._get_param("DirectoryIds") directory_ids = self._get_param("DirectoryIds")
next_token = self._get_param("NextToken") next_token = self._get_param("NextToken")
@ -111,7 +111,7 @@ class DirectoryServiceResponse(BaseResponse):
response["NextToken"] = next_token response["NextToken"] = next_token
return json.dumps(response) return json.dumps(response)
def disable_sso(self): def disable_sso(self) -> str:
"""Disable single-sign on for a directory.""" """Disable single-sign on for a directory."""
directory_id = self._get_param("DirectoryId") directory_id = self._get_param("DirectoryId")
username = self._get_param("UserName") username = self._get_param("UserName")
@ -119,7 +119,7 @@ class DirectoryServiceResponse(BaseResponse):
self.ds_backend.disable_sso(directory_id, username, password) self.ds_backend.disable_sso(directory_id, username, password)
return "" return ""
def enable_sso(self): def enable_sso(self) -> str:
"""Enable single-sign on for a directory.""" """Enable single-sign on for a directory."""
directory_id = self._get_param("DirectoryId") directory_id = self._get_param("DirectoryId")
username = self._get_param("UserName") username = self._get_param("UserName")
@ -127,19 +127,19 @@ class DirectoryServiceResponse(BaseResponse):
self.ds_backend.enable_sso(directory_id, username, password) self.ds_backend.enable_sso(directory_id, username, password)
return "" return ""
def get_directory_limits(self): def get_directory_limits(self) -> str:
"""Return directory limit information for the current region.""" """Return directory limit information for the current region."""
limits = self.ds_backend.get_directory_limits() limits = self.ds_backend.get_directory_limits()
return json.dumps({"DirectoryLimits": limits}) return json.dumps({"DirectoryLimits": limits})
def add_tags_to_resource(self): def add_tags_to_resource(self) -> str:
"""Add or overwrite on or more tags for specified directory.""" """Add or overwrite on or more tags for specified directory."""
resource_id = self._get_param("ResourceId") resource_id = self._get_param("ResourceId")
tags = self._get_param("Tags") tags = self._get_param("Tags")
self.ds_backend.add_tags_to_resource(resource_id=resource_id, tags=tags) self.ds_backend.add_tags_to_resource(resource_id=resource_id, tags=tags)
return "" return ""
def remove_tags_from_resource(self): def remove_tags_from_resource(self) -> str:
"""Removes tags from a directory.""" """Removes tags from a directory."""
resource_id = self._get_param("ResourceId") resource_id = self._get_param("ResourceId")
tag_keys = self._get_param("TagKeys") tag_keys = self._get_param("TagKeys")
@ -148,7 +148,7 @@ class DirectoryServiceResponse(BaseResponse):
) )
return "" return ""
def list_tags_for_resource(self): def list_tags_for_resource(self) -> str:
"""Lists all tags on a directory.""" """Lists all tags on a directory."""
resource_id = self._get_param("ResourceId") resource_id = self._get_param("ResourceId")
next_token = self._get_param("NextToken") next_token = self._get_param("NextToken")

View File

@ -3,11 +3,12 @@
Note that ValidationExceptions are accumulative. Note that ValidationExceptions are accumulative.
""" """
import re import re
from typing import Any
from moto.ds.exceptions import DsValidationException from moto.ds.exceptions import DsValidationException
def validate_args(validators): def validate_args(validators: Any) -> None:
"""Raise exception if any of the validations fails. """Raise exception if any of the validations fails.
validators is a list of tuples each containing the following: validators is a list of tuples each containing the following:
@ -42,7 +43,7 @@ def validate_args(validators):
raise DsValidationException(err_msgs) raise DsValidationException(err_msgs)
def validate_alias(value): def validate_alias(value: str) -> str:
"""Raise exception if alias fails to conform to length and constraints.""" """Raise exception if alias fails to conform to length and constraints."""
if len(value) > 62: if len(value) > 62:
return "have length less than or equal to 62" return "have length less than or equal to 62"
@ -53,14 +54,14 @@ def validate_alias(value):
return "" return ""
def validate_description(value): def validate_description(value: str) -> str:
"""Raise exception if description exceeds length.""" """Raise exception if description exceeds length."""
if value and len(value) > 128: if value and len(value) > 128:
return "have length less than or equal to 128" return "have length less than or equal to 128"
return "" return ""
def validate_directory_id(value): def validate_directory_id(value: str) -> str:
"""Raise exception if the directory id is invalid.""" """Raise exception if the directory id is invalid."""
id_pattern = r"^d-[0-9a-f]{10}$" id_pattern = r"^d-[0-9a-f]{10}$"
if not re.match(id_pattern, value): if not re.match(id_pattern, value):
@ -68,7 +69,7 @@ def validate_directory_id(value):
return "" return ""
def validate_dns_ips(value): def validate_dns_ips(value: str) -> str:
"""Raise exception if DNS IPs fail to match constraints.""" """Raise exception if DNS IPs fail to match constraints."""
dnsip_pattern = ( dnsip_pattern = (
r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}" r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"
@ -80,14 +81,14 @@ def validate_dns_ips(value):
return "" return ""
def validate_edition(value): def validate_edition(value: str) -> str:
"""Raise exception if edition not one of the allowed values.""" """Raise exception if edition not one of the allowed values."""
if value and value not in ["Enterprise", "Standard"]: if value and value not in ["Enterprise", "Standard"]:
return "satisfy enum value set: [Enterprise, Standard]" return "satisfy enum value set: [Enterprise, Standard]"
return "" return ""
def validate_name(value): def validate_name(value: str) -> str:
"""Raise exception if name fails to match constraints.""" """Raise exception if name fails to match constraints."""
name_pattern = r"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$" name_pattern = r"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$"
if not re.match(name_pattern, value): if not re.match(name_pattern, value):
@ -95,7 +96,7 @@ def validate_name(value):
return "" return ""
def validate_password(value): def validate_password(value: str) -> str:
"""Raise exception if password fails to match constraints.""" """Raise exception if password fails to match constraints."""
passwd_pattern = ( passwd_pattern = (
r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|" r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
@ -108,7 +109,7 @@ def validate_password(value):
return "" return ""
def validate_short_name(value): def validate_short_name(value: str) -> str:
"""Raise exception if short name fails to match constraints.""" """Raise exception if short name fails to match constraints."""
short_name_pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$' short_name_pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
if value and not re.match(short_name_pattern, value): if value and not re.match(short_name_pattern, value):
@ -116,21 +117,21 @@ def validate_short_name(value):
return "" return ""
def validate_size(value): def validate_size(value: str) -> str:
"""Raise exception if size fails to match constraints.""" """Raise exception if size fails to match constraints."""
if value.lower() not in ["small", "large"]: if value.lower() not in ["small", "large"]:
return "satisfy enum value set: [Small, Large]" return "satisfy enum value set: [Small, Large]"
return "" return ""
def validate_sso_password(value): def validate_sso_password(value: str) -> str:
"""Raise exception is SSO password exceeds length.""" """Raise exception is SSO password exceeds length."""
if value and len(value) > 128: if value and len(value) > 128:
return "have length less than or equal to 128" return "have length less than or equal to 128"
return "" return ""
def validate_subnet_ids(value): def validate_subnet_ids(value: str) -> str:
"""Raise exception is subnet IDs fail to match constraints.""" """Raise exception is subnet IDs fail to match constraints."""
subnet_id_pattern = r"^(subnet-[0-9a-f]{8}|subnet-[0-9a-f]{17})$" subnet_id_pattern = r"^(subnet-[0-9a-f]{8}|subnet-[0-9a-f]{17})$"
for subnet in value: for subnet in value:
@ -139,7 +140,7 @@ def validate_subnet_ids(value):
return "" return ""
def validate_user_name(value): def validate_user_name(value: str) -> str:
"""Raise exception is username fails to match constraints.""" """Raise exception is username fails to match constraints."""
username_pattern = r"^[a-zA-Z0-9._-]+$" username_pattern = r"^[a-zA-Z0-9._-]+$"
if value and not re.match(username_pattern, value): if value and not re.match(username_pattern, value):