DirectoryService: Add support for other directories (#4507)

This commit is contained in:
kbalk 2021-11-05 09:15:57 -04:00 committed by GitHub
parent ab531aed9b
commit 7b375195bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1189 additions and 108 deletions

View File

@ -916,20 +916,20 @@
## ds
<details>
<summary>11% implemented</summary>
<summary>19% implemented</summary>
- [ ] accept_shared_directory
- [ ] add_ip_routes
- [ ] add_region
- [X] add_tags_to_resource
- [ ] cancel_schema_extension
- [ ] connect_directory
- [ ] create_alias
- [X] connect_directory
- [X] create_alias
- [ ] create_computer
- [ ] create_conditional_forwarder
- [X] create_directory
- [ ] create_log_subscription
- [ ] create_microsoft_ad
- [X] create_microsoft_ad
- [ ] create_snapshot
- [ ] create_trust
- [ ] delete_conditional_forwarder
@ -953,11 +953,11 @@
- [ ] disable_client_authentication
- [ ] disable_ldaps
- [ ] disable_radius
- [ ] disable_sso
- [X] disable_sso
- [ ] enable_client_authentication
- [ ] enable_ldaps
- [ ] enable_radius
- [ ] enable_sso
- [X] enable_sso
- [X] get_directory_limits
- [ ] get_snapshot_limits
- [ ] list_certificates
@ -4768,4 +4768,4 @@
- workmailmessageflow
- workspaces
- xray
</details>
</details>

View File

@ -57,6 +57,15 @@ class EntityDoesNotExistException(JsonRESTError):
super().__init__("EntityDoesNotExistException", message)
class EntityAlreadyExistsException(JsonRESTError):
"""The specified entity already exists."""
code = 400
def __init__(self, message):
super().__init__("EntityAlreadyExistsException", message)
class InvalidNextTokenException(JsonRESTError):
"""Invalid next token parameter used to return a list of entities."""

View File

@ -1,6 +1,6 @@
"""DirectoryServiceBackend class with methods for supported APIs."""
from datetime import datetime, timezone
import re
import ipaddress
from boto3 import Session
@ -9,20 +9,49 @@ from moto.core.utils import get_random_hex
from moto.ds.exceptions import (
ClientException,
DirectoryLimitExceededException,
EntityAlreadyExistsException,
EntityDoesNotExistException,
DsValidationException,
InvalidParameterException,
TagLimitExceededException,
ValidationException,
)
from moto.ds.utils import PAGINATION_MODEL
from moto.ds.validations import (
validate_args,
validate_alias,
validate_description,
validate_directory_id,
validate_dns_ips,
validate_edition,
validate_name,
validate_password,
validate_short_name,
validate_size,
validate_sso_password,
validate_subnet_ids,
validate_user_name,
)
from moto.ec2.exceptions import InvalidSubnetIdError
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService
from .utils import PAGINATION_MODEL
class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
"""Representation of a Simple AD Directory."""
"""Representation of a Simple AD Directory.
When the "create" API for a Simple AD or a Microsoft AD directory is
invoked, two domain controllers and a DNS server are supposed to be
created. That is NOT done for the fake directories.
However, the DnsIpAddrs attribute is supposed to contain the IP addresses
of the DNS servers. For a AD Connecter, the DnsIpAddrs are provided when
the directory is created, but the ConnectSettings.ConnectIps values should
contain the IP addresses of the DNS servers or domain controllers in the
directory to which the AD connector is connected.
Instead, the dns_ip_addrs attribute or ConnectIPs attribute for the fake
directories will contain IPs picked from the subnets' CIDR blocks.
"""
# The assumption here is that the limits are the same for all regions.
CLOUDONLY_DIRECTORIES_LIMIT = 10
@ -35,19 +64,24 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
self,
name,
password,
size,
vpc_settings,
directory_type,
subnets,
size=None,
vpc_settings=None,
connect_settings=None,
short_name=None,
description=None,
edition=None,
): # pylint: disable=too-many-arguments
self.name = name
self.password = password
self.directory_type = directory_type
self.size = size
self.vpc_settings = vpc_settings
self.directory_type = directory_type
self.connect_settings = connect_settings
self.short_name = short_name
self.description = description
self.edition = edition
# Calculated or default values for the directory attributes.
self.directory_id = f"d-{get_random_hex(10)}"
@ -59,6 +93,36 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
self.launch_time = datetime.now(timezone.utc).isoformat()
self.stage_last_updated_date_time = datetime.now(timezone.utc).isoformat()
if directory_type != "ADConnector":
self.dns_ip_addrs = self.subnet_ips(subnets)
else:
self.dns_ip_addrs = self.connect_settings["CustomerDnsIps"]
self.connect_settings["ConnectIps"] = self.subnet_ips(subnets)
@staticmethod
def subnet_ips(subnets):
"""Return an IP from each of the given subnets.
This is a bit dodgey and may need to be reworked at a later time.
"""
ip_addrs = []
for subnet in subnets:
ips = ipaddress.IPv4Network(subnet.cidr_block)
# Not sure if the following could occur, but if it does,
# the situation will be ignored.
if ips:
ip_addrs.append(str(ips[1]) if ips.num_addresses > 1 else str(ips[0]))
return ip_addrs
def update_alias(self, alias):
"""Change default alias to given alias."""
self.alias = alias
self.access_url = f"{alias}.awsapps.com"
def enable_sso(self, new_state):
"""Enable/disable sso based on whether new_state is True or False."""
self.sso_enabled = new_state
def to_json(self):
"""Convert the attributes into json with CamelCase tags."""
replacement_keys = {"directory_type": "Type"}
@ -73,9 +137,11 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
if item in replacement_keys:
json_result[replacement_keys[item]] = value
else:
parts = item.split("_")
new_tag = "".join(x.title() for x in parts)
new_tag = "".join(x.title() for x in item.split("_"))
json_result[new_tag] = value
if json_result["ConnectSettings"]:
json_result["ConnectSettings"]["CustomerDnsIps"] = None
return json_result
@ -101,75 +167,8 @@ class DirectoryServiceBackend(BaseBackend):
)
@staticmethod
def _validate_create_directory_args(
name, passwd, size, vpc_settings, description, short_name,
): # pylint: disable=too-many-arguments
"""Raise exception if create_directory() args don't meet constraints.
The error messages are accumulated before the exception is raised.
"""
error_tuples = []
passwd_pattern = (
r"(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*"
)
if not re.match(passwd_pattern, passwd):
# Can't have an odd number of backslashes in a literal.
json_pattern = passwd_pattern.replace("\\", r"\\")
error_tuples.append(
(
"password",
passwd,
fr"satisfy regular expression pattern: {json_pattern}",
)
)
if size.lower() not in ["small", "large"]:
error_tuples.append(
("size", size, "satisfy enum value set: [Small, Large]")
)
name_pattern = r"^([a-zA-Z0-9]+[\\.-])+([a-zA-Z0-9])+$"
if not re.match(name_pattern, name):
error_tuples.append(
("name", name, fr"satisfy regular expression pattern: {name_pattern}")
)
subnet_id_pattern = r"^(subnet-[0-9a-f]{8}|subnet-[0-9a-f]{17})$"
for subnet in vpc_settings["SubnetIds"]:
if not re.match(subnet_id_pattern, subnet):
error_tuples.append(
(
"vpcSettings.subnetIds",
subnet,
fr"satisfy regular expression pattern: {subnet_id_pattern}",
)
)
if description and len(description) > 128:
error_tuples.append(
("description", description, "have length less than or equal to 128")
)
short_name_pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
if short_name and not re.match(short_name_pattern, short_name):
json_pattern = short_name_pattern.replace("\\", r"\\").replace('"', r"\"")
error_tuples.append(
(
"shortName",
short_name,
fr"satisfy regular expression pattern: {json_pattern}",
)
)
if error_tuples:
raise DsValidationException(error_tuples)
@staticmethod
def _validate_vpc_setting_values(region, vpc_settings):
"""Raise exception if vpc_settings are invalid.
def _get_subnets(region, vpc_settings):
"""Return subnets if vpc_settings are invalid, else raise an exception.
If settings are valid, add AvailabilityZones to vpc_settings.
"""
@ -205,6 +204,72 @@ class DirectoryServiceBackend(BaseBackend):
raise ClientException("Invalid VPC ID.")
vpc_settings["AvailabilityZones"] = regions
return subnets
def connect_directory(
self,
region,
name,
short_name,
password,
description,
size,
connect_settings,
tags,
): # pylint: disable=too-many-arguments
"""Create a fake AD Connector."""
if len(self.directories) > Directory.CONNECTED_DIRECTORIES_LIMIT:
raise DirectoryLimitExceededException(
f"Directory limit exceeded. A maximum of "
f"{Directory.CONNECTED_DIRECTORIES_LIMIT} directories may be created"
)
validate_args(
[
(validate_password, "password", password),
(validate_size, "size", size),
(validate_name, "name", name),
(validate_description, "description", description),
(validate_short_name, "shortName", short_name),
(
validate_subnet_ids,
"connectSettings.vpcSettings.subnetIds",
connect_settings["SubnetIds"],
),
(
validate_user_name,
"connectSettings.customerUserName",
connect_settings["CustomerUserName"],
),
(
validate_dns_ips,
"connectSettings.customerDnsIps",
connect_settings["CustomerDnsIps"],
),
]
)
# ConnectSettings and VpcSettings both have a VpcId and Subnets.
subnets = self._get_subnets(region, connect_settings)
errmsg = self.tagger.validate_tags(tags or [])
if errmsg:
raise ValidationException(errmsg)
if len(tags) > Directory.MAX_TAGS_PER_DIRECTORY:
raise DirectoryLimitExceededException("Tag Limit is exceeding")
directory = Directory(
name,
password,
"ADConnector",
subnets,
size=size,
connect_settings=connect_settings,
short_name=short_name,
description=description,
)
self.directories[directory.directory_id] = directory
self.tagger.tag_resource(directory.directory_id, tags or [])
return directory.directory_id
def create_directory(
self, region, name, short_name, password, description, size, vpc_settings, tags
@ -219,25 +284,35 @@ class DirectoryServiceBackend(BaseBackend):
# botocore doesn't look for missing vpc_settings, but boto3 does.
if not vpc_settings:
raise InvalidParameterException("VpcSettings must be specified.")
self._validate_create_directory_args(
name, password, size, vpc_settings, description, short_name,
validate_args(
[
(validate_password, "password", password),
(validate_size, "size", size),
(validate_name, "name", name),
(validate_description, "description", description),
(validate_short_name, "shortName", short_name),
(
validate_subnet_ids,
"vpcSettings.subnetIds",
vpc_settings["SubnetIds"],
),
]
)
self._validate_vpc_setting_values(region, vpc_settings)
subnets = self._get_subnets(region, vpc_settings)
errmsg = self.tagger.validate_tags(tags or [])
if errmsg:
raise ValidationException(errmsg)
if len(tags) > Directory.MAX_TAGS_PER_DIRECTORY:
raise DirectoryLimitExceededException("Tag Limit is exceeding")
directory = Directory(
name,
password,
size,
vpc_settings,
directory_type="SimpleAD",
"SimpleAD",
subnets,
size=size,
vpc_settings=vpc_settings,
short_name=short_name,
description=description,
)
@ -248,23 +323,89 @@ class DirectoryServiceBackend(BaseBackend):
def _validate_directory_id(self, directory_id):
"""Raise an exception if the directory id is invalid or unknown."""
# Validation of ID takes precedence over a check for its existence.
id_pattern = r"^d-[0-9a-f]{10}$"
if not re.match(id_pattern, directory_id):
raise DsValidationException(
[
(
"directoryId",
directory_id,
fr"satisfy regular expression pattern: {id_pattern}",
)
]
)
validate_args([(validate_directory_id, "directoryId", directory_id)])
if directory_id not in self.directories:
raise EntityDoesNotExistException(
f"Directory {directory_id} does not exist"
)
def create_alias(self, directory_id, alias):
"""Create and assign an alias to a directory."""
self._validate_directory_id(directory_id)
# The default alias name is the same as the directory name. Check
# whether this directory was already given an alias.
directory = self.directories[directory_id]
if directory.alias != directory_id:
raise InvalidParameterException(
"The directory in the request already has an alias. That "
"alias must be deleted before a new alias can be created."
)
# Is the alias already in use?
if alias in [x.alias for x in self.directories.values()]:
raise EntityAlreadyExistsException(f"Alias '{alias}' already exists.")
validate_args([(validate_alias, "alias", alias)])
directory.update_alias(alias)
return {"DirectoryId": directory_id, "Alias": alias}
def create_microsoft_ad(
self,
region,
name,
short_name,
password,
description,
vpc_settings,
edition,
tags,
): # pylint: disable=too-many-arguments
"""Create a fake Microsoft Ad Directory."""
if len(self.directories) > Directory.CLOUDONLY_MICROSOFT_AD_LIMIT:
raise DirectoryLimitExceededException(
f"Directory limit exceeded. A maximum of "
f"{Directory.CLOUDONLY_MICROSOFT_AD_LIMIT} directories may be created"
)
# boto3 looks for missing vpc_settings for create_microsoft_ad().
validate_args(
[
(validate_password, "password", password),
(validate_edition, "edition", edition),
(validate_name, "name", name),
(validate_description, "description", description),
(validate_short_name, "shortName", short_name),
(
validate_subnet_ids,
"vpcSettings.subnetIds",
vpc_settings["SubnetIds"],
),
]
)
subnets = self._get_subnets(region, vpc_settings)
errmsg = self.tagger.validate_tags(tags or [])
if errmsg:
raise ValidationException(errmsg)
if len(tags) > Directory.MAX_TAGS_PER_DIRECTORY:
raise DirectoryLimitExceededException("Tag Limit is exceeding")
directory = Directory(
name,
password,
"MicrosoftAD",
subnets,
vpc_settings=vpc_settings,
short_name=short_name,
description=description,
edition=edition,
)
self.directories[directory.directory_id] = directory
self.tagger.tag_resource(directory.directory_id, tags or [])
return directory.directory_id
def delete_directory(self, directory_id):
"""Delete directory with the matching ID."""
self._validate_directory_id(directory_id)
@ -272,6 +413,37 @@ class DirectoryServiceBackend(BaseBackend):
self.directories.pop(directory_id)
return directory_id
def disable_sso(self, directory_id, username=None, password=None):
"""Disable single-sign on for a directory."""
self._validate_directory_id(directory_id)
validate_args(
[
(validate_sso_password, "password", password),
(validate_user_name, "userName", username),
]
)
directory = self.directories[directory_id]
directory.enable_sso(False)
def enable_sso(self, directory_id, username=None, password=None):
"""Enable single-sign on for a directory."""
self._validate_directory_id(directory_id)
validate_args(
[
(validate_sso_password, "password", password),
(validate_user_name, "userName", username),
]
)
directory = self.directories[directory_id]
if directory.alias == directory_id:
raise ClientException(
f"An alias is required before enabling SSO. DomainId={directory_id}"
)
directory = self.directories[directory_id]
directory.enable_sso(True)
@paginate(pagination_model=PAGINATION_MODEL)
def describe_directories(
self, directory_ids=None, next_token=None, limit=0

View File

@ -15,6 +15,27 @@ class DirectoryServiceResponse(BaseResponse):
"""Return backend instance specific for this region."""
return ds_backends[self.region]
def connect_directory(self):
"""Create an AD Connector to connect to a self-managed directory."""
name = self._get_param("Name")
short_name = self._get_param("ShortName")
password = self._get_param("Password")
description = self._get_param("Description")
size = self._get_param("Size")
connect_settings = self._get_param("ConnectSettings")
tags = self._get_param("Tags")
directory_id = self.ds_backend.connect_directory(
region=self.region,
name=name,
short_name=short_name,
password=password,
description=description,
size=size,
connect_settings=connect_settings,
tags=tags,
)
return json.dumps({"DirectoryId": directory_id})
def create_directory(self):
"""Create a Simple AD directory."""
name = self._get_param("Name")
@ -36,6 +57,34 @@ class DirectoryServiceResponse(BaseResponse):
)
return json.dumps({"DirectoryId": directory_id})
def create_alias(self):
"""Create an alias and assign the alias to the directory."""
directory_id = self._get_param("DirectoryId")
alias = self._get_param("Alias")
response = self.ds_backend.create_alias(directory_id, alias)
return json.dumps(response)
def create_microsoft_ad(self):
"""Create a Microsoft AD directory."""
name = self._get_param("Name")
short_name = self._get_param("ShortName")
password = self._get_param("Password")
description = self._get_param("Description")
vpc_settings = self._get_param("VpcSettings")
edition = self._get_param("Edition")
tags = self._get_param("Tags")
directory_id = self.ds_backend.create_microsoft_ad(
region=self.region,
name=name,
short_name=short_name,
password=password,
description=description,
vpc_settings=vpc_settings,
edition=edition,
tags=tags,
)
return json.dumps({"DirectoryId": directory_id})
def delete_directory(self):
"""Delete a Directory Service directory."""
directory_id_arg = self._get_param("DirectoryId")
@ -59,6 +108,22 @@ class DirectoryServiceResponse(BaseResponse):
response["NextToken"] = next_token
return json.dumps(response)
def disable_sso(self):
"""Disable single-sign on for a directory."""
directory_id = self._get_param("DirectoryId")
username = self._get_param("UserName")
password = self._get_param("Password")
self.ds_backend.disable_sso(directory_id, username, password)
return ""
def enable_sso(self):
"""Enable single-sign on for a directory."""
directory_id = self._get_param("DirectoryId")
username = self._get_param("UserName")
password = self._get_param("Password")
self.ds_backend.enable_sso(directory_id, username, password)
return ""
def get_directory_limits(self):
"""Return directory limit information for the current region."""
limits = self.ds_backend.get_directory_limits()

134
moto/ds/validations.py Normal file
View File

@ -0,0 +1,134 @@
"""DirectoryServiceBackend checks that result in ValidationException.
Note that ValidationExceptions are accumulative.
"""
import re
from moto.ds.exceptions import DsValidationException
def validate_args(validators):
"""Raise exception if any of the validations fails.
validators is a list of tuples each containing the following:
(validator_function, printable field name, field value)
The error messages are accumulated before the exception is raised.
"""
err_msgs = []
for (func, fieldname, value) in validators:
msg = func(value)
if msg:
err_msgs.append((fieldname, value, msg))
if err_msgs:
raise DsValidationException(err_msgs)
def validate_alias(value):
"""Raise exception if alias fails to conform to length and constraints."""
if len(value) > 62:
return "have length less than or equal to 62"
alias_pattern = r"^(?!D-|d-)([\da-zA-Z]+)([-]*[\da-zA-Z])*$"
if not re.match(alias_pattern, value):
json_pattern = alias_pattern.replace("\\", r"\\")
return fr"satisfy regular expression pattern: {json_pattern}"
return ""
def validate_description(value):
"""Raise exception if description exceeds length."""
if value and len(value) > 128:
return "have length less than or equal to 128"
return ""
def validate_directory_id(value):
"""Raise exception if the directory id is invalid."""
id_pattern = r"^d-[0-9a-f]{10}$"
if not re.match(id_pattern, value):
return fr"satisfy regular expression pattern: {id_pattern}"
return ""
def validate_dns_ips(value):
"""Raise exception if DNS IPs fail to match constraints."""
dnsip_pattern = (
r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"
r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
)
for dnsip in value:
if not re.match(dnsip_pattern, dnsip):
json_pattern = dnsip_pattern.replace("\\", r"\\")
return fr"satisfy regular expression pattern: {json_pattern}"
return ""
def validate_edition(value):
"""Raise exception if edition not one of the allowed values."""
if value and value not in ["Enterprise", "Standard"]:
return "satisfy enum value set: [Enterprise, Standard]"
return ""
def validate_name(value):
"""Raise exception if name fails to match constraints."""
name_pattern = r"^([a-zA-Z0-9]+[\\.-])+([a-zA-Z0-9])+$"
if not re.match(name_pattern, value):
return fr"satisfy regular expression pattern: {name_pattern}"
return ""
def validate_password(value):
"""Raise exception if password fails to match constraints."""
passwd_pattern = (
r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$"
)
if not re.match(passwd_pattern, value):
# Can't have an odd number of backslashes in a literal.
json_pattern = passwd_pattern.replace("\\", r"\\")
return fr"satisfy regular expression pattern: {json_pattern}"
return ""
def validate_short_name(value):
"""Raise exception if short name fails to match constraints."""
short_name_pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
if value and not re.match(short_name_pattern, value):
json_pattern = short_name_pattern.replace("\\", r"\\").replace('"', r"\"")
return fr"satisfy regular expression pattern: {json_pattern}"
return ""
def validate_size(value):
"""Raise exception if size fails to match constraints."""
if value.lower() not in ["small", "large"]:
return "satisfy enum value set: [Small, Large]"
return ""
def validate_sso_password(value):
"""Raise exception is SSO password exceeds length."""
if value and len(value) > 128:
return "have length less than or equal to 128"
return ""
def validate_subnet_ids(value):
"""Raise exception is subnet IDs fail to match constraints."""
subnet_id_pattern = r"^(subnet-[0-9a-f]{8}|subnet-[0-9a-f]{17})$"
for subnet in value:
if not re.match(subnet_id_pattern, subnet):
return fr"satisfy regular expression pattern: {subnet_id_pattern}"
return ""
def validate_user_name(value):
"""Raise exception is username fails to match constraints."""
username_pattern = r"^[a-zA-Z0-9._-]+$"
if value and not re.match(username_pattern, value):
return fr"satisfy regular expression pattern: {username_pattern}"
return ""

View File

@ -79,6 +79,8 @@ def test_ds_get_directory_limits():
== limits["CloudOnlyDirectoriesCurrentCount"]
)
assert limits["CloudOnlyDirectoriesLimitReached"]
assert not limits["CloudOnlyMicrosoftADCurrentCount"]
assert not limits["ConnectedDirectoriesCurrentCount"]
@mock_ec2
@ -114,6 +116,7 @@ def test_ds_describe_directories():
assert dir_info["Type"] == "SimpleAD"
assert dir_info["VpcSettings"]["VpcId"].startswith("vpc-")
assert len(dir_info["VpcSettings"]["SubnetIds"]) == 2
assert set(dir_info["DnsIpAddrs"]) == set(["10.0.1.1", "10.0.0.1"])
assert "NextToken" not in result
# Test with a specific directory ID.
@ -150,3 +153,170 @@ def test_ds_describe_directories():
result = client.describe_directories(Limit=1, NextToken=result["NextToken"])
assert len(result["DirectoryDescriptions"]) == 1
assert result["DirectoryDescriptions"][0]["DirectoryId"] == directory_ids[5]
@mock_ec2
@mock_ds
def test_ds_create_alias():
"""Test good and bad invocations of create_alias()."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a directory we can test against.
directory_id = create_test_directory(client, ec2_client)
# Bad format.
bad_alias = f"d-{get_random_hex(10)}"
with pytest.raises(ClientError) as exc:
client.create_alias(DirectoryId=directory_id, Alias=bad_alias)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
fr"Value '{bad_alias}' at 'alias' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: "
fr"^(?!D-|d-)([\da-zA-Z]+)([-]*[\da-zA-Z])*$"
) in err["Message"]
# Too long.
bad_alias = f"d-{get_random_hex(62)}"
with pytest.raises(ClientError) as exc:
client.create_alias(DirectoryId=directory_id, Alias=bad_alias)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
f"Value '{bad_alias}' at 'alias' failed to satisfy constraint: "
f"Member must have length less than or equal to 62"
) in err["Message"]
# Just right.
good_alias = f"{get_random_hex(10)}"
result = client.create_alias(DirectoryId=directory_id, Alias=good_alias)
assert result["DirectoryId"] == directory_id
assert result["Alias"] == good_alias
result = client.describe_directories()
directory = result["DirectoryDescriptions"][0]
assert directory["Alias"] == good_alias
assert directory["AccessUrl"] == f"{good_alias}.awsapps.com"
# Attempt to create another alias for the same directory.
another_good_alias = f"{get_random_hex(10)}"
with pytest.raises(ClientError) as exc:
client.create_alias(DirectoryId=directory_id, Alias=another_good_alias)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert (
"The directory in the request already has an alias. That alias must "
"be deleted before a new alias can be created."
) in err["Message"]
# Create a second directory we can test against.
directory_id2 = create_test_directory(client, ec2_client)
with pytest.raises(ClientError) as exc:
client.create_alias(DirectoryId=directory_id2, Alias=good_alias)
err = exc.value.response["Error"]
assert err["Code"] == "EntityAlreadyExistsException"
assert f"Alias '{good_alias}' already exists." in err["Message"]
@mock_ec2
@mock_ds
def test_ds_enable_sso():
"""Test good and bad invocations of enable_sso()."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a directory we can test against.
directory_id = create_test_directory(client, ec2_client)
# Need an alias before setting SSO.
with pytest.raises(ClientError) as exc:
client.enable_sso(DirectoryId=directory_id)
err = exc.value.response["Error"]
assert err["Code"] == "ClientException"
assert (
f"An alias is required before enabling SSO. DomainId={directory_id}"
) in err["Message"]
# Add the alias to continue testing.
client.create_alias(DirectoryId=directory_id, Alias="anything-goes")
# Password must be less than 128 chars in length.
good_username = "test"
bad_password = f"bad_password{get_random_hex(128)}"
with pytest.raises(ClientError) as exc:
client.enable_sso(
DirectoryId=directory_id, UserName=good_username, Password=bad_password
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"Value at 'password' failed to satisfy constraint: Member must "
"have length less than or equal to 128"
) in err["Message"]
# Username has constraints.
bad_username = "@test"
good_password = "password"
with pytest.raises(ClientError) as exc:
client.enable_sso(
DirectoryId=directory_id, UserName=bad_username, Password=good_password
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
fr"Value '{bad_username}' at 'userName' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: ^[a-zA-Z0-9._-]+$"
) in err["Message"]
# Valid execution.
client.enable_sso(DirectoryId=directory_id)
result = client.describe_directories()
directory = result["DirectoryDescriptions"][0]
assert directory["SsoEnabled"]
@mock_ec2
@mock_ds
def test_ds_disable_sso():
"""Test good and bad invocations of disable_sso()."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a directory we can test against.
directory_id = create_test_directory(client, ec2_client)
# Password must be less than 128 chars in length.
good_username = "test"
bad_password = f"bad_password{get_random_hex(128)}"
with pytest.raises(ClientError) as exc:
client.disable_sso(
DirectoryId=directory_id, UserName=good_username, Password=bad_password
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"Value at 'password' failed to satisfy constraint: Member must "
"have length less than or equal to 128"
) in err["Message"]
# Username has constraints.
bad_username = "@test"
good_password = "password"
with pytest.raises(ClientError) as exc:
client.disable_sso(
DirectoryId=directory_id, UserName=bad_username, Password=good_password
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
fr"Value '{bad_username}' at 'userName' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: ^[a-zA-Z0-9._-]+$"
) in err["Message"]
# Valid execution. First enable SSO, as the default is disabled SSO.
client.create_alias(DirectoryId=directory_id, Alias="anything-goes")
client.enable_sso(DirectoryId=directory_id)
client.disable_sso(DirectoryId=directory_id)
result = client.describe_directories()
directory = result["DirectoryDescriptions"][0]
assert not directory["SsoEnabled"]

