334 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			334 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Directory-related unit tests common to different directory types.
 | 
						|
 | 
						|
Simple AD directories are used for test data, but the operations are
 | 
						|
common to the other directory types.
 | 
						|
"""
 | 
						|
from datetime import datetime, timezone
 | 
						|
 | 
						|
import boto3
 | 
						|
from botocore.exceptions import ClientError
 | 
						|
import pytest
 | 
						|
 | 
						|
from moto import mock_ds
 | 
						|
from moto.core.utils import get_random_hex
 | 
						|
from moto.ec2 import mock_ec2
 | 
						|
 | 
						|
from .test_ds_simple_ad_directory import create_test_directory, TEST_REGION
 | 
						|
 | 
						|
 | 
						|
@mock_ec2
 | 
						|
@mock_ds
 | 
						|
def test_ds_delete_directory():
 | 
						|
    """Test good and bad invocations of delete_directory()."""
 | 
						|
    client = boto3.client("ds", region_name=TEST_REGION)
 | 
						|
 | 
						|
    # Delete a directory when there are none.
 | 
						|
    random_directory_id = f"d-{get_random_hex(10)}"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.delete_directory(DirectoryId=random_directory_id)
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "EntityDoesNotExistException"
 | 
						|
    assert f"Directory {random_directory_id} does not exist" in err["Message"]
 | 
						|
 | 
						|
    # Delete an existing directory.
 | 
						|
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)
 | 
						|
    directory_id = create_test_directory(client, ec2_client)
 | 
						|
    result = client.delete_directory(DirectoryId=directory_id)
 | 
						|
    assert result["DirectoryId"] == directory_id
 | 
						|
 | 
						|
    # Verify there are no dictionaries, network interfaces or associated
 | 
						|
    # security groups.
 | 
						|
    result = client.describe_directories()
 | 
						|
    assert len(result["DirectoryDescriptions"]) == 0
 | 
						|
    result = ec2_client.describe_network_interfaces()
 | 
						|
    assert len(result["NetworkInterfaces"]) == 0
 | 
						|
    result = ec2_client.describe_security_groups()
 | 
						|
    for group in result["SecurityGroups"]:
 | 
						|
        assert "directory controllers" not in group["Description"]
 | 
						|
 | 
						|
    # Attempt to delete a non-existent directory.
 | 
						|
    nonexistent_id = f"d-{get_random_hex(10)}"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.delete_directory(DirectoryId=nonexistent_id)
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "EntityDoesNotExistException"
 | 
						|
    assert f"Directory {nonexistent_id} does not exist" in err["Message"]
 | 
						|
 | 
						|
    # Attempt to use an invalid directory ID.
 | 
						|
    bad_id = get_random_hex(3)
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.delete_directory(DirectoryId=bad_id)
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ValidationException"
 | 
						|
    assert "1 validation error detected" in err["Message"]
 | 
						|
    assert (
 | 
						|
        f"Value '{bad_id}' at 'directoryId' failed to satisfy constraint: "
 | 
						|
        f"Member must satisfy regular expression pattern: ^d-[0-9a-f]{{10}}$"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
 | 
						|
@mock_ec2
 | 
						|
@mock_ds
 | 
						|
def test_ds_get_directory_limits():
 | 
						|
    """Test return value for directory limits."""
 | 
						|
    client = boto3.client("ds", region_name=TEST_REGION)
 | 
						|
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)
 | 
						|
 | 
						|
    limits = client.get_directory_limits()["DirectoryLimits"]
 | 
						|
    assert limits["CloudOnlyDirectoriesCurrentCount"] == 0
 | 
						|
    assert limits["CloudOnlyDirectoriesLimit"] > 0
 | 
						|
    assert not limits["CloudOnlyDirectoriesLimitReached"]
 | 
						|
 | 
						|
    # Create a bunch of directories and verify the current count has been
 | 
						|
    # updated.
 | 
						|
    for _ in range(limits["CloudOnlyDirectoriesLimit"]):
 | 
						|
        create_test_directory(client, ec2_client)
 | 
						|
    limits = client.get_directory_limits()["DirectoryLimits"]
 | 
						|
    assert (
 | 
						|
        limits["CloudOnlyDirectoriesLimit"]
 | 
						|
        == limits["CloudOnlyDirectoriesCurrentCount"]
 | 
						|
    )
 | 
						|
    assert limits["CloudOnlyDirectoriesLimitReached"]
 | 
						|
    assert not limits["CloudOnlyMicrosoftADCurrentCount"]
 | 
						|
    assert not limits["ConnectedDirectoriesCurrentCount"]
 | 
						|
 | 
						|
 | 
						|
@mock_ec2
 | 
						|
@mock_ds
 | 
						|
def test_ds_describe_directories():
 | 
						|
    """Test good and bad invocations of describe_directories()."""
 | 
						|
    client = boto3.client("ds", region_name=TEST_REGION)
 | 
						|
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)
 | 
						|
 | 
						|
    expected_ids = set()
 | 
						|
    limit = 10
 | 
						|
    for _ in range(limit):
 | 
						|
        expected_ids.add(create_test_directory(client, ec2_client))
 | 
						|
 | 
						|
    # Test that if no directory IDs are specified, all are returned.
 | 
						|
    result = client.describe_directories()
 | 
						|
    directories = result["DirectoryDescriptions"]
 | 
						|
    directory_ids = [x["DirectoryId"] for x in directories]
 | 
						|
 | 
						|
    assert len(directories) == limit
 | 
						|
    assert set(directory_ids) == expected_ids
 | 
						|
    for idx, dir_info in enumerate(directories):
 | 
						|
        assert dir_info["DesiredNumberOfDomainControllers"] == 0
 | 
						|
        assert not dir_info["SsoEnabled"]
 | 
						|
        assert dir_info["DirectoryId"] == directory_ids[idx]
 | 
						|
        assert dir_info["Name"].startswith("test-")
 | 
						|
        assert dir_info["Size"] == "Large"
 | 
						|
        assert dir_info["Alias"] == directory_ids[idx]
 | 
						|
        assert dir_info["AccessUrl"] == f"{directory_ids[idx]}.awsapps.com"
 | 
						|
        assert dir_info["Stage"] == "Active"
 | 
						|
        assert dir_info["LaunchTime"] <= datetime.now(timezone.utc)
 | 
						|
        assert dir_info["StageLastUpdatedDateTime"] <= datetime.now(timezone.utc)
 | 
						|
        assert dir_info["Type"] == "SimpleAD"
 | 
						|
        assert dir_info["VpcSettings"]["VpcId"].startswith("vpc-")
 | 
						|
        assert len(dir_info["VpcSettings"]["SubnetIds"]) == 2
 | 
						|
        assert dir_info["VpcSettings"]["SecurityGroupId"].startswith("sg-")
 | 
						|
        assert len(dir_info["DnsIpAddrs"]) == 2
 | 
						|
    assert "NextToken" not in result
 | 
						|
 | 
						|
    # Test with a specific directory ID.
 | 
						|
    result = client.describe_directories(DirectoryIds=[directory_ids[5]])
 | 
						|
    assert len(result["DirectoryDescriptions"]) == 1
 | 
						|
    assert result["DirectoryDescriptions"][0]["DirectoryId"] == directory_ids[5]
 | 
						|
 | 
						|
    # Test with a bad directory ID.
 | 
						|
    bad_id = get_random_hex(3)
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.describe_directories(DirectoryIds=[bad_id])
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ValidationException"
 | 
						|
    assert (
 | 
						|
        f"Value '{bad_id}' at 'directoryId' failed to satisfy constraint: "
 | 
						|
        f"Member must satisfy regular expression pattern: ^d-[0-9a-f]{{10}}$"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Test with an invalid next token.
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.describe_directories(NextToken="bogus")
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "InvalidNextTokenException"
 | 
						|
    assert "Invalid value passed for the NextToken parameter" in err["Message"]
 | 
						|
 | 
						|
    # Test with a limit.
 | 
						|
    result = client.describe_directories(Limit=5)
 | 
						|
    assert len(result["DirectoryDescriptions"]) == 5
 | 
						|
    directories = result["DirectoryDescriptions"]
 | 
						|
    for idx in range(5):
 | 
						|
        assert directories[idx]["DirectoryId"] == directory_ids[idx]
 | 
						|
    assert result["NextToken"]
 | 
						|
 | 
						|
    result = client.describe_directories(Limit=1, NextToken=result["NextToken"])
 | 
						|
    assert len(result["DirectoryDescriptions"]) == 1
 | 
						|
    assert result["DirectoryDescriptions"][0]["DirectoryId"] == directory_ids[5]
 | 
						|
 | 
						|
 | 
						|
@mock_ec2
 | 
						|
@mock_ds
 | 
						|
def test_ds_create_alias():
 | 
						|
    """Test good and bad invocations of create_alias()."""
 | 
						|
    client = boto3.client("ds", region_name=TEST_REGION)
 | 
						|
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)
 | 
						|
 | 
						|
    # Create a directory we can test against.
 | 
						|
    directory_id = create_test_directory(client, ec2_client)
 | 
						|
 | 
						|
    # Bad format.
 | 
						|
    bad_alias = f"d-{get_random_hex(10)}"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.create_alias(DirectoryId=directory_id, Alias=bad_alias)
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ValidationException"
 | 
						|
    assert (
 | 
						|
        fr"Value '{bad_alias}' at 'alias' failed to satisfy constraint: "
 | 
						|
        fr"Member must satisfy regular expression pattern: "
 | 
						|
        fr"^(?!D-|d-)([\da-zA-Z]+)([-]*[\da-zA-Z])*$"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Too long.
 | 
						|
    bad_alias = f"d-{get_random_hex(62)}"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.create_alias(DirectoryId=directory_id, Alias=bad_alias)
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ValidationException"
 | 
						|
    assert (
 | 
						|
        f"Value '{bad_alias}' at 'alias' failed to satisfy constraint: "
 | 
						|
        f"Member must have length less than or equal to 62"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Just right.
 | 
						|
    good_alias = f"{get_random_hex(10)}"
 | 
						|
    result = client.create_alias(DirectoryId=directory_id, Alias=good_alias)
 | 
						|
    assert result["DirectoryId"] == directory_id
 | 
						|
    assert result["Alias"] == good_alias
 | 
						|
    result = client.describe_directories()
 | 
						|
    directory = result["DirectoryDescriptions"][0]
 | 
						|
    assert directory["Alias"] == good_alias
 | 
						|
    assert directory["AccessUrl"] == f"{good_alias}.awsapps.com"
 | 
						|
 | 
						|
    # Attempt to create another alias for the same directory.
 | 
						|
    another_good_alias = f"{get_random_hex(10)}"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.create_alias(DirectoryId=directory_id, Alias=another_good_alias)
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "InvalidParameterException"
 | 
						|
    assert (
 | 
						|
        "The directory in the request already has an alias. That alias must "
 | 
						|
        "be deleted before a new alias can be created."
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Create a second directory we can test against.
 | 
						|
    directory_id2 = create_test_directory(client, ec2_client)
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.create_alias(DirectoryId=directory_id2, Alias=good_alias)
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "EntityAlreadyExistsException"
 | 
						|
    assert f"Alias '{good_alias}' already exists." in err["Message"]
 | 
						|
 | 
						|
 | 
						|
@mock_ec2
 | 
						|
@mock_ds
 | 
						|
def test_ds_enable_sso():
 | 
						|
    """Test good and bad invocations of enable_sso()."""
 | 
						|
    client = boto3.client("ds", region_name=TEST_REGION)
 | 
						|
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)
 | 
						|
 | 
						|
    # Create a directory we can test against.
 | 
						|
    directory_id = create_test_directory(client, ec2_client)
 | 
						|
 | 
						|
    # Need an alias before setting SSO.
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.enable_sso(DirectoryId=directory_id)
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ClientException"
 | 
						|
    assert (
 | 
						|
        f"An alias is required before enabling SSO. DomainId={directory_id}"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Add the alias to continue testing.
 | 
						|
    client.create_alias(DirectoryId=directory_id, Alias="anything-goes")
 | 
						|
 | 
						|
    # Password must be less than 128 chars in length.
 | 
						|
    good_username = "test"
 | 
						|
    bad_password = f"bad_password{get_random_hex(128)}"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.enable_sso(
 | 
						|
            DirectoryId=directory_id, UserName=good_username, Password=bad_password
 | 
						|
        )
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ValidationException"
 | 
						|
    assert (
 | 
						|
        "Value at 'ssoPassword' failed to satisfy constraint: Member must "
 | 
						|
        "have length less than or equal to 128"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Username has constraints.
 | 
						|
    bad_username = "@test"
 | 
						|
    good_password = "password"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.enable_sso(
 | 
						|
            DirectoryId=directory_id, UserName=bad_username, Password=good_password
 | 
						|
        )
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ValidationException"
 | 
						|
    assert (
 | 
						|
        fr"Value '{bad_username}' at 'userName' failed to satisfy constraint: "
 | 
						|
        fr"Member must satisfy regular expression pattern: ^[a-zA-Z0-9._-]+$"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Valid execution.
 | 
						|
    client.enable_sso(DirectoryId=directory_id)
 | 
						|
    result = client.describe_directories()
 | 
						|
    directory = result["DirectoryDescriptions"][0]
 | 
						|
    assert directory["SsoEnabled"]
 | 
						|
 | 
						|
 | 
						|
@mock_ec2
 | 
						|
@mock_ds
 | 
						|
def test_ds_disable_sso():
 | 
						|
    """Test good and bad invocations of disable_sso()."""
 | 
						|
    client = boto3.client("ds", region_name=TEST_REGION)
 | 
						|
    ec2_client = boto3.client("ec2", region_name=TEST_REGION)
 | 
						|
 | 
						|
    # Create a directory we can test against.
 | 
						|
    directory_id = create_test_directory(client, ec2_client)
 | 
						|
 | 
						|
    # Password must be less than 128 chars in length.
 | 
						|
    good_username = "test"
 | 
						|
    bad_password = f"bad_password{get_random_hex(128)}"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.disable_sso(
 | 
						|
            DirectoryId=directory_id, UserName=good_username, Password=bad_password
 | 
						|
        )
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ValidationException"
 | 
						|
    assert (
 | 
						|
        "Value at 'ssoPassword' failed to satisfy constraint: Member must "
 | 
						|
        "have length less than or equal to 128"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Username has constraints.
 | 
						|
    bad_username = "@test"
 | 
						|
    good_password = "password"
 | 
						|
    with pytest.raises(ClientError) as exc:
 | 
						|
        client.disable_sso(
 | 
						|
            DirectoryId=directory_id, UserName=bad_username, Password=good_password
 | 
						|
        )
 | 
						|
    err = exc.value.response["Error"]
 | 
						|
    assert err["Code"] == "ValidationException"
 | 
						|
    assert (
 | 
						|
        fr"Value '{bad_username}' at 'userName' failed to satisfy constraint: "
 | 
						|
        fr"Member must satisfy regular expression pattern: ^[a-zA-Z0-9._-]+$"
 | 
						|
    ) in err["Message"]
 | 
						|
 | 
						|
    # Valid execution.  First enable SSO, as the default is disabled SSO.
 | 
						|
    client.create_alias(DirectoryId=directory_id, Alias="anything-goes")
 | 
						|
    client.enable_sso(DirectoryId=directory_id)
 | 
						|
    client.disable_sso(DirectoryId=directory_id)
 | 
						|
    result = client.describe_directories()
 | 
						|
    directory = result["DirectoryDescriptions"][0]
 | 
						|
    assert not directory["SsoEnabled"]
 |