diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index fb2e5f17d..35726e03c 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -916,20 +916,20 @@ ## ds
-11% implemented +19% implemented - [ ] accept_shared_directory - [ ] add_ip_routes - [ ] add_region - [X] add_tags_to_resource - [ ] cancel_schema_extension -- [ ] connect_directory -- [ ] create_alias +- [X] connect_directory +- [X] create_alias - [ ] create_computer - [ ] create_conditional_forwarder - [X] create_directory - [ ] create_log_subscription -- [ ] create_microsoft_ad +- [X] create_microsoft_ad - [ ] create_snapshot - [ ] create_trust - [ ] delete_conditional_forwarder @@ -953,11 +953,11 @@ - [ ] disable_client_authentication - [ ] disable_ldaps - [ ] disable_radius -- [ ] disable_sso +- [X] disable_sso - [ ] enable_client_authentication - [ ] enable_ldaps - [ ] enable_radius -- [ ] enable_sso +- [X] enable_sso - [X] get_directory_limits - [ ] get_snapshot_limits - [ ] list_certificates @@ -4768,4 +4768,4 @@ - workmailmessageflow - workspaces - xray -
\ No newline at end of file + diff --git a/moto/ds/exceptions.py b/moto/ds/exceptions.py index 5597f023c..0497eaa37 100644 --- a/moto/ds/exceptions.py +++ b/moto/ds/exceptions.py @@ -57,6 +57,15 @@ class EntityDoesNotExistException(JsonRESTError): super().__init__("EntityDoesNotExistException", message) +class EntityAlreadyExistsException(JsonRESTError): + """The specified entity already exists.""" + + code = 400 + + def __init__(self, message): + super().__init__("EntityAlreadyExistsException", message) + + class InvalidNextTokenException(JsonRESTError): """Invalid next token parameter used to return a list of entities.""" diff --git a/moto/ds/models.py b/moto/ds/models.py index 405a8584e..84199a350 100644 --- a/moto/ds/models.py +++ b/moto/ds/models.py @@ -1,6 +1,6 @@ """DirectoryServiceBackend class with methods for supported APIs.""" from datetime import datetime, timezone -import re +import ipaddress from boto3 import Session @@ -9,20 +9,49 @@ from moto.core.utils import get_random_hex from moto.ds.exceptions import ( ClientException, DirectoryLimitExceededException, + EntityAlreadyExistsException, EntityDoesNotExistException, - DsValidationException, InvalidParameterException, TagLimitExceededException, ValidationException, ) +from moto.ds.utils import PAGINATION_MODEL +from moto.ds.validations import ( + validate_args, + validate_alias, + validate_description, + validate_directory_id, + validate_dns_ips, + validate_edition, + validate_name, + validate_password, + validate_short_name, + validate_size, + validate_sso_password, + validate_subnet_ids, + validate_user_name, +) from moto.ec2.exceptions import InvalidSubnetIdError from moto.utilities.paginator import paginate from moto.utilities.tagging_service import TaggingService -from .utils import PAGINATION_MODEL class Directory(BaseModel): # pylint: disable=too-many-instance-attributes - """Representation of a Simple AD Directory.""" + """Representation of a Simple AD Directory. + + When the "create" API for a Simple AD or a Microsoft AD directory is + invoked, two domain controllers and a DNS server are supposed to be + created. That is NOT done for the fake directories. + + However, the DnsIpAddrs attribute is supposed to contain the IP addresses + of the DNS servers. For a AD Connecter, the DnsIpAddrs are provided when + the directory is created, but the ConnectSettings.ConnectIps values should + contain the IP addresses of the DNS servers or domain controllers in the + directory to which the AD connector is connected. + + Instead, the dns_ip_addrs attribute or ConnectIPs attribute for the fake + directories will contain IPs picked from the subnets' CIDR blocks. + """ # The assumption here is that the limits are the same for all regions. CLOUDONLY_DIRECTORIES_LIMIT = 10 @@ -35,19 +64,24 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes self, name, password, - size, - vpc_settings, directory_type, + subnets, + size=None, + vpc_settings=None, + connect_settings=None, short_name=None, description=None, + edition=None, ): # pylint: disable=too-many-arguments self.name = name self.password = password + self.directory_type = directory_type self.size = size self.vpc_settings = vpc_settings - self.directory_type = directory_type + self.connect_settings = connect_settings self.short_name = short_name self.description = description + self.edition = edition # Calculated or default values for the directory attributes. self.directory_id = f"d-{get_random_hex(10)}" @@ -59,6 +93,36 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes self.launch_time = datetime.now(timezone.utc).isoformat() self.stage_last_updated_date_time = datetime.now(timezone.utc).isoformat() + if directory_type != "ADConnector": + self.dns_ip_addrs = self.subnet_ips(subnets) + else: + self.dns_ip_addrs = self.connect_settings["CustomerDnsIps"] + self.connect_settings["ConnectIps"] = self.subnet_ips(subnets) + + @staticmethod + def subnet_ips(subnets): + """Return an IP from each of the given subnets. + + This is a bit dodgey and may need to be reworked at a later time. + """ + ip_addrs = [] + for subnet in subnets: + ips = ipaddress.IPv4Network(subnet.cidr_block) + # Not sure if the following could occur, but if it does, + # the situation will be ignored. + if ips: + ip_addrs.append(str(ips[1]) if ips.num_addresses > 1 else str(ips[0])) + return ip_addrs + + def update_alias(self, alias): + """Change default alias to given alias.""" + self.alias = alias + self.access_url = f"{alias}.awsapps.com" + + def enable_sso(self, new_state): + """Enable/disable sso based on whether new_state is True or False.""" + self.sso_enabled = new_state + def to_json(self): """Convert the attributes into json with CamelCase tags.""" replacement_keys = {"directory_type": "Type"} @@ -73,9 +137,11 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes if item in replacement_keys: json_result[replacement_keys[item]] = value else: - parts = item.split("_") - new_tag = "".join(x.title() for x in parts) + new_tag = "".join(x.title() for x in item.split("_")) json_result[new_tag] = value + + if json_result["ConnectSettings"]: + json_result["ConnectSettings"]["CustomerDnsIps"] = None return json_result @@ -101,75 +167,8 @@ class DirectoryServiceBackend(BaseBackend): ) @staticmethod - def _validate_create_directory_args( - name, passwd, size, vpc_settings, description, short_name, - ): # pylint: disable=too-many-arguments - """Raise exception if create_directory() args don't meet constraints. - - The error messages are accumulated before the exception is raised. - """ - error_tuples = [] - passwd_pattern = ( - r"(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|" - r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|" - r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|" - r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*" - ) - if not re.match(passwd_pattern, passwd): - # Can't have an odd number of backslashes in a literal. - json_pattern = passwd_pattern.replace("\\", r"\\") - error_tuples.append( - ( - "password", - passwd, - fr"satisfy regular expression pattern: {json_pattern}", - ) - ) - - if size.lower() not in ["small", "large"]: - error_tuples.append( - ("size", size, "satisfy enum value set: [Small, Large]") - ) - - name_pattern = r"^([a-zA-Z0-9]+[\\.-])+([a-zA-Z0-9])+$" - if not re.match(name_pattern, name): - error_tuples.append( - ("name", name, fr"satisfy regular expression pattern: {name_pattern}") - ) - - subnet_id_pattern = r"^(subnet-[0-9a-f]{8}|subnet-[0-9a-f]{17})$" - for subnet in vpc_settings["SubnetIds"]: - if not re.match(subnet_id_pattern, subnet): - error_tuples.append( - ( - "vpcSettings.subnetIds", - subnet, - fr"satisfy regular expression pattern: {subnet_id_pattern}", - ) - ) - - if description and len(description) > 128: - error_tuples.append( - ("description", description, "have length less than or equal to 128") - ) - - short_name_pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$' - if short_name and not re.match(short_name_pattern, short_name): - json_pattern = short_name_pattern.replace("\\", r"\\").replace('"', r"\"") - error_tuples.append( - ( - "shortName", - short_name, - fr"satisfy regular expression pattern: {json_pattern}", - ) - ) - - if error_tuples: - raise DsValidationException(error_tuples) - - @staticmethod - def _validate_vpc_setting_values(region, vpc_settings): - """Raise exception if vpc_settings are invalid. + def _get_subnets(region, vpc_settings): + """Return subnets if vpc_settings are invalid, else raise an exception. If settings are valid, add AvailabilityZones to vpc_settings. """ @@ -205,6 +204,72 @@ class DirectoryServiceBackend(BaseBackend): raise ClientException("Invalid VPC ID.") vpc_settings["AvailabilityZones"] = regions + return subnets + + def connect_directory( + self, + region, + name, + short_name, + password, + description, + size, + connect_settings, + tags, + ): # pylint: disable=too-many-arguments + """Create a fake AD Connector.""" + if len(self.directories) > Directory.CONNECTED_DIRECTORIES_LIMIT: + raise DirectoryLimitExceededException( + f"Directory limit exceeded. A maximum of " + f"{Directory.CONNECTED_DIRECTORIES_LIMIT} directories may be created" + ) + + validate_args( + [ + (validate_password, "password", password), + (validate_size, "size", size), + (validate_name, "name", name), + (validate_description, "description", description), + (validate_short_name, "shortName", short_name), + ( + validate_subnet_ids, + "connectSettings.vpcSettings.subnetIds", + connect_settings["SubnetIds"], + ), + ( + validate_user_name, + "connectSettings.customerUserName", + connect_settings["CustomerUserName"], + ), + ( + validate_dns_ips, + "connectSettings.customerDnsIps", + connect_settings["CustomerDnsIps"], + ), + ] + ) + # ConnectSettings and VpcSettings both have a VpcId and Subnets. + subnets = self._get_subnets(region, connect_settings) + + errmsg = self.tagger.validate_tags(tags or []) + if errmsg: + raise ValidationException(errmsg) + if len(tags) > Directory.MAX_TAGS_PER_DIRECTORY: + raise DirectoryLimitExceededException("Tag Limit is exceeding") + + directory = Directory( + name, + password, + "ADConnector", + subnets, + size=size, + connect_settings=connect_settings, + short_name=short_name, + description=description, + ) + self.directories[directory.directory_id] = directory + self.tagger.tag_resource(directory.directory_id, tags or []) + return directory.directory_id def create_directory( self, region, name, short_name, password, description, size, vpc_settings, tags @@ -219,25 +284,35 @@ class DirectoryServiceBackend(BaseBackend): # botocore doesn't look for missing vpc_settings, but boto3 does. if not vpc_settings: raise InvalidParameterException("VpcSettings must be specified.") - - self._validate_create_directory_args( - name, password, size, vpc_settings, description, short_name, + validate_args( + [ + (validate_password, "password", password), + (validate_size, "size", size), + (validate_name, "name", name), + (validate_description, "description", description), + (validate_short_name, "shortName", short_name), + ( + validate_subnet_ids, + "vpcSettings.subnetIds", + vpc_settings["SubnetIds"], + ), + ] ) - self._validate_vpc_setting_values(region, vpc_settings) + subnets = self._get_subnets(region, vpc_settings) errmsg = self.tagger.validate_tags(tags or []) if errmsg: raise ValidationException(errmsg) - if len(tags) > Directory.MAX_TAGS_PER_DIRECTORY: raise DirectoryLimitExceededException("Tag Limit is exceeding") directory = Directory( name, password, - size, - vpc_settings, - directory_type="SimpleAD", + "SimpleAD", + subnets, + size=size, + vpc_settings=vpc_settings, short_name=short_name, description=description, ) @@ -248,23 +323,89 @@ class DirectoryServiceBackend(BaseBackend): def _validate_directory_id(self, directory_id): """Raise an exception if the directory id is invalid or unknown.""" # Validation of ID takes precedence over a check for its existence. - id_pattern = r"^d-[0-9a-f]{10}$" - if not re.match(id_pattern, directory_id): - raise DsValidationException( - [ - ( - "directoryId", - directory_id, - fr"satisfy regular expression pattern: {id_pattern}", - ) - ] - ) - + validate_args([(validate_directory_id, "directoryId", directory_id)]) if directory_id not in self.directories: raise EntityDoesNotExistException( f"Directory {directory_id} does not exist" ) + def create_alias(self, directory_id, alias): + """Create and assign an alias to a directory.""" + self._validate_directory_id(directory_id) + + # The default alias name is the same as the directory name. Check + # whether this directory was already given an alias. + directory = self.directories[directory_id] + if directory.alias != directory_id: + raise InvalidParameterException( + "The directory in the request already has an alias. That " + "alias must be deleted before a new alias can be created." + ) + + # Is the alias already in use? + if alias in [x.alias for x in self.directories.values()]: + raise EntityAlreadyExistsException(f"Alias '{alias}' already exists.") + + validate_args([(validate_alias, "alias", alias)]) + + directory.update_alias(alias) + return {"DirectoryId": directory_id, "Alias": alias} + + def create_microsoft_ad( + self, + region, + name, + short_name, + password, + description, + vpc_settings, + edition, + tags, + ): # pylint: disable=too-many-arguments + """Create a fake Microsoft Ad Directory.""" + if len(self.directories) > Directory.CLOUDONLY_MICROSOFT_AD_LIMIT: + raise DirectoryLimitExceededException( + f"Directory limit exceeded. A maximum of " + f"{Directory.CLOUDONLY_MICROSOFT_AD_LIMIT} directories may be created" + ) + + # boto3 looks for missing vpc_settings for create_microsoft_ad(). + validate_args( + [ + (validate_password, "password", password), + (validate_edition, "edition", edition), + (validate_name, "name", name), + (validate_description, "description", description), + (validate_short_name, "shortName", short_name), + ( + validate_subnet_ids, + "vpcSettings.subnetIds", + vpc_settings["SubnetIds"], + ), + ] + ) + subnets = self._get_subnets(region, vpc_settings) + + errmsg = self.tagger.validate_tags(tags or []) + if errmsg: + raise ValidationException(errmsg) + if len(tags) > Directory.MAX_TAGS_PER_DIRECTORY: + raise DirectoryLimitExceededException("Tag Limit is exceeding") + + directory = Directory( + name, + password, + "MicrosoftAD", + subnets, + vpc_settings=vpc_settings, + short_name=short_name, + description=description, + edition=edition, + ) + self.directories[directory.directory_id] = directory + self.tagger.tag_resource(directory.directory_id, tags or []) + return directory.directory_id + def delete_directory(self, directory_id): """Delete directory with the matching ID.""" self._validate_directory_id(directory_id) @@ -272,6 +413,37 @@ class DirectoryServiceBackend(BaseBackend): self.directories.pop(directory_id) return directory_id + def disable_sso(self, directory_id, username=None, password=None): + """Disable single-sign on for a directory.""" + self._validate_directory_id(directory_id) + validate_args( + [ + (validate_sso_password, "password", password), + (validate_user_name, "userName", username), + ] + ) + directory = self.directories[directory_id] + directory.enable_sso(False) + + def enable_sso(self, directory_id, username=None, password=None): + """Enable single-sign on for a directory.""" + self._validate_directory_id(directory_id) + validate_args( + [ + (validate_sso_password, "password", password), + (validate_user_name, "userName", username), + ] + ) + + directory = self.directories[directory_id] + if directory.alias == directory_id: + raise ClientException( + f"An alias is required before enabling SSO. DomainId={directory_id}" + ) + + directory = self.directories[directory_id] + directory.enable_sso(True) + @paginate(pagination_model=PAGINATION_MODEL) def describe_directories( self, directory_ids=None, next_token=None, limit=0 diff --git a/moto/ds/responses.py b/moto/ds/responses.py index d55d84ef8..34b22aa67 100644 --- a/moto/ds/responses.py +++ b/moto/ds/responses.py @@ -15,6 +15,27 @@ class DirectoryServiceResponse(BaseResponse): """Return backend instance specific for this region.""" return ds_backends[self.region] + def connect_directory(self): + """Create an AD Connector to connect to a self-managed directory.""" + name = self._get_param("Name") + short_name = self._get_param("ShortName") + password = self._get_param("Password") + description = self._get_param("Description") + size = self._get_param("Size") + connect_settings = self._get_param("ConnectSettings") + tags = self._get_param("Tags") + directory_id = self.ds_backend.connect_directory( + region=self.region, + name=name, + short_name=short_name, + password=password, + description=description, + size=size, + connect_settings=connect_settings, + tags=tags, + ) + return json.dumps({"DirectoryId": directory_id}) + def create_directory(self): """Create a Simple AD directory.""" name = self._get_param("Name") @@ -36,6 +57,34 @@ class DirectoryServiceResponse(BaseResponse): ) return json.dumps({"DirectoryId": directory_id}) + def create_alias(self): + """Create an alias and assign the alias to the directory.""" + directory_id = self._get_param("DirectoryId") + alias = self._get_param("Alias") + response = self.ds_backend.create_alias(directory_id, alias) + return json.dumps(response) + + def create_microsoft_ad(self): + """Create a Microsoft AD directory.""" + name = self._get_param("Name") + short_name = self._get_param("ShortName") + password = self._get_param("Password") + description = self._get_param("Description") + vpc_settings = self._get_param("VpcSettings") + edition = self._get_param("Edition") + tags = self._get_param("Tags") + directory_id = self.ds_backend.create_microsoft_ad( + region=self.region, + name=name, + short_name=short_name, + password=password, + description=description, + vpc_settings=vpc_settings, + edition=edition, + tags=tags, + ) + return json.dumps({"DirectoryId": directory_id}) + def delete_directory(self): """Delete a Directory Service directory.""" directory_id_arg = self._get_param("DirectoryId") @@ -59,6 +108,22 @@ class DirectoryServiceResponse(BaseResponse): response["NextToken"] = next_token return json.dumps(response) + def disable_sso(self): + """Disable single-sign on for a directory.""" + directory_id = self._get_param("DirectoryId") + username = self._get_param("UserName") + password = self._get_param("Password") + self.ds_backend.disable_sso(directory_id, username, password) + return "" + + def enable_sso(self): + """Enable single-sign on for a directory.""" + directory_id = self._get_param("DirectoryId") + username = self._get_param("UserName") + password = self._get_param("Password") + self.ds_backend.enable_sso(directory_id, username, password) + return "" + def get_directory_limits(self): """Return directory limit information for the current region.""" limits = self.ds_backend.get_directory_limits() diff --git a/moto/ds/validations.py b/moto/ds/validations.py new file mode 100644 index 000000000..cd530b18e --- /dev/null +++ b/moto/ds/validations.py @@ -0,0 +1,134 @@ +"""DirectoryServiceBackend checks that result in ValidationException. + +Note that ValidationExceptions are accumulative. +""" +import re + +from moto.ds.exceptions import DsValidationException + + +def validate_args(validators): + """Raise exception if any of the validations fails. + + validators is a list of tuples each containing the following: + (validator_function, printable field name, field value) + + The error messages are accumulated before the exception is raised. + """ + err_msgs = [] + for (func, fieldname, value) in validators: + msg = func(value) + if msg: + err_msgs.append((fieldname, value, msg)) + if err_msgs: + raise DsValidationException(err_msgs) + + +def validate_alias(value): + """Raise exception if alias fails to conform to length and constraints.""" + if len(value) > 62: + return "have length less than or equal to 62" + + alias_pattern = r"^(?!D-|d-)([\da-zA-Z]+)([-]*[\da-zA-Z])*$" + if not re.match(alias_pattern, value): + json_pattern = alias_pattern.replace("\\", r"\\") + return fr"satisfy regular expression pattern: {json_pattern}" + return "" + + +def validate_description(value): + """Raise exception if description exceeds length.""" + if value and len(value) > 128: + return "have length less than or equal to 128" + return "" + + +def validate_directory_id(value): + """Raise exception if the directory id is invalid.""" + id_pattern = r"^d-[0-9a-f]{10}$" + if not re.match(id_pattern, value): + return fr"satisfy regular expression pattern: {id_pattern}" + return "" + + +def validate_dns_ips(value): + """Raise exception if DNS IPs fail to match constraints.""" + dnsip_pattern = ( + r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}" + r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$" + ) + for dnsip in value: + if not re.match(dnsip_pattern, dnsip): + json_pattern = dnsip_pattern.replace("\\", r"\\") + return fr"satisfy regular expression pattern: {json_pattern}" + return "" + + +def validate_edition(value): + """Raise exception if edition not one of the allowed values.""" + if value and value not in ["Enterprise", "Standard"]: + return "satisfy enum value set: [Enterprise, Standard]" + return "" + + +def validate_name(value): + """Raise exception if name fails to match constraints.""" + name_pattern = r"^([a-zA-Z0-9]+[\\.-])+([a-zA-Z0-9])+$" + if not re.match(name_pattern, value): + return fr"satisfy regular expression pattern: {name_pattern}" + return "" + + +def validate_password(value): + """Raise exception if password fails to match constraints.""" + passwd_pattern = ( + r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|" + r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|" + r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|" + r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$" + ) + if not re.match(passwd_pattern, value): + # Can't have an odd number of backslashes in a literal. + json_pattern = passwd_pattern.replace("\\", r"\\") + return fr"satisfy regular expression pattern: {json_pattern}" + return "" + + +def validate_short_name(value): + """Raise exception if short name fails to match constraints.""" + short_name_pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$' + if value and not re.match(short_name_pattern, value): + json_pattern = short_name_pattern.replace("\\", r"\\").replace('"', r"\"") + return fr"satisfy regular expression pattern: {json_pattern}" + return "" + + +def validate_size(value): + """Raise exception if size fails to match constraints.""" + if value.lower() not in ["small", "large"]: + return "satisfy enum value set: [Small, Large]" + return "" + + +def validate_sso_password(value): + """Raise exception is SSO password exceeds length.""" + if value and len(value) > 128: + return "have length less than or equal to 128" + return "" + + +def validate_subnet_ids(value): + """Raise exception is subnet IDs fail to match constraints.""" + subnet_id_pattern = r"^(subnet-[0-9a-f]{8}|subnet-[0-9a-f]{17})$" + for subnet in value: + if not re.match(subnet_id_pattern, subnet): + return fr"satisfy regular expression pattern: {subnet_id_pattern}" + return "" + + +def validate_user_name(value): + """Raise exception is username fails to match constraints.""" + username_pattern = r"^[a-zA-Z0-9._-]+$" + if value and not re.match(username_pattern, value): + return fr"satisfy regular expression pattern: {username_pattern}" + return "" diff --git a/tests/test_ds/test_ds.py b/tests/test_ds/test_ds.py index 0e937eb67..09613523a 100644 --- a/tests/test_ds/test_ds.py +++ b/tests/test_ds/test_ds.py @@ -79,6 +79,8 @@ def test_ds_get_directory_limits(): == limits["CloudOnlyDirectoriesCurrentCount"] ) assert limits["CloudOnlyDirectoriesLimitReached"] + assert not limits["CloudOnlyMicrosoftADCurrentCount"] + assert not limits["ConnectedDirectoriesCurrentCount"] @mock_ec2 @@ -114,6 +116,7 @@ def test_ds_describe_directories(): assert dir_info["Type"] == "SimpleAD" assert dir_info["VpcSettings"]["VpcId"].startswith("vpc-") assert len(dir_info["VpcSettings"]["SubnetIds"]) == 2 + assert set(dir_info["DnsIpAddrs"]) == set(["10.0.1.1", "10.0.0.1"]) assert "NextToken" not in result # Test with a specific directory ID. @@ -150,3 +153,170 @@ def test_ds_describe_directories(): result = client.describe_directories(Limit=1, NextToken=result["NextToken"]) assert len(result["DirectoryDescriptions"]) == 1 assert result["DirectoryDescriptions"][0]["DirectoryId"] == directory_ids[5] + + +@mock_ec2 +@mock_ds +def test_ds_create_alias(): + """Test good and bad invocations of create_alias().""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Create a directory we can test against. + directory_id = create_test_directory(client, ec2_client) + + # Bad format. + bad_alias = f"d-{get_random_hex(10)}" + with pytest.raises(ClientError) as exc: + client.create_alias(DirectoryId=directory_id, Alias=bad_alias) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert ( + fr"Value '{bad_alias}' at 'alias' failed to satisfy constraint: " + fr"Member must satisfy regular expression pattern: " + fr"^(?!D-|d-)([\da-zA-Z]+)([-]*[\da-zA-Z])*$" + ) in err["Message"] + + # Too long. + bad_alias = f"d-{get_random_hex(62)}" + with pytest.raises(ClientError) as exc: + client.create_alias(DirectoryId=directory_id, Alias=bad_alias) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert ( + f"Value '{bad_alias}' at 'alias' failed to satisfy constraint: " + f"Member must have length less than or equal to 62" + ) in err["Message"] + + # Just right. + good_alias = f"{get_random_hex(10)}" + result = client.create_alias(DirectoryId=directory_id, Alias=good_alias) + assert result["DirectoryId"] == directory_id + assert result["Alias"] == good_alias + result = client.describe_directories() + directory = result["DirectoryDescriptions"][0] + assert directory["Alias"] == good_alias + assert directory["AccessUrl"] == f"{good_alias}.awsapps.com" + + # Attempt to create another alias for the same directory. + another_good_alias = f"{get_random_hex(10)}" + with pytest.raises(ClientError) as exc: + client.create_alias(DirectoryId=directory_id, Alias=another_good_alias) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert ( + "The directory in the request already has an alias. That alias must " + "be deleted before a new alias can be created." + ) in err["Message"] + + # Create a second directory we can test against. + directory_id2 = create_test_directory(client, ec2_client) + with pytest.raises(ClientError) as exc: + client.create_alias(DirectoryId=directory_id2, Alias=good_alias) + err = exc.value.response["Error"] + assert err["Code"] == "EntityAlreadyExistsException" + assert f"Alias '{good_alias}' already exists." in err["Message"] + + +@mock_ec2 +@mock_ds +def test_ds_enable_sso(): + """Test good and bad invocations of enable_sso().""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Create a directory we can test against. + directory_id = create_test_directory(client, ec2_client) + + # Need an alias before setting SSO. + with pytest.raises(ClientError) as exc: + client.enable_sso(DirectoryId=directory_id) + err = exc.value.response["Error"] + assert err["Code"] == "ClientException" + assert ( + f"An alias is required before enabling SSO. DomainId={directory_id}" + ) in err["Message"] + + # Add the alias to continue testing. + client.create_alias(DirectoryId=directory_id, Alias="anything-goes") + + # Password must be less than 128 chars in length. + good_username = "test" + bad_password = f"bad_password{get_random_hex(128)}" + with pytest.raises(ClientError) as exc: + client.enable_sso( + DirectoryId=directory_id, UserName=good_username, Password=bad_password + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert ( + "Value at 'password' failed to satisfy constraint: Member must " + "have length less than or equal to 128" + ) in err["Message"] + + # Username has constraints. + bad_username = "@test" + good_password = "password" + with pytest.raises(ClientError) as exc: + client.enable_sso( + DirectoryId=directory_id, UserName=bad_username, Password=good_password + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert ( + fr"Value '{bad_username}' at 'userName' failed to satisfy constraint: " + fr"Member must satisfy regular expression pattern: ^[a-zA-Z0-9._-]+$" + ) in err["Message"] + + # Valid execution. + client.enable_sso(DirectoryId=directory_id) + result = client.describe_directories() + directory = result["DirectoryDescriptions"][0] + assert directory["SsoEnabled"] + + +@mock_ec2 +@mock_ds +def test_ds_disable_sso(): + """Test good and bad invocations of disable_sso().""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Create a directory we can test against. + directory_id = create_test_directory(client, ec2_client) + + # Password must be less than 128 chars in length. + good_username = "test" + bad_password = f"bad_password{get_random_hex(128)}" + with pytest.raises(ClientError) as exc: + client.disable_sso( + DirectoryId=directory_id, UserName=good_username, Password=bad_password + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert ( + "Value at 'password' failed to satisfy constraint: Member must " + "have length less than or equal to 128" + ) in err["Message"] + + # Username has constraints. + bad_username = "@test" + good_password = "password" + with pytest.raises(ClientError) as exc: + client.disable_sso( + DirectoryId=directory_id, UserName=bad_username, Password=good_password + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert ( + fr"Value '{bad_username}' at 'userName' failed to satisfy constraint: " + fr"Member must satisfy regular expression pattern: ^[a-zA-Z0-9._-]+$" + ) in err["Message"] + + # Valid execution. First enable SSO, as the default is disabled SSO. + client.create_alias(DirectoryId=directory_id, Alias="anything-goes") + client.enable_sso(DirectoryId=directory_id) + client.disable_sso(DirectoryId=directory_id) + result = client.describe_directories() + directory = result["DirectoryDescriptions"][0] + assert not directory["SsoEnabled"] diff --git a/tests/test_ds/test_ds_ad_connect.py b/tests/test_ds/test_ds_ad_connect.py new file mode 100644 index 000000000..a851d5a80 --- /dev/null +++ b/tests/test_ds/test_ds_ad_connect.py @@ -0,0 +1,297 @@ +"""Directory-related unit tests for AD Connect Directory Services. + +The logic to check the details of VPCs and Subnets is shared between the +"create directory" APIs, so it will not be repeated here. +""" +from datetime import datetime, timezone + +import boto3 +from botocore.exceptions import ClientError +import pytest + +from moto import mock_ds +from moto.core.utils import get_random_hex +from moto.ec2 import mock_ec2 + +from .test_ds_simple_ad_directory import TEST_REGION, create_vpc, create_subnets + + +def create_test_ad_connector( + ds_client, + ec2_client, + vpc_settings=None, + customer_dns_ips=None, + customer_user_name="Admin", + tags=None, +): # pylint: disable=too-many-arguments + """Return ID of a newly created valid directory.""" + if not vpc_settings: + good_vpc_id = create_vpc(ec2_client) + good_subnet_ids = create_subnets(ec2_client, good_vpc_id) + vpc_settings = {"VpcId": good_vpc_id, "SubnetIds": good_subnet_ids} + + if not customer_dns_ips: + customer_dns_ips = ["1.2.3.4", "5.6.7.8"] + + if not tags: + tags = [] + + result = ds_client.connect_directory( + Name=f"test-{get_random_hex(6)}.test", + Password="4ADConnectPassword", + Size="Small", + ConnectSettings={ + "VpcId": vpc_settings["VpcId"], + "SubnetIds": vpc_settings["SubnetIds"], + "CustomerDnsIps": customer_dns_ips, + "CustomerUserName": customer_user_name, + }, + Tags=tags, + ) + return result["DirectoryId"] + + +@mock_ds +def test_ds_connect_directory_validations(): + """Test validation errs that aren't caught by botocore. + + Most of this validation is shared with the Simple AD directory, but + this verifies that it is invoked from connect_directory(). + """ + client = boto3.client("ds", region_name=TEST_REGION) + random_num = get_random_hex(6) + + # Verify ValidationException error messages are accumulated properly. + bad_name = f"bad_name_{random_num}" + bad_password = "bad_password" + bad_size = "foo" + ok_connect_settings = { + "VpcId": f"vpc-{random_num}", + "SubnetIds": [f"subnet-{random_num}01", f"subnet-{random_num}02"], + "CustomerUserName": "foo", + "CustomerDnsIps": ["1.2.3.4"], + } + with pytest.raises(ClientError) as exc: + client.connect_directory( + Name=bad_name, + Password=bad_password, + Size=bad_size, + ConnectSettings=ok_connect_settings, + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert "3 validation errors detected" in err["Message"] + assert ( + r"Value at 'password' failed to satisfy constraint: " + r"Member must satisfy regular expression pattern: " + r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|" + r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|" + r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|" + r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$" in err["Message"] + ) + assert ( + f"Value '{bad_size}' at 'size' failed to satisfy constraint: " + f"Member must satisfy enum value set: [Small, Large]" in err["Message"] + ) + assert ( + fr"Value '{bad_name}' at 'name' failed to satisfy constraint: " + fr"Member must satisfy regular expression pattern: " + fr"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$" in err["Message"] + ) + + too_long = ( + "Test of directory service 0123456789 0123456789 0123456789 " + "0123456789 0123456789 0123456789 0123456789 0123456789 0123456789 " + "0123456789 0123456789" + ) + short_name = "a:b.c" + with pytest.raises(ClientError) as exc: + client.connect_directory( + Name=f"test{random_num}.test", + Password="TESTfoobar1", + ConnectSettings=ok_connect_settings, + Description=too_long, + ShortName=short_name, + Size="Small", + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert "2 validation errors detected" in err["Message"] + assert ( + f"Value '{too_long}' at 'description' failed to satisfy constraint: " + f"Member must have length less than or equal to 128" in err["Message"] + ) + pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$' + assert ( + f"Value '{short_name}' at 'shortName' failed to satisfy constraint: " + f"Member must satisfy regular expression pattern: " + pattern + ) in err["Message"] + + bad_connect_settings = { + "VpcId": f"vpc-{random_num}", + "SubnetIds": ["foo"], + "CustomerUserName": "foo", + "CustomerDnsIps": ["1.2.3.4"], + } + with pytest.raises(ClientError) as exc: + client.connect_directory( + Name=f"test{random_num}.test", + Password="TESTfoobar1", + ConnectSettings=bad_connect_settings, + Size="Small", + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert "1 validation error detected" in err["Message"] + assert ( + fr"Value '['{bad_connect_settings['SubnetIds'][0]}']' at " + fr"'connectSettings.vpcSettings.subnetIds' failed to satisfy " + fr"constraint: Member must satisfy regular expression pattern: " + fr"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$" in err["Message"] + ) + + +@mock_ec2 +@mock_ds +def test_ds_connect_directory_good_args(): + """Test creation of AD connect directory using good arguments.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Verify a good call to connect_directory() + directory_id = create_test_ad_connector(client, ec2_client) + assert directory_id.startswith("d-") + + # Verify that too many directories can't be created. + limits = client.get_directory_limits()["DirectoryLimits"] + for _ in range(limits["ConnectedDirectoriesLimit"]): + create_test_ad_connector(client, ec2_client) + with pytest.raises(ClientError) as exc: + create_test_ad_connector(client, ec2_client) + err = exc.value.response["Error"] + assert err["Code"] == "DirectoryLimitExceededException" + assert ( + f"Directory limit exceeded. A maximum of " + f"{limits['ConnectedDirectoriesLimit']} " + f"directories may be created" in err["Message"] + ) + + +@mock_ec2 +@mock_ds +def test_ds_connect_directory_bad_args(): + """Test validation of non-vpc related ConnectionSettings values.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Bad CustomerUserName. + bad_username = "oops$" + with pytest.raises(ClientError) as exc: + create_test_ad_connector(client, ec2_client, customer_user_name=bad_username) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert "1 validation error detected" in err["Message"] + assert ( + fr"Value '{bad_username}' at 'connectSettings.customerUserName' " + fr"failed to satisfy constraint: Member must satisfy regular " + fr"expression pattern: ^[a-zA-Z0-9._-]+$" in err["Message"] + ) + + # Bad CustomerDnsIps. + bad_dns_ip = ["1.2.3.450"] + with pytest.raises(ClientError) as exc: + create_test_ad_connector(client, ec2_client, customer_dns_ips=bad_dns_ip) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert "1 validation error detected" in err["Message"] + assert ( + fr"Value '{bad_dns_ip}' at 'connectSettings.customerDnsIps' " + fr"failed to satisfy constraint: Member must satisfy regular " + fr"expression pattern: ^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.)" + fr"{{3}}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$" in err["Message"] + ) + + +@mock_ec2 +@mock_ds +def test_ds_connect_directory_delete(): + """Test deletion of AD Connector directory.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Delete an existing directory. + directory_id = create_test_ad_connector(client, ec2_client) + result = client.delete_directory(DirectoryId=directory_id) + assert result["DirectoryId"] == directory_id + + +@mock_ec2 +@mock_ds +def test_ds_connect_directory_describe(): + """Test describe_directory() for AD Connector directory.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Test that if no directory IDs are specified, all are returned. + directory_id = create_test_ad_connector(client, ec2_client) + result = client.describe_directories() + directory = result["DirectoryDescriptions"][0] + + assert len(result["DirectoryDescriptions"]) == 1 + assert directory["DesiredNumberOfDomainControllers"] == 0 + assert not directory["SsoEnabled"] + assert directory["DirectoryId"] == directory_id + assert directory["Name"].startswith("test-") + assert directory["Alias"] == directory_id + assert directory["AccessUrl"] == f"{directory_id}.awsapps.com" + assert directory["Stage"] == "Active" + assert directory["LaunchTime"] <= datetime.now(timezone.utc) + assert directory["StageLastUpdatedDateTime"] <= datetime.now(timezone.utc) + assert directory["Type"] == "ADConnector" + assert directory["ConnectSettings"]["VpcId"].startswith("vpc-") + assert len(directory["ConnectSettings"]["SubnetIds"]) == 2 + assert directory["ConnectSettings"]["CustomerUserName"] == "Admin" + assert set(directory["ConnectSettings"]["ConnectIps"]) == set( + ["10.0.0.1", "10.0.1.1"] + ) + assert directory["Size"] == "Small" + assert set(directory["DnsIpAddrs"]) == set(["1.2.3.4", "5.6.7.8"]) + assert "NextToken" not in result + + +@mock_ec2 +@mock_ds +def test_ds_connect_directory_tags(): + """Test that directory tags can be added and retrieved.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)] + directory_id = create_test_ad_connector(client, ec2_client, tags=added_tags) + + result = client.list_tags_for_resource(ResourceId=directory_id) + assert len(result["Tags"]) == 10 + assert result["Tags"] == added_tags + + +@mock_ec2 +@mock_ds +def test_ds_get_connect_directory_limits(): + """Test return value for ad connector directory limits.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Create a bunch of directories and verify the current count has been + # updated. + limits = client.get_directory_limits()["DirectoryLimits"] + for _ in range(limits["ConnectedDirectoriesLimit"]): + create_test_ad_connector(client, ec2_client) + + limits = client.get_directory_limits()["DirectoryLimits"] + assert ( + limits["ConnectedDirectoriesLimit"] + == limits["ConnectedDirectoriesCurrentCount"] + ) + assert limits["ConnectedDirectoriesLimitReached"] + assert not limits["CloudOnlyDirectoriesCurrentCount"] + assert not limits["CloudOnlyMicrosoftADCurrentCount"] diff --git a/tests/test_ds/test_ds_microsoft_ad.py b/tests/test_ds/test_ds_microsoft_ad.py new file mode 100644 index 000000000..e79a0f02e --- /dev/null +++ b/tests/test_ds/test_ds_microsoft_ad.py @@ -0,0 +1,234 @@ +"""Directory-related unit tests for Microsoft AD Directory Services. + +The logic to check the details of VPCs and Subnets is shared between the +"create directory" APIs, so it will not be repeated here. +""" +from datetime import datetime, timezone + +import boto3 +from botocore.exceptions import ClientError +import pytest + +from moto import mock_ds +from moto.core.utils import get_random_hex +from moto.ec2 import mock_ec2 + +from .test_ds_simple_ad_directory import TEST_REGION, create_vpc, create_subnets + + +def create_test_microsoft_ad(ds_client, ec2_client, vpc_settings=None, tags=None): + """Return ID of a newly created valid directory.""" + if not vpc_settings: + good_vpc_id = create_vpc(ec2_client) + good_subnet_ids = create_subnets(ec2_client, good_vpc_id) + vpc_settings = {"VpcId": good_vpc_id, "SubnetIds": good_subnet_ids} + + if not tags: + tags = [] + + result = ds_client.create_microsoft_ad( + Name=f"test-{get_random_hex(6)}.test", + Password="4MicrosoftADPassword", + VpcSettings=vpc_settings, + Tags=tags, + Edition="Standard", + ) + return result["DirectoryId"] + + +@mock_ds +def test_ds_create_microsoft_ad_validations(): + """Test validation errs that aren't caught by botocore. + + Most of this validation is shared with the Simple AD directory, but + this verifies that it is invoked from create_microsoft_ad(). + """ + client = boto3.client("ds", region_name=TEST_REGION) + random_num = get_random_hex(6) + + # Verify ValidationException error messages are accumulated properly. + bad_name = f"bad_name_{random_num}" + bad_password = "bad_password" + bad_edition = "foo" + ok_vpc_settings = { + "VpcId": f"vpc-{random_num}", + "SubnetIds": [f"subnet-{random_num}01", f"subnet-{random_num}02"], + } + with pytest.raises(ClientError) as exc: + client.create_microsoft_ad( + Name=bad_name, + Password=bad_password, + Edition=bad_edition, + VpcSettings=ok_vpc_settings, + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert "3 validation errors detected" in err["Message"] + assert ( + r"Value at 'password' failed to satisfy constraint: " + r"Member must satisfy regular expression pattern: " + r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|" + r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|" + r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|" + r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$" in err["Message"] + ) + assert ( + f"Value '{bad_edition}' at 'edition' failed to satisfy constraint: " + f"Member must satisfy enum value set: [Enterprise, Standard]" in err["Message"] + ) + assert ( + fr"Value '{bad_name}' at 'name' failed to satisfy constraint: " + fr"Member must satisfy regular expression pattern: " + fr"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$" in err["Message"] + ) + + too_long = ( + "Test of directory service 0123456789 0123456789 0123456789 " + "0123456789 0123456789 0123456789 0123456789 0123456789 0123456789 " + "0123456789 0123456789" + ) + short_name = "a:b.c" + with pytest.raises(ClientError) as exc: + client.create_microsoft_ad( + Name=f"test{random_num}.test", + Password="TESTfoobar1", + VpcSettings=ok_vpc_settings, + Description=too_long, + ShortName=short_name, + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert "2 validation errors detected" in err["Message"] + assert ( + f"Value '{too_long}' at 'description' failed to satisfy constraint: " + f"Member must have length less than or equal to 128" in err["Message"] + ) + pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$' + assert ( + f"Value '{short_name}' at 'shortName' failed to satisfy constraint: " + f"Member must satisfy regular expression pattern: " + pattern + ) in err["Message"] + + bad_vpc_settings = {"VpcId": f"vpc-{random_num}", "SubnetIds": ["foo"]} + with pytest.raises(ClientError) as exc: + client.create_microsoft_ad( + Name=f"test{random_num}.test", + Password="TESTfoobar1", + VpcSettings=bad_vpc_settings, + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + assert "1 validation error detected" in err["Message"] + assert ( + fr"Value '['{bad_vpc_settings['SubnetIds'][0]}']' at " + fr"'vpcSettings.subnetIds' failed to satisfy constraint: " + fr"Member must satisfy regular expression pattern: " + fr"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$" in err["Message"] + ) + + +@mock_ec2 +@mock_ds +def test_ds_create_microsoft_ad_good_args(): + """Test creation of Microsoft AD directory using good arguments.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Verify a good call to create_microsoft_ad() + directory_id = create_test_microsoft_ad(client, ec2_client) + assert directory_id.startswith("d-") + + # Verify that too many directories can't be created. + limits = client.get_directory_limits()["DirectoryLimits"] + for _ in range(limits["CloudOnlyMicrosoftADLimit"]): + create_test_microsoft_ad(client, ec2_client) + with pytest.raises(ClientError) as exc: + create_test_microsoft_ad(client, ec2_client) + err = exc.value.response["Error"] + assert err["Code"] == "DirectoryLimitExceededException" + assert ( + f"Directory limit exceeded. A maximum of " + f"{limits['CloudOnlyMicrosoftADLimit']} " + f"directories may be created" in err["Message"] + ) + + +@mock_ec2 +@mock_ds +def test_ds_create_microsoft_ad_delete(): + """Test deletion of Microsoft AD directory.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Delete an existing directory. + directory_id = create_test_microsoft_ad(client, ec2_client) + result = client.delete_directory(DirectoryId=directory_id) + assert result["DirectoryId"] == directory_id + + +@mock_ec2 +@mock_ds +def test_ds_create_microsoft_ad_describe(): + """Test describe_directory() for Microsoft AD directory.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Test that if no directory IDs are specified, all are returned. + directory_id = create_test_microsoft_ad(client, ec2_client) + result = client.describe_directories() + directory = result["DirectoryDescriptions"][0] + + assert len(result["DirectoryDescriptions"]) == 1 + assert directory["DesiredNumberOfDomainControllers"] == 0 + assert not directory["SsoEnabled"] + assert directory["DirectoryId"] == directory_id + assert directory["Name"].startswith("test-") + assert directory["Alias"] == directory_id + assert directory["AccessUrl"] == f"{directory_id}.awsapps.com" + assert directory["Stage"] == "Active" + assert directory["LaunchTime"] <= datetime.now(timezone.utc) + assert directory["StageLastUpdatedDateTime"] <= datetime.now(timezone.utc) + assert directory["Type"] == "MicrosoftAD" + assert directory["VpcSettings"]["VpcId"].startswith("vpc-") + assert len(directory["VpcSettings"]["SubnetIds"]) == 2 + assert directory["Edition"] == "Standard" + assert set(directory["DnsIpAddrs"]) == set(["10.0.1.1", "10.0.0.1"]) + assert "NextToken" not in result + + +@mock_ec2 +@mock_ds +def test_ds_create_microsoft_ad_tags(): + """Test that AD directory tags can be added and retrieved.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)] + directory_id = create_test_microsoft_ad(client, ec2_client, tags=added_tags) + + result = client.list_tags_for_resource(ResourceId=directory_id) + assert len(result["Tags"]) == 10 + assert result["Tags"] == added_tags + + +@mock_ec2 +@mock_ds +def test_ds_get_microsoft_ad_directory_limits(): + """Test return value for directory limits.""" + client = boto3.client("ds", region_name=TEST_REGION) + ec2_client = boto3.client("ec2", region_name=TEST_REGION) + + # Create a bunch of directories and verify the current count has been + # updated. + limits = client.get_directory_limits()["DirectoryLimits"] + for _ in range(limits["CloudOnlyMicrosoftADLimit"]): + create_test_microsoft_ad(client, ec2_client) + + limits = client.get_directory_limits()["DirectoryLimits"] + assert ( + limits["CloudOnlyMicrosoftADLimit"] + == limits["CloudOnlyMicrosoftADCurrentCount"] + ) + assert limits["CloudOnlyMicrosoftADLimitReached"] + assert not limits["ConnectedDirectoriesLimitReached"] + assert not limits["CloudOnlyDirectoriesCurrentCount"] diff --git a/tests/test_ds/test_ds_simple_ad_directory.py b/tests/test_ds/test_ds_simple_ad_directory.py index 6428e4d1c..759cdab60 100644 --- a/tests/test_ds/test_ds_simple_ad_directory.py +++ b/tests/test_ds/test_ds_simple_ad_directory.py @@ -77,10 +77,10 @@ def test_ds_create_directory_validations(): assert ( r"Value at 'password' failed to satisfy constraint: " r"Member must satisfy regular expression pattern: " - r"(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|" + r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|" r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|" r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|" - r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*;" in err["Message"] + r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$" in err["Message"] ) assert ( f"Value '{bad_size}' at 'size' failed to satisfy constraint: " @@ -132,7 +132,7 @@ def test_ds_create_directory_validations(): assert err["Code"] == "ValidationException" assert "1 validation error detected" in err["Message"] assert ( - fr"Value '{bad_vpc_settings['SubnetIds'][0]}' at " + fr"Value '['{bad_vpc_settings['SubnetIds'][0]}']' at " fr"'vpcSettings.subnetIds' failed to satisfy constraint: " fr"Member must satisfy regular expression pattern: " fr"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$" in err["Message"]