View File

@ -0,0 +1,297 @@
"""Directory-related unit tests for AD Connect Directory Services.
The logic to check the details of VPCs and Subnets is shared between the
"create directory" APIs, so it will not be repeated here.
"""
from datetime import datetime, timezone
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_ds
from moto.core.utils import get_random_hex
from moto.ec2 import mock_ec2
from .test_ds_simple_ad_directory import TEST_REGION, create_vpc, create_subnets
def create_test_ad_connector(
ds_client,
ec2_client,
vpc_settings=None,
customer_dns_ips=None,
customer_user_name="Admin",
tags=None,
): # pylint: disable=too-many-arguments
"""Return ID of a newly created valid directory."""
if not vpc_settings:
good_vpc_id = create_vpc(ec2_client)
good_subnet_ids = create_subnets(ec2_client, good_vpc_id)
vpc_settings = {"VpcId": good_vpc_id, "SubnetIds": good_subnet_ids}
if not customer_dns_ips:
customer_dns_ips = ["1.2.3.4", "5.6.7.8"]
if not tags:
tags = []
result = ds_client.connect_directory(
Name=f"test-{get_random_hex(6)}.test",
Password="4ADConnectPassword",
Size="Small",
ConnectSettings={
"VpcId": vpc_settings["VpcId"],
"SubnetIds": vpc_settings["SubnetIds"],
"CustomerDnsIps": customer_dns_ips,
"CustomerUserName": customer_user_name,
},
Tags=tags,
)
return result["DirectoryId"]
@mock_ds
def test_ds_connect_directory_validations():
"""Test validation errs that aren't caught by botocore.
Most of this validation is shared with the Simple AD directory, but
this verifies that it is invoked from connect_directory().
"""
client = boto3.client("ds", region_name=TEST_REGION)
random_num = get_random_hex(6)
# Verify ValidationException error messages are accumulated properly.
bad_name = f"bad_name_{random_num}"
bad_password = "bad_password"
bad_size = "foo"
ok_connect_settings = {
"VpcId": f"vpc-{random_num}",
"SubnetIds": [f"subnet-{random_num}01", f"subnet-{random_num}02"],
"CustomerUserName": "foo",
"CustomerDnsIps": ["1.2.3.4"],
}
with pytest.raises(ClientError) as exc:
client.connect_directory(
Name=bad_name,
Password=bad_password,
Size=bad_size,
ConnectSettings=ok_connect_settings,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "3 validation errors detected" in err["Message"]
assert (
r"Value at 'password' failed to satisfy constraint: "
r"Member must satisfy regular expression pattern: "
r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$" in err["Message"]
)
assert (
f"Value '{bad_size}' at 'size' failed to satisfy constraint: "
f"Member must satisfy enum value set: [Small, Large]" in err["Message"]
)
assert (
fr"Value '{bad_name}' at 'name' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: "
fr"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$" in err["Message"]
)
too_long = (
"Test of directory service 0123456789 0123456789 0123456789 "
"0123456789 0123456789 0123456789 0123456789 0123456789 0123456789 "
"0123456789 0123456789"
)
short_name = "a:b.c"
with pytest.raises(ClientError) as exc:
client.connect_directory(
Name=f"test{random_num}.test",
Password="TESTfoobar1",
ConnectSettings=ok_connect_settings,
Description=too_long,
ShortName=short_name,
Size="Small",
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "2 validation errors detected" in err["Message"]
assert (
f"Value '{too_long}' at 'description' failed to satisfy constraint: "
f"Member must have length less than or equal to 128" in err["Message"]
)
pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
assert (
f"Value '{short_name}' at 'shortName' failed to satisfy constraint: "
f"Member must satisfy regular expression pattern: " + pattern
) in err["Message"]
bad_connect_settings = {
"VpcId": f"vpc-{random_num}",
"SubnetIds": ["foo"],
"CustomerUserName": "foo",
"CustomerDnsIps": ["1.2.3.4"],
}
with pytest.raises(ClientError) as exc:
client.connect_directory(
Name=f"test{random_num}.test",
Password="TESTfoobar1",
ConnectSettings=bad_connect_settings,
Size="Small",
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
fr"Value '['{bad_connect_settings['SubnetIds'][0]}']' at "
fr"'connectSettings.vpcSettings.subnetIds' failed to satisfy "
fr"constraint: Member must satisfy regular expression pattern: "
fr"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$" in err["Message"]
)
@mock_ec2
@mock_ds
def test_ds_connect_directory_good_args():
"""Test creation of AD connect directory using good arguments."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Verify a good call to connect_directory()
directory_id = create_test_ad_connector(client, ec2_client)
assert directory_id.startswith("d-")
# Verify that too many directories can't be created.
limits = client.get_directory_limits()["DirectoryLimits"]
for _ in range(limits["ConnectedDirectoriesLimit"]):
create_test_ad_connector(client, ec2_client)
with pytest.raises(ClientError) as exc:
create_test_ad_connector(client, ec2_client)
err = exc.value.response["Error"]
assert err["Code"] == "DirectoryLimitExceededException"
assert (
f"Directory limit exceeded. A maximum of "
f"{limits['ConnectedDirectoriesLimit']} "
f"directories may be created" in err["Message"]
)
@mock_ec2
@mock_ds
def test_ds_connect_directory_bad_args():
"""Test validation of non-vpc related ConnectionSettings values."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Bad CustomerUserName.
bad_username = "oops$"
with pytest.raises(ClientError) as exc:
create_test_ad_connector(client, ec2_client, customer_user_name=bad_username)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
fr"Value '{bad_username}' at 'connectSettings.customerUserName' "
fr"failed to satisfy constraint: Member must satisfy regular "
fr"expression pattern: ^[a-zA-Z0-9._-]+$" in err["Message"]
)
# Bad CustomerDnsIps.
bad_dns_ip = ["1.2.3.450"]
with pytest.raises(ClientError) as exc:
create_test_ad_connector(client, ec2_client, customer_dns_ips=bad_dns_ip)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
fr"Value '{bad_dns_ip}' at 'connectSettings.customerDnsIps' "
fr"failed to satisfy constraint: Member must satisfy regular "
fr"expression pattern: ^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.)"
fr"{{3}}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$" in err["Message"]
)
@mock_ec2
@mock_ds
def test_ds_connect_directory_delete():
"""Test deletion of AD Connector directory."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Delete an existing directory.
directory_id = create_test_ad_connector(client, ec2_client)
result = client.delete_directory(DirectoryId=directory_id)
assert result["DirectoryId"] == directory_id
@mock_ec2
@mock_ds
def test_ds_connect_directory_describe():
"""Test describe_directory() for AD Connector directory."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Test that if no directory IDs are specified, all are returned.
directory_id = create_test_ad_connector(client, ec2_client)
result = client.describe_directories()
directory = result["DirectoryDescriptions"][0]
assert len(result["DirectoryDescriptions"]) == 1
assert directory["DesiredNumberOfDomainControllers"] == 0
assert not directory["SsoEnabled"]
assert directory["DirectoryId"] == directory_id
assert directory["Name"].startswith("test-")
assert directory["Alias"] == directory_id
assert directory["AccessUrl"] == f"{directory_id}.awsapps.com"
assert directory["Stage"] == "Active"
assert directory["LaunchTime"] <= datetime.now(timezone.utc)
assert directory["StageLastUpdatedDateTime"] <= datetime.now(timezone.utc)
assert directory["Type"] == "ADConnector"
assert directory["ConnectSettings"]["VpcId"].startswith("vpc-")
assert len(directory["ConnectSettings"]["SubnetIds"]) == 2
assert directory["ConnectSettings"]["CustomerUserName"] == "Admin"
assert set(directory["ConnectSettings"]["ConnectIps"]) == set(
["10.0.0.1", "10.0.1.1"]
)
assert directory["Size"] == "Small"
assert set(directory["DnsIpAddrs"]) == set(["1.2.3.4", "5.6.7.8"])
assert "NextToken" not in result
@mock_ec2
@mock_ds
def test_ds_connect_directory_tags():
"""Test that directory tags can be added and retrieved."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)]
directory_id = create_test_ad_connector(client, ec2_client, tags=added_tags)
result = client.list_tags_for_resource(ResourceId=directory_id)
assert len(result["Tags"]) == 10
assert result["Tags"] == added_tags
@mock_ec2
@mock_ds
def test_ds_get_connect_directory_limits():
"""Test return value for ad connector directory limits."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a bunch of directories and verify the current count has been
# updated.
limits = client.get_directory_limits()["DirectoryLimits"]
for _ in range(limits["ConnectedDirectoriesLimit"]):
create_test_ad_connector(client, ec2_client)
limits = client.get_directory_limits()["DirectoryLimits"]
assert (
limits["ConnectedDirectoriesLimit"]
== limits["ConnectedDirectoriesCurrentCount"]
)
assert limits["ConnectedDirectoriesLimitReached"]
assert not limits["CloudOnlyDirectoriesCurrentCount"]
assert not limits["CloudOnlyMicrosoftADCurrentCount"]

