334 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			334 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Directory-related unit tests common to different directory types.
 | |
| 
 | |
| Simple AD directories are used for test data, but the operations are
 | |
| common to the other directory types.
 | |
| """
 | |
| from datetime import datetime, timezone
 | |
| 
 | |
| import boto3
 | |
| from botocore.exceptions import ClientError
 | |
| import pytest
 | |
| 
 | |
| from moto import mock_ds
 | |
| from moto.core.utils import get_random_hex
 | |
| from moto.ec2 import mock_ec2
 | |
| 
 | |
| from .test_ds_simple_ad_directory import create_test_directory, TEST_REGION
 | |
| 
 | |
| 
 | |
@mock_ec2
@mock_ds
def test_ds_delete_directory():
    """Exercise delete_directory() with missing, existing and malformed IDs."""
    ds = boto3.client("ds", region_name=TEST_REGION)

    # Nothing has been created yet, so a well-formed ID must be rejected.
    missing_id = f"d-{get_random_hex(10)}"
    with pytest.raises(ClientError) as raised:
        ds.delete_directory(DirectoryId=missing_id)
    error = raised.value.response["Error"]
    assert error["Code"] == "EntityDoesNotExistException"
    assert f"Directory {missing_id} does not exist" in error["Message"]

    # Deleting a directory that exists echoes its ID back.
    ec2 = boto3.client("ec2", region_name=TEST_REGION)
    created_id = create_test_directory(ds, ec2)
    deletion = ds.delete_directory(DirectoryId=created_id)
    assert deletion["DirectoryId"] == created_id

    # Nothing may be left behind: no directories, no network interfaces and
    # no security group created for the directory controllers.
    remaining_directories = ds.describe_directories()["DirectoryDescriptions"]
    assert len(remaining_directories) == 0
    remaining_interfaces = ec2.describe_network_interfaces()["NetworkInterfaces"]
    assert len(remaining_interfaces) == 0
    group_descriptions = [
        group["Description"]
        for group in ec2.describe_security_groups()["SecurityGroups"]
    ]
    assert all("directory controllers" not in text for text in group_descriptions)

    # A well-formed but unknown ID is still rejected after the deletion.
    unknown_id = f"d-{get_random_hex(10)}"
    with pytest.raises(ClientError) as raised:
        ds.delete_directory(DirectoryId=unknown_id)
    error = raised.value.response["Error"]
    assert error["Code"] == "EntityDoesNotExistException"
    assert f"Directory {unknown_id} does not exist" in error["Message"]

    # An ID that does not match the expected pattern fails validation.
    malformed_id = get_random_hex(3)
    with pytest.raises(ClientError) as raised:
        ds.delete_directory(DirectoryId=malformed_id)
    error = raised.value.response["Error"]
    expected_detail = (
        f"Value '{malformed_id}' at 'directoryId' failed to satisfy constraint: "
        f"Member must satisfy regular expression pattern: ^d-[0-9a-f]{{10}}$"
    )
    assert error["Code"] == "ValidationException"
    assert "1 validation error detected" in error["Message"]
    assert expected_detail in error["Message"]
 | |
| 
 | |
| 
 | |
@mock_ec2
@mock_ds
def test_ds_get_directory_limits():
    """Check get_directory_limits() before and after filling the quota."""
    ds = boto3.client("ds", region_name=TEST_REGION)
    ec2 = boto3.client("ec2", region_name=TEST_REGION)

    # With no directories, the count is zero and the limit is not reached.
    before = ds.get_directory_limits()["DirectoryLimits"]
    assert before["CloudOnlyDirectoriesCurrentCount"] == 0
    assert before["CloudOnlyDirectoriesLimit"] > 0
    assert not before["CloudOnlyDirectoriesLimitReached"]

    # Create exactly as many directories as the reported limit allows, then
    # confirm the current count has caught up with the limit.
    quota = before["CloudOnlyDirectoriesLimit"]
    for _ in range(quota):
        create_test_directory(ds, ec2)

    after = ds.get_directory_limits()["DirectoryLimits"]
    assert (
        after["CloudOnlyDirectoriesLimit"]
        == after["CloudOnlyDirectoriesCurrentCount"]
    )
    assert after["CloudOnlyDirectoriesLimitReached"]
    # The other directory-type counters must remain falsy.
    assert not after["CloudOnlyMicrosoftADCurrentCount"]
    assert not after["ConnectedDirectoriesCurrentCount"]
 | |
| 
 | |
| 
 | |
@mock_ec2
@mock_ds
def test_ds_describe_directories():
    """Cover describe_directories(): full listing, ID filter, errors, paging."""
    ds = boto3.client("ds", region_name=TEST_REGION)
    ec2 = boto3.client("ec2", region_name=TEST_REGION)

    total = 10
    created_ids = {create_test_directory(ds, ec2) for _ in range(total)}

    # Without a DirectoryIds filter, every directory is returned.
    response = ds.describe_directories()
    listing = response["DirectoryDescriptions"]
    listed_ids = [entry["DirectoryId"] for entry in listing]

    assert len(listing) == total
    assert set(listed_ids) == created_ids
    for listed_id, entry in zip(listed_ids, listing):
        assert entry["DesiredNumberOfDomainControllers"] == 0
        assert not entry["SsoEnabled"]
        assert entry["DirectoryId"] == listed_id
        assert entry["Name"].startswith("test-")
        assert entry["Size"] == "Large"
        # Until an alias is created, the alias is expected to equal the ID.
        assert entry["Alias"] == listed_id
        assert entry["AccessUrl"] == f"{listed_id}.awsapps.com"
        assert entry["Stage"] == "Active"
        assert entry["LaunchTime"] <= datetime.now(timezone.utc)
        assert entry["StageLastUpdatedDateTime"] <= datetime.now(timezone.utc)
        assert entry["Type"] == "SimpleAD"
        vpc_settings = entry["VpcSettings"]
        assert vpc_settings["VpcId"].startswith("vpc-")
        assert len(vpc_settings["SubnetIds"]) == 2
        assert vpc_settings["SecurityGroupId"].startswith("sg-")
        assert len(entry["DnsIpAddrs"]) == 2
    assert "NextToken" not in response

    # Filtering on a single ID returns only that directory.
    sixth_id = listed_ids[5]
    filtered = ds.describe_directories(DirectoryIds=[sixth_id])
    assert len(filtered["DirectoryDescriptions"]) == 1
    assert filtered["DirectoryDescriptions"][0]["DirectoryId"] == sixth_id

    # An ID that does not match the expected pattern fails validation.
    malformed_id = get_random_hex(3)
    with pytest.raises(ClientError) as raised:
        ds.describe_directories(DirectoryIds=[malformed_id])
    error = raised.value.response["Error"]
    expected_detail = (
        f"Value '{malformed_id}' at 'directoryId' failed to satisfy constraint: "
        f"Member must satisfy regular expression pattern: ^d-[0-9a-f]{{10}}$"
    )
    assert error["Code"] == "ValidationException"
    assert expected_detail in error["Message"]

    # A NextToken that was never issued is rejected.
    with pytest.raises(ClientError) as raised:
        ds.describe_directories(NextToken="bogus")
    error = raised.value.response["Error"]
    assert error["Code"] == "InvalidNextTokenException"
    assert "Invalid value passed for the NextToken parameter" in error["Message"]

    # A limited request returns the first five entries, in listing order,
    # along with a token for the remainder.
    first_page = ds.describe_directories(Limit=5)
    page_entries = first_page["DirectoryDescriptions"]
    assert len(page_entries) == 5
    assert [entry["DirectoryId"] for entry in page_entries] == listed_ids[:5]
    assert first_page["NextToken"]

    # Resuming from the token continues with the sixth directory.
    second_page = ds.describe_directories(
        Limit=1, NextToken=first_page["NextToken"]
    )
    assert len(second_page["DirectoryDescriptions"]) == 1
    assert second_page["DirectoryDescriptions"][0]["DirectoryId"] == sixth_id
 | |
| 
 | |
| 
 | |
@mock_ec2
@mock_ds
def test_ds_create_alias():
    """Exercise create_alias() with invalid, valid and conflicting aliases."""
    ds = boto3.client("ds", region_name=TEST_REGION)
    ec2 = boto3.client("ec2", region_name=TEST_REGION)

    # The directory that will receive the alias.
    first_directory = create_test_directory(ds, ec2)

    # An alias shaped like a directory ID ("d-" prefix) violates the pattern.
    id_like_alias = f"d-{get_random_hex(10)}"
    with pytest.raises(ClientError) as raised:
        ds.create_alias(DirectoryId=first_directory, Alias=id_like_alias)
    error = raised.value.response["Error"]
    pattern_detail = (
        rf"Value '{id_like_alias}' at 'alias' failed to satisfy constraint: "
        rf"Member must satisfy regular expression pattern: "
        rf"^(?!D-|d-)([\da-zA-Z]+)([-]*[\da-zA-Z])*$"
    )
    assert error["Code"] == "ValidationException"
    assert pattern_detail in error["Message"]

    # An alias longer than 62 characters violates the length constraint.
    oversized_alias = f"d-{get_random_hex(62)}"
    with pytest.raises(ClientError) as raised:
        ds.create_alias(DirectoryId=first_directory, Alias=oversized_alias)
    error = raised.value.response["Error"]
    length_detail = (
        f"Value '{oversized_alias}' at 'alias' failed to satisfy constraint: "
        f"Member must have length less than or equal to 62"
    )
    assert error["Code"] == "ValidationException"
    assert length_detail in error["Message"]

    # A valid alias is accepted and shows up in the directory description.
    alias = get_random_hex(10)
    created = ds.create_alias(DirectoryId=first_directory, Alias=alias)
    assert created["DirectoryId"] == first_directory
    assert created["Alias"] == alias
    described = ds.describe_directories()["DirectoryDescriptions"][0]
    assert described["Alias"] == alias
    assert described["AccessUrl"] == f"{alias}.awsapps.com"

    # A directory that already has an alias cannot be given a second one.
    second_alias = get_random_hex(10)
    with pytest.raises(ClientError) as raised:
        ds.create_alias(DirectoryId=first_directory, Alias=second_alias)
    error = raised.value.response["Error"]
    already_aliased_detail = (
        "The directory in the request already has an alias. That alias must "
        "be deleted before a new alias can be created."
    )
    assert error["Code"] == "InvalidParameterException"
    assert already_aliased_detail in error["Message"]

    # An alias already in use cannot be assigned to a different directory.
    second_directory = create_test_directory(ds, ec2)
    with pytest.raises(ClientError) as raised:
        ds.create_alias(DirectoryId=second_directory, Alias=alias)
    error = raised.value.response["Error"]
    assert error["Code"] == "EntityAlreadyExistsException"
    assert f"Alias '{alias}' already exists." in error["Message"]
 | |
| 
 | |
| 
 | |
@mock_ec2
@mock_ds
def test_ds_enable_sso():
    """Exercise enable_sso(): alias precondition, validation and success."""
    ds = boto3.client("ds", region_name=TEST_REGION)
    ec2 = boto3.client("ec2", region_name=TEST_REGION)

    # The directory under test.
    target_id = create_test_directory(ds, ec2)

    # SSO cannot be enabled until the directory has an alias.
    with pytest.raises(ClientError) as raised:
        ds.enable_sso(DirectoryId=target_id)
    error = raised.value.response["Error"]
    alias_required_detail = (
        f"An alias is required before enabling SSO. DomainId={target_id}"
    )
    assert error["Code"] == "ClientException"
    assert alias_required_detail in error["Message"]

    # Satisfy the alias precondition so the remaining checks can run.
    ds.create_alias(DirectoryId=target_id, Alias="anything-goes")

    # A password longer than 128 characters fails validation.
    valid_user = "test"
    oversized_password = f"bad_password{get_random_hex(128)}"
    with pytest.raises(ClientError) as raised:
        ds.enable_sso(
            DirectoryId=target_id, UserName=valid_user, Password=oversized_password
        )
    error = raised.value.response["Error"]
    password_detail = (
        "Value at 'ssoPassword' failed to satisfy constraint: Member must "
        "have length less than or equal to 128"
    )
    assert error["Code"] == "ValidationException"
    assert password_detail in error["Message"]

    # A user name with characters outside the allowed pattern fails validation.
    invalid_user = "@test"
    valid_password = "password"
    with pytest.raises(ClientError) as raised:
        ds.enable_sso(
            DirectoryId=target_id, UserName=invalid_user, Password=valid_password
        )
    error = raised.value.response["Error"]
    username_detail = (
        rf"Value '{invalid_user}' at 'userName' failed to satisfy constraint: "
        rf"Member must satisfy regular expression pattern: ^[a-zA-Z0-9._-]+$"
    )
    assert error["Code"] == "ValidationException"
    assert username_detail in error["Message"]

    # With an alias in place, enabling SSO succeeds and is reported back.
    ds.enable_sso(DirectoryId=target_id)
    described = ds.describe_directories()["DirectoryDescriptions"][0]
    assert described["SsoEnabled"]
 | |
| 
 | |
| 
 | |
@mock_ec2
@mock_ds
def test_ds_disable_sso():
    """Exercise disable_sso(): argument validation and a successful disable."""
    ds = boto3.client("ds", region_name=TEST_REGION)
    ec2 = boto3.client("ec2", region_name=TEST_REGION)

    # The directory under test.
    target_id = create_test_directory(ds, ec2)

    # A password longer than 128 characters fails validation.
    valid_user = "test"
    oversized_password = f"bad_password{get_random_hex(128)}"
    with pytest.raises(ClientError) as raised:
        ds.disable_sso(
            DirectoryId=target_id, UserName=valid_user, Password=oversized_password
        )
    error = raised.value.response["Error"]
    password_detail = (
        "Value at 'ssoPassword' failed to satisfy constraint: Member must "
        "have length less than or equal to 128"
    )
    assert error["Code"] == "ValidationException"
    assert password_detail in error["Message"]

    # A user name with characters outside the allowed pattern fails validation.
    invalid_user = "@test"
    valid_password = "password"
    with pytest.raises(ClientError) as raised:
        ds.disable_sso(
            DirectoryId=target_id, UserName=invalid_user, Password=valid_password
        )
    error = raised.value.response["Error"]
    username_detail = (
        rf"Value '{invalid_user}' at 'userName' failed to satisfy constraint: "
        rf"Member must satisfy regular expression pattern: ^[a-zA-Z0-9._-]+$"
    )
    assert error["Code"] == "ValidationException"
    assert username_detail in error["Message"]

    # Happy path: SSO is off by default, so create an alias and enable SSO
    # first, then disable it and confirm the directory reports it as off.
    ds.create_alias(DirectoryId=target_id, Alias="anything-goes")
    ds.enable_sso(DirectoryId=target_id)
    ds.disable_sso(DirectoryId=target_id)
    described = ds.describe_directories()["DirectoryDescriptions"][0]
    assert not described["SsoEnabled"]
 |