moto/tests/test_ds/test_ds_simple_ad_directory.py

247 lines
8.9 KiB
Python

"""Directory-related unit tests for Simple AD Directory Services."""
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_ds
from moto import settings
from moto.ec2 import mock_ec2
from moto.moto_api._internal import mock_random
TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
def create_vpc(ec2_client):
"""Return the ID for a valid VPC."""
return ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
def create_subnets(
ec2_client, vpc_id, region1=TEST_REGION + "a", region2=TEST_REGION + "b"
):
"""Return list of two subnets IDs."""
subnet_ids = []
for cidr_block, region in [("10.0.1.0/24", region1), ("10.0.0.0/24", region2)]:
subnet_ids.append(
ec2_client.create_subnet(
VpcId=vpc_id, CidrBlock=cidr_block, AvailabilityZone=region
)["Subnet"]["SubnetId"]
)
return subnet_ids
def create_test_directory(ds_client, ec2_client, vpc_settings=None, tags=None):
"""Return ID of a newly created valid directory."""
if not vpc_settings:
good_vpc_id = create_vpc(ec2_client)
good_subnet_ids = create_subnets(ec2_client, good_vpc_id)
vpc_settings = {"VpcId": good_vpc_id, "SubnetIds": good_subnet_ids}
if not tags:
tags = []
result = ds_client.create_directory(
Name=f"test-{mock_random.get_random_hex(6)}.test",
Password="Password4TheAges",
Size="Large",
VpcSettings=vpc_settings,
Tags=tags,
)
return result["DirectoryId"]
@mock_ds
def test_ds_create_directory_validations():
"""Test validation errs that aren't caught by botocore."""
client = boto3.client("ds", region_name=TEST_REGION)
random_num = mock_random.get_random_hex(6)
# Verify ValidationException error messages are accumulated properly.
bad_name = f"bad_name_{random_num}"
bad_password = "bad_password"
bad_size = "big"
ok_vpc_settings = {
"VpcId": f"vpc-{random_num}",
"SubnetIds": [f"subnet-{random_num}01", f"subnet-{random_num}02"],
}
with pytest.raises(ClientError) as exc:
client.create_directory(
Name=bad_name,
Password=bad_password,
Size=bad_size,
VpcSettings=ok_vpc_settings,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "3 validation errors detected" in err["Message"]
assert (
r"Value at 'password' failed to satisfy constraint: "
r"Member must satisfy regular expression pattern: "
r"^(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*$" in err["Message"]
)
assert (
f"Value '{bad_size}' at 'size' failed to satisfy constraint: "
f"Member must satisfy enum value set: [Small, Large];" in err["Message"]
)
assert (
rf"Value '{bad_name}' at 'name' failed to satisfy constraint: "
rf"Member must satisfy regular expression pattern: "
rf"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$" in err["Message"]
)
too_long = (
"Test of directory service 0123456789 0123456789 0123456789 "
"0123456789 0123456789 0123456789 0123456789 0123456789 0123456789 "
"0123456789 0123456789"
)
short_name = "a:b.c"
with pytest.raises(ClientError) as exc:
client.create_directory(
Name=f"test{random_num}.test",
Password="TESTfoobar1",
Size="Large",
VpcSettings=ok_vpc_settings,
Description=too_long,
ShortName=short_name,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "2 validation errors detected" in err["Message"]
assert (
f"Value '{too_long}' at 'description' failed to satisfy constraint: "
f"Member must have length less than or equal to 128" in err["Message"]
)
pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
assert (
f"Value '{short_name}' at 'shortName' failed to satisfy constraint: "
f"Member must satisfy regular expression pattern: " + pattern
) in err["Message"]
bad_vpc_settings = {"VpcId": f"vpc-{random_num}", "SubnetIds": ["foo"]}
with pytest.raises(ClientError) as exc:
client.create_directory(
Name=f"test{random_num}.test",
Password="TESTfoobar1",
Size="Large",
VpcSettings=bad_vpc_settings,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
rf"Value '['{bad_vpc_settings['SubnetIds'][0]}']' at "
rf"'vpcSettings.subnetIds' failed to satisfy constraint: "
rf"Member must satisfy regular expression pattern: "
rf"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$" in err["Message"]
)
@mock_ec2
@mock_ds
def test_ds_create_directory_bad_vpc_settings():
"""Test validation of bad vpc that doesn't raise ValidationException."""
client = boto3.client("ds", region_name=TEST_REGION)
# Error if no VpcSettings argument.
with pytest.raises(ClientError) as exc:
client.create_directory(
Name=f"test-{mock_random.get_random_hex(6)}.test",
Password="TESTfoobar1",
Size="Small",
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "VpcSettings must be specified" in err["Message"]
# Error if VPC is bogus.
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
good_subnet_ids = create_subnets(ec2_client, create_vpc(ec2_client))
with pytest.raises(ClientError) as exc:
create_test_directory(
client, ec2_client, {"VpcId": "vpc-12345678", "SubnetIds": good_subnet_ids}
)
err = exc.value.response["Error"]
assert err["Code"] == "ClientException"
assert "Invalid VPC ID" in err["Message"]
@mock_ec2
@mock_ds
def test_ds_create_directory_bad_subnets():
"""Test validation of VPC subnets."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Error if VPC subnets are bogus.
good_vpc_id = create_vpc(ec2_client)
with pytest.raises(ClientError) as exc:
create_test_directory(
client,
ec2_client,
{"VpcId": good_vpc_id, "SubnetIds": ["subnet-12345678", "subnet-87654321"]},
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert (
"Invalid subnet ID(s). They must correspond to two subnets in "
"different Availability Zones."
) in err["Message"]
# Error if both VPC subnets are in the same region.
subnets_same_region = create_subnets(
ec2_client, good_vpc_id, region1=TEST_REGION + "a", region2=TEST_REGION + "a"
)
with pytest.raises(ClientError) as exc:
create_test_directory(
client, ec2_client, {"VpcId": good_vpc_id, "SubnetIds": subnets_same_region}
)
err = exc.value.response["Error"]
assert err["Code"] == "ClientException"
assert (
"Invalid subnet ID(s). The two subnets must be in different "
"Availability Zones."
) in err["Message"]
ec2_client.delete_subnet(SubnetId=subnets_same_region[0])
ec2_client.delete_subnet(SubnetId=subnets_same_region[1])
# Error if only one VPC subnet.
good_subnet_ids = create_subnets(ec2_client, good_vpc_id)
with pytest.raises(ClientError) as exc:
create_test_directory(
client,
ec2_client,
{"VpcId": good_vpc_id, "SubnetIds": [good_subnet_ids[0]]},
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "Invalid subnet ID(s). They must correspond to two subnets" in err["Message"]
@mock_ec2
@mock_ds
def test_ds_create_directory_good_args():
"""Test creation of AD directory using good arguments."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Verify a good call to create_directory()
directory_id = create_test_directory(client, ec2_client)
assert directory_id.startswith("d-")
# Verify that too many directories can't be created.
limits = client.get_directory_limits()["DirectoryLimits"]
for _ in range(limits["CloudOnlyDirectoriesLimit"]):
create_test_directory(client, ec2_client)
with pytest.raises(ClientError) as exc:
create_test_directory(client, ec2_client)
err = exc.value.response["Error"]
assert err["Code"] == "DirectoryLimitExceededException"
assert (
f"Directory limit exceeded. A maximum of "
f"{limits['CloudOnlyDirectoriesLimit']} "
f"directories may be created" in err["Message"]
)