296 lines
11 KiB
Python
296 lines
11 KiB
Python
"""Directory-related unit tests for AD Connect Directory Services.
|
|
|
|
The logic to check the details of VPCs and Subnets is shared between the
|
|
"create directory" APIs, so it will not be repeated here.
|
|
"""
|
|
from datetime import datetime, timezone
|
|
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
import pytest
|
|
|
|
from moto import mock_ds
|
|
from moto.ec2 import mock_ec2
|
|
from moto.moto_api._internal import mock_random
|
|
|
|
from .test_ds_simple_ad_directory import TEST_REGION, create_vpc, create_subnets
|
|
|
|
|
|
def create_test_ad_connector(
    ds_client,
    ec2_client,
    vpc_settings=None,
    customer_dns_ips=None,
    customer_user_name="Admin",
    tags=None,
):  # pylint: disable=too-many-arguments
    """Connect a new AD Connector directory and return its directory ID.

    Falsy ``vpc_settings``, ``customer_dns_ips`` and ``tags`` arguments are
    replaced with defaults.  For ``vpc_settings`` the default is built by
    calling ``create_vpc`` and ``create_subnets`` with ``ec2_client``.
    """
    if not vpc_settings:
        vpc_id = create_vpc(ec2_client)
        subnet_ids = create_subnets(ec2_client, vpc_id)
        vpc_settings = {"VpcId": vpc_id, "SubnetIds": subnet_ids}

    # A random component keeps the name distinct on every call.
    directory_name = f"test-{mock_random.get_random_hex(6)}.test"
    connect_settings = {
        "VpcId": vpc_settings["VpcId"],
        "SubnetIds": vpc_settings["SubnetIds"],
        "CustomerDnsIps": customer_dns_ips or ["1.2.3.4", "5.6.7.8"],
        "CustomerUserName": customer_user_name,
    }

    response = ds_client.connect_directory(
        Name=directory_name,
        Password="4ADConnectPassword",
        Size="Small",
        ConnectSettings=connect_settings,
        Tags=tags or [],
    )
    return response["DirectoryId"]
|
|
|
|
|
|
@mock_ds
def test_ds_connect_directory_validations():
    """Check validation errors that botocore itself does not catch.

    Most of this validation is shared with the Simple AD directory; this
    test confirms that connect_directory() invokes it as well.
    """
    ds_client = boto3.client("ds", region_name=TEST_REGION)
    hex_suffix = mock_random.get_random_hex(6)

    # Several bad values in one request: the individual messages should be
    # accumulated into a single ValidationException.
    bad_name = f"bad_name_{hex_suffix}"
    bad_password = "bad_password"
    bad_size = "foo"
    ok_connect_settings = {
        "VpcId": f"vpc-{hex_suffix}",
        "SubnetIds": [f"subnet-{hex_suffix}01", f"subnet-{hex_suffix}02"],
        "CustomerUserName": "foo",
        "CustomerDnsIps": ["1.2.3.4"],
    }
    with pytest.raises(ClientError) as raised:
        ds_client.connect_directory(
            Name=bad_name,
            Password=bad_password,
            Size=bad_size,
            ConnectSettings=ok_connect_settings,
        )
    error = raised.value.response["Error"]
    assert error["Code"] == "ValidationException"
    assert "3 validation errors detected" in error["Message"]

    expected_password_msg = (
        r"Value at 'password' failed to satisfy constraint: "
        r"Member must satisfy regular expression pattern: "
        r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
        r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
        r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
        r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$"
    )
    assert expected_password_msg in error["Message"]

    expected_size_msg = (
        f"Value '{bad_size}' at 'size' failed to satisfy constraint: "
        f"Member must satisfy enum value set: [Small, Large]"
    )
    assert expected_size_msg in error["Message"]

    expected_name_msg = (
        rf"Value '{bad_name}' at 'name' failed to satisfy constraint: "
        rf"Member must satisfy regular expression pattern: "
        rf"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$"
    )
    assert expected_name_msg in error["Message"]

    # An over-long description together with a malformed short name.
    too_long = (
        "Test of directory service 0123456789 0123456789 0123456789 "
        "0123456789 0123456789 0123456789 0123456789 0123456789 0123456789 "
        "0123456789 0123456789"
    )
    short_name = "a:b.c"
    with pytest.raises(ClientError) as raised:
        ds_client.connect_directory(
            Name=f"test{hex_suffix}.test",
            Password="TESTfoobar1",
            ConnectSettings=ok_connect_settings,
            Description=too_long,
            ShortName=short_name,
            Size="Small",
        )
    error = raised.value.response["Error"]
    assert error["Code"] == "ValidationException"
    assert "2 validation errors detected" in error["Message"]

    expected_description_msg = (
        f"Value '{too_long}' at 'description' failed to satisfy constraint: "
        f"Member must have length less than or equal to 128"
    )
    assert expected_description_msg in error["Message"]

    pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
    expected_short_name_msg = (
        f"Value '{short_name}' at 'shortName' failed to satisfy constraint: "
        f"Member must satisfy regular expression pattern: " + pattern
    )
    assert expected_short_name_msg in error["Message"]

    # A subnet ID that does not match the expected "subnet-..." format.
    bad_connect_settings = {
        "VpcId": f"vpc-{hex_suffix}",
        "SubnetIds": ["foo"],
        "CustomerUserName": "foo",
        "CustomerDnsIps": ["1.2.3.4"],
    }
    with pytest.raises(ClientError) as raised:
        ds_client.connect_directory(
            Name=f"test{hex_suffix}.test",
            Password="TESTfoobar1",
            ConnectSettings=bad_connect_settings,
            Size="Small",
        )
    error = raised.value.response["Error"]
    assert error["Code"] == "ValidationException"
    assert "1 validation error detected" in error["Message"]

    expected_subnet_msg = (
        rf"Value '['{bad_connect_settings['SubnetIds'][0]}']' at "
        rf"'connectSettings.vpcSettings.subnetIds' failed to satisfy "
        rf"constraint: Member must satisfy regular expression pattern: "
        rf"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$"
    )
    assert expected_subnet_msg in error["Message"]
