DirectoryService: Create security group and ENI (#4588)

This commit is contained in:
kbalk 2021-11-18 05:57:44 -05:00 committed by GitHub
parent 0e6157011f
commit 5c44a8945d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 92 additions and 101 deletions

View File

@ -22,7 +22,7 @@ class DsValidationException(JsonRESTError):
)
msgs = []
for arg_name, arg_value, constraint in error_tuples:
value = "at" if arg_name == "password" else f"'{arg_value}' at"
value = "at" if "assword" in arg_name else f"'{arg_value}' at"
msgs.append(
f"Value {value} '{arg_name}' failed to satisfy constraint: "
f"Member must {constraint}"

View File

@ -1,6 +1,5 @@
"""DirectoryServiceBackend class with methods for supported APIs."""
from datetime import datetime, timezone
import ipaddress
from boto3 import Session
@ -16,22 +15,9 @@ from moto.ds.exceptions import (
ValidationException,
)
from moto.ds.utils import PAGINATION_MODEL
from moto.ds.validations import (
validate_args,
validate_alias,
validate_description,
validate_directory_id,
validate_dns_ips,
validate_edition,
validate_name,
validate_password,
validate_short_name,
validate_size,
validate_sso_password,
validate_subnet_ids,
validate_user_name,
)
from moto.ds.validations import validate_args
from moto.ec2.exceptions import InvalidSubnetIdError
from moto.ec2 import ec2_backends
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService
@ -62,10 +48,10 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
def __init__(
self,
region,
name,
password,
directory_type,
subnets,
size=None,
vpc_settings=None,
connect_settings=None,
@ -73,6 +59,7 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
description=None,
edition=None,
): # pylint: disable=too-many-arguments
self.region = region
self.name = name
self.password = password
self.directory_type = directory_type
@ -92,26 +79,43 @@ class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
self.stage = "Active"
self.launch_time = datetime.now(timezone.utc).isoformat()
self.stage_last_updated_date_time = datetime.now(timezone.utc).isoformat()
# Create a security group and ENI, returning the IPs for the ENI.
subnet_ips = self.create_eni()
if directory_type != "ADConnector":
self.dns_ip_addrs = self.subnet_ips(subnets)
if self.directory_type != "ADConnector":
self.dns_ip_addrs = subnet_ips
else:
self.dns_ip_addrs = self.connect_settings["CustomerDnsIps"]
self.connect_settings["ConnectIps"] = self.subnet_ips(subnets)
self.connect_settings["ConnectIps"] = subnet_ips
@staticmethod
def subnet_ips(subnets):
"""Return an IP from each of the given subnets.
def create_eni(self):
"""Return IP addrs after creating an ENI for each subnet."""
if self.vpc_settings:
vpc_id = self.vpc_settings["VpcId"]
subnet_ids = self.vpc_settings["SubnetIds"]
else:
vpc_id = self.connect_settings["VpcId"]
subnet_ids = self.connect_settings["SubnetIds"]
# Need a security group for the ENI.
security_group = ec2_backends[self.region].create_security_group(
name=f"{self.directory_id}_controllers",
description=(
f"AWS created security group for {self.directory_id} "
f"directory controllers"
),
vpc_id=vpc_id,
)
This is a bit dodgey and may need to be reworked at a later time.
"""
ip_addrs = []
for subnet in subnets:
ips = ipaddress.ip_network(subnet.cidr_block)
# Not sure if the following could occur, but if it does,
# the situation will be ignored.
if ips:
ip_addrs.append(str(ips[1]) if ips.num_addresses > 1 else str(ips[0]))
for subnet_id in subnet_ids:
eni_info = ec2_backends[self.region].create_network_interface(
subnet=subnet_id,
private_ip_address=None,
group_ids=[security_group.id],
description=f"AWS created network interface for {self.directory_id}",
)
ip_addrs.append(eni_info.private_ip_address)
return ip_addrs
def update_alias(self, alias):
@ -167,8 +171,8 @@ class DirectoryServiceBackend(BaseBackend):
)
@staticmethod
def _get_subnets(region, vpc_settings):
"""Return subnets if vpc_settings are invalid, else raise an exception.
def _verify_subnets(region, vpc_settings):
"""Verify subnets are valid, else raise an exception.
If settings are valid, add AvailabilityZones to vpc_settings.
"""
@ -178,8 +182,6 @@ class DirectoryServiceBackend(BaseBackend):
"in different Availability Zones."
)
from moto.ec2 import ec2_backends # pylint: disable=import-outside-toplevel
# Subnet IDs are checked before the VPC ID. The Subnet IDs must
# be valid and in different availability zones.
try:
@ -202,9 +204,7 @@ class DirectoryServiceBackend(BaseBackend):
vpcs = ec2_backends[region].describe_vpcs()
if vpc_settings["VpcId"] not in [x.id for x in vpcs]:
raise ClientException("Invalid VPC ID.")
vpc_settings["AvailabilityZones"] = regions
return subnets
def connect_directory(
self,
@ -226,30 +226,24 @@ class DirectoryServiceBackend(BaseBackend):
validate_args(
[
(validate_password, "password", password),
(validate_size, "size", size),
(validate_name, "name", name),
(validate_description, "description", description),
(validate_short_name, "shortName", short_name),
("password", password),
("size", size),
("name", name),
("description", description),
("shortName", short_name),
(
validate_subnet_ids,
"connectSettings.vpcSettings.subnetIds",
connect_settings["SubnetIds"],
),
(
validate_user_name,
"connectSettings.customerUserName",
connect_settings["CustomerUserName"],
),
(
validate_dns_ips,
"connectSettings.customerDnsIps",
connect_settings["CustomerDnsIps"],
),
("connectSettings.customerDnsIps", connect_settings["CustomerDnsIps"]),
]
)
# ConnectSettings and VpcSettings both have a VpcId and Subnets.
subnets = self._get_subnets(region, connect_settings)
self._verify_subnets(region, connect_settings)
errmsg = self.tagger.validate_tags(tags or [])
if errmsg:
@ -258,10 +252,10 @@ class DirectoryServiceBackend(BaseBackend):
raise DirectoryLimitExceededException("Tag Limit is exceeding")
directory = Directory(
region,
name,
password,
"ADConnector",
subnets,
size=size,
connect_settings=connect_settings,
short_name=short_name,
@ -286,19 +280,15 @@ class DirectoryServiceBackend(BaseBackend):
raise InvalidParameterException("VpcSettings must be specified.")
validate_args(
[
(validate_password, "password", password),
(validate_size, "size", size),
(validate_name, "name", name),
(validate_description, "description", description),
(validate_short_name, "shortName", short_name),
(
validate_subnet_ids,
"vpcSettings.subnetIds",
vpc_settings["SubnetIds"],
),
("password", password),
("size", size),
("name", name),
("description", description),
("shortName", short_name),
("vpcSettings.subnetIds", vpc_settings["SubnetIds"]),
]
)
subnets = self._get_subnets(region, vpc_settings)
self._verify_subnets(region, vpc_settings)
errmsg = self.tagger.validate_tags(tags or [])
if errmsg:
@ -307,10 +297,10 @@ class DirectoryServiceBackend(BaseBackend):
raise DirectoryLimitExceededException("Tag Limit is exceeding")
directory = Directory(
region,
name,
password,
"SimpleAD",
subnets,
size=size,
vpc_settings=vpc_settings,
short_name=short_name,
@ -323,7 +313,7 @@ class DirectoryServiceBackend(BaseBackend):
def _validate_directory_id(self, directory_id):
"""Raise an exception if the directory id is invalid or unknown."""
# Validation of ID takes precedence over a check for its existence.
validate_args([(validate_directory_id, "directoryId", directory_id)])
validate_args([("directoryId", directory_id)])
if directory_id not in self.directories:
raise EntityDoesNotExistException(
f"Directory {directory_id} does not exist"
@ -345,8 +335,7 @@ class DirectoryServiceBackend(BaseBackend):
# Is the alias already in use?
if alias in [x.alias for x in self.directories.values()]:
raise EntityAlreadyExistsException(f"Alias '{alias}' already exists.")
validate_args([(validate_alias, "alias", alias)])
validate_args([("alias", alias)])
directory.update_alias(alias)
return {"DirectoryId": directory_id, "Alias": alias}
@ -372,19 +361,15 @@ class DirectoryServiceBackend(BaseBackend):
# boto3 looks for missing vpc_settings for create_microsoft_ad().
validate_args(
[
(validate_password, "password", password),
(validate_edition, "edition", edition),
(validate_name, "name", name),
(validate_description, "description", description),
(validate_short_name, "shortName", short_name),
(
validate_subnet_ids,
"vpcSettings.subnetIds",
vpc_settings["SubnetIds"],
),
("password", password),
("edition", edition),
("name", name),
("description", description),
("shortName", short_name),
("vpcSettings.subnetIds", vpc_settings["SubnetIds"]),
]
)
subnets = self._get_subnets(region, vpc_settings)
self._verify_subnets(region, vpc_settings)
errmsg = self.tagger.validate_tags(tags or [])
if errmsg:
@ -393,10 +378,10 @@ class DirectoryServiceBackend(BaseBackend):
raise DirectoryLimitExceededException("Tag Limit is exceeding")
directory = Directory(
region,
name,
password,
"MicrosoftAD",
subnets,
vpc_settings=vpc_settings,
short_name=short_name,
description=description,
@ -416,24 +401,14 @@ class DirectoryServiceBackend(BaseBackend):
def disable_sso(self, directory_id, username=None, password=None):
"""Disable single-sign on for a directory."""
self._validate_directory_id(directory_id)
validate_args(
[
(validate_sso_password, "password", password),
(validate_user_name, "userName", username),
]
)
validate_args([("ssoPassword", password), ("userName", username)])
directory = self.directories[directory_id]
directory.enable_sso(False)
def enable_sso(self, directory_id, username=None, password=None):
"""Enable single-sign on for a directory."""
self._validate_directory_id(directory_id)
validate_args(
[
(validate_sso_password, "password", password),
(validate_user_name, "userName", username),
]
)
validate_args([("ssoPassword", password), ("userName", username)])
directory = self.directories[directory_id]
if directory.alias == directory_id:

View File

@ -11,13 +11,31 @@ def validate_args(validators):
"""Raise exception if any of the validations fails.
validators is a list of tuples each containing the following:
(validator_function, printable field name, field value)
(printable field name, field value)
The error messages are accumulated before the exception is raised.
"""
validation_map = {
"alias": validate_alias,
"description": validate_description,
"directoryId": validate_directory_id,
"connectSettings.customerDnsIps": validate_dns_ips,
"edition": validate_edition,
"name": validate_name,
"password": validate_password,
"shortName": validate_short_name,
"size": validate_size,
"ssoPassword": validate_sso_password,
"connectSettings.vpcSettings.subnetIds": validate_subnet_ids,
"connectSettings.customerUserName": validate_user_name,
"userName": validate_user_name,
"vpcSettings.subnetIds": validate_subnet_ids,
}
err_msgs = []
for (func, fieldname, value) in validators:
msg = func(value)
# This eventually could be a switch (python 3.10), elminating the need
# for the above map and individual functions.
for (fieldname, value) in validators:
msg = validation_map[fieldname](value)
if msg:
err_msgs.append((fieldname, value, msg))
if err_msgs:

View File

@ -116,7 +116,7 @@ def test_ds_describe_directories():
assert dir_info["Type"] == "SimpleAD"
assert dir_info["VpcSettings"]["VpcId"].startswith("vpc-")
assert len(dir_info["VpcSettings"]["SubnetIds"]) == 2
assert set(dir_info["DnsIpAddrs"]) == set(["10.0.1.1", "10.0.0.1"])
assert len(dir_info["DnsIpAddrs"]) == 2
assert "NextToken" not in result
# Test with a specific directory ID.
@ -250,7 +250,7 @@ def test_ds_enable_sso():
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"Value at 'password' failed to satisfy constraint: Member must "
"Value at 'ssoPassword' failed to satisfy constraint: Member must "
"have length less than or equal to 128"
) in err["Message"]
@ -295,7 +295,7 @@ def test_ds_disable_sso():
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"Value at 'password' failed to satisfy constraint: Member must "
"Value at 'ssoPassword' failed to satisfy constraint: Member must "
"have length less than or equal to 128"
) in err["Message"]

View File

@ -251,9 +251,7 @@ def test_ds_connect_directory_describe():
assert directory["ConnectSettings"]["VpcId"].startswith("vpc-")
assert len(directory["ConnectSettings"]["SubnetIds"]) == 2
assert directory["ConnectSettings"]["CustomerUserName"] == "Admin"
assert set(directory["ConnectSettings"]["ConnectIps"]) == set(
["10.0.0.1", "10.0.1.1"]
)
assert len(directory["ConnectSettings"]["ConnectIps"]) == 2
assert directory["Size"] == "Small"
assert set(directory["DnsIpAddrs"]) == set(["1.2.3.4", "5.6.7.8"])
assert "NextToken" not in result

View File

@ -192,7 +192,7 @@ def test_ds_create_microsoft_ad_describe():
assert directory["VpcSettings"]["VpcId"].startswith("vpc-")
assert len(directory["VpcSettings"]["SubnetIds"]) == 2
assert directory["Edition"] == "Standard"
assert set(directory["DnsIpAddrs"]) == set(["10.0.1.1", "10.0.0.1"])
assert len(directory["DnsIpAddrs"]) == 2
assert "NextToken" not in result