View File

@ -0,0 +1,234 @@
"""Directory-related unit tests for Microsoft AD Directory Services.
The logic to check the details of VPCs and Subnets is shared between the
"create directory" APIs, so it will not be repeated here.
"""
from datetime import datetime, timezone
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_ds
from moto.core.utils import get_random_hex
from moto.ec2 import mock_ec2
from .test_ds_simple_ad_directory import TEST_REGION, create_vpc, create_subnets
def create_test_microsoft_ad(ds_client, ec2_client, vpc_settings=None, tags=None):
"""Return ID of a newly created valid directory."""
if not vpc_settings:
good_vpc_id = create_vpc(ec2_client)
good_subnet_ids = create_subnets(ec2_client, good_vpc_id)
vpc_settings = {"VpcId": good_vpc_id, "SubnetIds": good_subnet_ids}
if not tags:
tags = []
result = ds_client.create_microsoft_ad(
Name=f"test-{get_random_hex(6)}.test",
Password="4MicrosoftADPassword",
VpcSettings=vpc_settings,
Tags=tags,
Edition="Standard",
)
return result["DirectoryId"]
@mock_ds
def test_ds_create_microsoft_ad_validations():
"""Test validation errs that aren't caught by botocore.
Most of this validation is shared with the Simple AD directory, but
this verifies that it is invoked from create_microsoft_ad().
"""
client = boto3.client("ds", region_name=TEST_REGION)
random_num = get_random_hex(6)
# Verify ValidationException error messages are accumulated properly.
bad_name = f"bad_name_{random_num}"
bad_password = "bad_password"
bad_edition = "foo"
ok_vpc_settings = {
"VpcId": f"vpc-{random_num}",
"SubnetIds": [f"subnet-{random_num}01", f"subnet-{random_num}02"],
}
with pytest.raises(ClientError) as exc:
client.create_microsoft_ad(
Name=bad_name,
Password=bad_password,
Edition=bad_edition,
VpcSettings=ok_vpc_settings,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "3 validation errors detected" in err["Message"]
assert (
r"Value at 'password' failed to satisfy constraint: "
r"Member must satisfy regular expression pattern: "
r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$" in err["Message"]
)
assert (
f"Value '{bad_edition}' at 'edition' failed to satisfy constraint: "
f"Member must satisfy enum value set: [Enterprise, Standard]" in err["Message"]
)
assert (
fr"Value '{bad_name}' at 'name' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: "
fr"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$" in err["Message"]
)
too_long = (
"Test of directory service 0123456789 0123456789 0123456789 "
"0123456789 0123456789 0123456789 0123456789 0123456789 0123456789 "
"0123456789 0123456789"
)
short_name = "a:b.c"
with pytest.raises(ClientError) as exc:
client.create_microsoft_ad(
Name=f"test{random_num}.test",
Password="TESTfoobar1",
VpcSettings=ok_vpc_settings,
Description=too_long,
ShortName=short_name,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "2 validation errors detected" in err["Message"]
assert (
f"Value '{too_long}' at 'description' failed to satisfy constraint: "
f"Member must have length less than or equal to 128" in err["Message"]
)
pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
assert (
f"Value '{short_name}' at 'shortName' failed to satisfy constraint: "
f"Member must satisfy regular expression pattern: " + pattern
) in err["Message"]
bad_vpc_settings = {"VpcId": f"vpc-{random_num}", "SubnetIds": ["foo"]}
with pytest.raises(ClientError) as exc:
client.create_microsoft_ad(
Name=f"test{random_num}.test",
Password="TESTfoobar1",
VpcSettings=bad_vpc_settings,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
fr"Value '['{bad_vpc_settings['SubnetIds'][0]}']' at "
fr"'vpcSettings.subnetIds' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: "
fr"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$" in err["Message"]
)
@mock_ec2
@mock_ds
def test_ds_create_microsoft_ad_good_args():
"""Test creation of Microsoft AD directory using good arguments."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Verify a good call to create_microsoft_ad()
directory_id = create_test_microsoft_ad(client, ec2_client)
assert directory_id.startswith("d-")
# Verify that too many directories can't be created.
limits = client.get_directory_limits()["DirectoryLimits"]
for _ in range(limits["CloudOnlyMicrosoftADLimit"]):
create_test_microsoft_ad(client, ec2_client)
with pytest.raises(ClientError) as exc:
create_test_microsoft_ad(client, ec2_client)
err = exc.value.response["Error"]
assert err["Code"] == "DirectoryLimitExceededException"
assert (
f"Directory limit exceeded. A maximum of "
f"{limits['CloudOnlyMicrosoftADLimit']} "
f"directories may be created" in err["Message"]
)
@mock_ec2
@mock_ds
def test_ds_create_microsoft_ad_delete():
"""Test deletion of Microsoft AD directory."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Delete an existing directory.
directory_id = create_test_microsoft_ad(client, ec2_client)
result = client.delete_directory(DirectoryId=directory_id)
assert result["DirectoryId"] == directory_id
@mock_ec2
@mock_ds
def test_ds_create_microsoft_ad_describe():
"""Test describe_directory() for Microsoft AD directory."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Test that if no directory IDs are specified, all are returned.
directory_id = create_test_microsoft_ad(client, ec2_client)
result = client.describe_directories()
directory = result["DirectoryDescriptions"][0]
assert len(result["DirectoryDescriptions"]) == 1
assert directory["DesiredNumberOfDomainControllers"] == 0
assert not directory["SsoEnabled"]
assert directory["DirectoryId"] == directory_id
assert directory["Name"].startswith("test-")
assert directory["Alias"] == directory_id
assert directory["AccessUrl"] == f"{directory_id}.awsapps.com"
assert directory["Stage"] == "Active"
assert directory["LaunchTime"] <= datetime.now(timezone.utc)
assert directory["StageLastUpdatedDateTime"] <= datetime.now(timezone.utc)
assert directory["Type"] == "MicrosoftAD"
assert directory["VpcSettings"]["VpcId"].startswith("vpc-")
assert len(directory["VpcSettings"]["SubnetIds"]) == 2
assert directory["Edition"] == "Standard"
assert set(directory["DnsIpAddrs"]) == set(["10.0.1.1", "10.0.0.1"])
assert "NextToken" not in result
@mock_ec2
@mock_ds
def test_ds_create_microsoft_ad_tags():
"""Test that AD directory tags can be added and retrieved."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)]
directory_id = create_test_microsoft_ad(client, ec2_client, tags=added_tags)
result = client.list_tags_for_resource(ResourceId=directory_id)
assert len(result["Tags"]) == 10
assert result["Tags"] == added_tags
@mock_ec2
@mock_ds
def test_ds_get_microsoft_ad_directory_limits():
"""Test return value for directory limits."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a bunch of directories and verify the current count has been
# updated.
limits = client.get_directory_limits()["DirectoryLimits"]
for _ in range(limits["CloudOnlyMicrosoftADLimit"]):
create_test_microsoft_ad(client, ec2_client)
limits = client.get_directory_limits()["DirectoryLimits"]
assert (
limits["CloudOnlyMicrosoftADLimit"]
== limits["CloudOnlyMicrosoftADCurrentCount"]
)
assert limits["CloudOnlyMicrosoftADLimitReached"]
assert not limits["ConnectedDirectoriesLimitReached"]
assert not limits["CloudOnlyDirectoriesCurrentCount"]

View File

@ -77,10 +77,10 @@ def test_ds_create_directory_validations():
assert (
r"Value at 'password' failed to satisfy constraint: "
r"Member must satisfy regular expression pattern: "
r"(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*;" in err["Message"]
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$" in err["Message"]
)
assert (
f"Value '{bad_size}' at 'size' failed to satisfy constraint: "
@ -132,7 +132,7 @@ def test_ds_create_directory_validations():
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
fr"Value '{bad_vpc_settings['SubnetIds'][0]}' at "
fr"Value '['{bad_vpc_settings['SubnetIds'][0]}']' at "
fr"'vpcSettings.subnetIds' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: "
fr"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$" in err["Message"]