|
|
|
|
|
|
@mock_ec2
@mock_ds
def test_ds_connect_directory_good_args():
    """Create AD Connector directories with valid arguments up to the limit."""
    ds_client = boto3.client("ds", region_name=TEST_REGION)
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)

    # A valid connect_directory() call hands back a directory ID.
    first_id = create_test_ad_connector(ds_client, ec2_client)
    assert first_id.startswith("d-")

    # Keep creating directories as many times as the reported limit, then
    # check that one further attempt is rejected.
    directory_limits = ds_client.get_directory_limits()["DirectoryLimits"]
    max_connected = directory_limits["ConnectedDirectoriesLimit"]
    for _ in range(max_connected):
        create_test_ad_connector(ds_client, ec2_client)

    with pytest.raises(ClientError) as raised:
        create_test_ad_connector(ds_client, ec2_client)
    error = raised.value.response["Error"]
    assert error["Code"] == "DirectoryLimitExceededException"

    expected_msg = (
        f"Directory limit exceeded. A maximum of "
        f"{max_connected} "
        f"directories may be created"
    )
    assert expected_msg in error["Message"]
|
|
|
|
|
|
@mock_ec2
@mock_ds
def test_ds_connect_directory_bad_args():
    """Check validation of the non-VPC ConnectSettings values."""
    ds_client = boto3.client("ds", region_name=TEST_REGION)
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)

    # CustomerUserName containing a character outside the allowed set.
    bad_username = "oops$"
    with pytest.raises(ClientError) as raised:
        create_test_ad_connector(
            ds_client, ec2_client, customer_user_name=bad_username
        )
    error = raised.value.response["Error"]
    assert error["Code"] == "ValidationException"
    assert "1 validation error detected" in error["Message"]

    expected_username_msg = (
        rf"Value '{bad_username}' at 'connectSettings.customerUserName' "
        rf"failed to satisfy constraint: Member must satisfy regular "
        rf"expression pattern: ^[a-zA-Z0-9._-]+$"
    )
    assert expected_username_msg in error["Message"]

    # CustomerDnsIps entry whose last octet is out of range.
    bad_dns_ip = ["1.2.3.450"]
    with pytest.raises(ClientError) as raised:
        create_test_ad_connector(
            ds_client, ec2_client, customer_dns_ips=bad_dns_ip
        )
    error = raised.value.response["Error"]
    assert error["Code"] == "ValidationException"
    assert "1 validation error detected" in error["Message"]

    expected_dns_msg = (
        rf"Value '{bad_dns_ip}' at 'connectSettings.customerDnsIps' "
        rf"failed to satisfy constraint: Member must satisfy regular "
        rf"expression pattern: ^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.)"
        rf"{{3}}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
    )
    assert expected_dns_msg in error["Message"]
|
|
|
|
|
|
@mock_ec2
@mock_ds
def test_ds_connect_directory_delete():
    """Check that an AD Connector directory can be deleted."""
    ds_client = boto3.client("ds", region_name=TEST_REGION)
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)

    # Create a directory, delete it, and expect the response to echo its ID.
    created_id = create_test_ad_connector(ds_client, ec2_client)
    response = ds_client.delete_directory(DirectoryId=created_id)
    assert response["DirectoryId"] == created_id
|
|
|
|
|
|
@mock_ec2
@mock_ds
def test_ds_connect_directory_describe():
    """Check the fields describe_directories() reports for an AD Connector."""
    ds_client = boto3.client("ds", region_name=TEST_REGION)
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)

    # With no directory IDs in the request, every directory is returned.
    created_id = create_test_ad_connector(ds_client, ec2_client)
    response = ds_client.describe_directories()
    descriptions = response["DirectoryDescriptions"]
    directory = descriptions[0]

    assert len(descriptions) == 1
    assert directory["DesiredNumberOfDomainControllers"] == 0
    assert not directory["SsoEnabled"]
    assert directory["DirectoryId"] == created_id
    assert directory["Name"].startswith("test-")
    assert directory["Alias"] == created_id
    assert directory["AccessUrl"] == f"{created_id}.awsapps.com"
    assert directory["Stage"] == "Active"

    # Both timestamps must not lie in the future.
    assert directory["LaunchTime"] <= datetime.now(timezone.utc)
    assert directory["StageLastUpdatedDateTime"] <= datetime.now(timezone.utc)
    assert directory["Type"] == "ADConnector"

    connect_settings = directory["ConnectSettings"]
    assert connect_settings["VpcId"].startswith("vpc-")
    assert len(connect_settings["SubnetIds"]) == 2
    assert connect_settings["CustomerUserName"] == "Admin"
    assert len(connect_settings["ConnectIps"]) == 2

    assert directory["Size"] == "Small"
    # Compared as sets, so the order of the addresses does not matter.
    assert set(directory["DnsIpAddrs"]) == {"1.2.3.4", "5.6.7.8"}
    assert "NextToken" not in response
|
|
|
|
|
|
@mock_ec2
@mock_ds
def test_ds_connect_directory_tags():
    """Check that tags given at creation are returned by list_tags_for_resource()."""
    ds_client = boto3.client("ds", region_name=TEST_REGION)
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)

    # Ten tags whose key and value are both the stringified index.
    expected_tags = [{"Key": f"{idx}", "Value": f"{idx}"} for idx in range(10)]
    created_id = create_test_ad_connector(
        ds_client, ec2_client, tags=expected_tags
    )

    response = ds_client.list_tags_for_resource(ResourceId=created_id)
    assert len(response["Tags"]) == 10
    assert response["Tags"] == expected_tags
|
|
|
|
|
|
@mock_ec2
@mock_ds
def test_ds_get_connect_directory_limits():
    """Check the AD Connector figures reported by get_directory_limits()."""
    ds_client = boto3.client("ds", region_name=TEST_REGION)
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)

    # Create as many directories as the reported limit, then re-read the
    # limits to confirm the current count was updated.
    initial_limits = ds_client.get_directory_limits()["DirectoryLimits"]
    for _ in range(initial_limits["ConnectedDirectoriesLimit"]):
        create_test_ad_connector(ds_client, ec2_client)

    limits = ds_client.get_directory_limits()["DirectoryLimits"]
    max_connected = limits["ConnectedDirectoriesLimit"]
    current_connected = limits["ConnectedDirectoriesCurrentCount"]
    assert max_connected == current_connected
    assert limits["ConnectedDirectoriesLimitReached"]

    # The other directory counters must remain falsy.
    assert not limits["CloudOnlyDirectoriesCurrentCount"]
    assert not limits["CloudOnlyMicrosoftADCurrentCount"]
|