Feature DirectoryService support (#4483)

This commit is contained in:
kbalk 2021-10-30 06:09:44 -04:00 committed by GitHub
parent 03c170e206
commit e1298e334b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1269 additions and 90 deletions

View File

@ -914,6 +914,74 @@
- [ ] test_connection
</details>
## ds
<details>
<summary>10% implemented</summary>
- [ ] accept_shared_directory
- [ ] add_ip_routes
- [ ] add_region
- [X] add_tags_to_resource
- [ ] cancel_schema_extension
- [ ] connect_directory
- [ ] create_alias
- [ ] create_computer
- [ ] create_conditional_forwarder
- [X] create_directory
- [ ] create_log_subscription
- [ ] create_microsoft_ad
- [ ] create_snapshot
- [ ] create_trust
- [ ] delete_conditional_forwarder
- [X] delete_directory
- [ ] delete_log_subscription
- [ ] delete_snapshot
- [ ] delete_trust
- [ ] deregister_certificate
- [ ] deregister_event_topic
- [ ] describe_certificate
- [ ] describe_client_authentication_settings
- [ ] describe_conditional_forwarders
- [X] describe_directories
- [ ] describe_domain_controllers
- [ ] describe_event_topics
- [ ] describe_ldaps_settings
- [ ] describe_regions
- [ ] describe_shared_directories
- [ ] describe_snapshots
- [ ] describe_trusts
- [ ] disable_client_authentication
- [ ] disable_ldaps
- [ ] disable_radius
- [ ] disable_sso
- [ ] enable_client_authentication
- [ ] enable_ldaps
- [ ] enable_radius
- [ ] enable_sso
- [X] get_directory_limits
- [ ] get_snapshot_limits
- [ ] list_certificates
- [ ] list_ip_routes
- [ ] list_log_subscriptions
- [ ] list_schema_extensions
- [X] list_tags_for_resource
- [ ] register_certificate
- [ ] register_event_topic
- [ ] reject_shared_directory
- [ ] remove_ip_routes
- [ ] remove_region
- [X] remove_tags_from_resource
- [ ] reset_user_password
- [ ] restore_from_snapshot
- [ ] share_directory
- [ ] start_schema_extension
- [ ] unshare_directory
- [ ] update_conditional_forwarder
- [ ] update_number_of_domain_controllers
- [ ] update_radius
- [ ] update_trust
- [ ] verify_trust
</details>
## dynamodb
<details>
<summary>56% implemented</summary>
@ -4526,7 +4594,6 @@
- discovery
- dlm
- docdb
- ds
- ebs
- ecr-public
- elastic-inference

View File

@ -40,6 +40,8 @@ Currently implemented Services:
+---------------------------+-----------------------+------------------------------------+
| Data Pipeline | @mock_datapipeline | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+
| Directory Service | @mock_ds | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+
| DMS | @mock_dms | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+
| DynamoDB | - @mock_dynamodb | - core endpoints done |

View File

@ -52,6 +52,7 @@ mock_datapipeline_deprecated = lazy_load(
)
mock_datasync = lazy_load(".datasync", "mock_datasync")
mock_dms = lazy_load(".dms", "mock_dms")
mock_ds = lazy_load(".ds", "mock_ds", boto3_name="ds")
mock_dynamodb = lazy_load(".dynamodb", "mock_dynamodb")
mock_dynamodb_deprecated = lazy_load(".dynamodb", "mock_dynamodb_deprecated")
mock_dynamodb2 = lazy_load(".dynamodb2", "mock_dynamodb2", backend="dynamodb_backends2")

View File

@ -25,6 +25,7 @@ backend_url_patterns = [
("datapipeline", re.compile("https?://datapipeline\\.(.+)\\.amazonaws\\.com")),
("datasync", re.compile("https?://(.*\\.)?(datasync)\\.(.+)\\.amazonaws.com")),
("dms", re.compile("https?://dms\\.(.+)\\.amazonaws\\.com")),
("ds", re.compile("https?://ds\\.(.+)\\.amazonaws\\.com")),
("dynamodb", re.compile("https?://dynamodb\\.(.+)\\.amazonaws\\.com")),
("dynamodb2", re.compile("https?://dynamodb\\.(.+)\\.amazonaws\\.com")),
(

5
moto/ds/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""ds module initialization; sets value for base decorator."""
from .models import ds_backends
from ..core.models import base_decorator
mock_ds = base_decorator(ds_backends)

96
moto/ds/exceptions.py Normal file
View File

@ -0,0 +1,96 @@
"""Exceptions raised by the Directory Service service."""
from moto.core.exceptions import JsonRESTError
class DsValidationException(JsonRESTError):
"""Report one of more parameter validation errors."""
code = 400
def __init__(self, error_tuples):
"""Validation errors are concatenated into one exception message.
error_tuples is a list of tuples. Each tuple contains:
- name of invalid parameter,
- value of invalid parameter,
- string describing the constraints for that parameter.
"""
msg_leader = (
f"{len(error_tuples)} "
f"validation error{'s' if len(error_tuples) > 1 else ''} detected: "
)
msgs = []
for arg_name, arg_value, constraint in error_tuples:
value = "at" if arg_name == "password" else f"'{arg_value}' at"
msgs.append(
f"Value {value} '{arg_name}' failed to satisfy constraint: "
f"Member must {constraint}"
)
super().__init__("ValidationException", msg_leader + "; ".join(msgs))
class ClientException(JsonRESTError):
"""Client exception has occurred. VPC parameters are invalid."""
code = 400
def __init__(self, message):
super().__init__("ClientException", message)
class DirectoryLimitExceededException(JsonRESTError):
"""Maximum number of directories in region has been reached."""
code = 400
def __init__(self, message):
super().__init__("DirectoryLimitExceededException", message)
class EntityDoesNotExistException(JsonRESTError):
"""The specified entity could not be found."""
code = 400
def __init__(self, message):
super().__init__("EntityDoesNotExistException", message)
class InvalidNextTokenException(JsonRESTError):
"""Invalid next token parameter used to return a list of entities."""
code = 400
def __init__(self):
super().__init__(
"InvalidNextTokenException",
"Invalid value passed for the NextToken parameter",
)
class InvalidParameterException(JsonRESTError):
"""Invalid parameter."""
code = 400
def __init__(self, message):
super().__init__("InvalidParameterException", message)
class TagLimitExceededException(JsonRESTError):
"""The maximum allowed number of tags was exceeded."""
code = 400
def __init__(self, message):
super().__init__("TagLimitExceededException", message)
class ValidationException(JsonRESTError):
"""Tag validation failed."""
code = 400
def __init__(self, message):
super().__init__("ValidationException", message)

346
moto/ds/models.py Normal file
View File

@ -0,0 +1,346 @@
"""DirectoryServiceBackend class with methods for supported APIs."""
from datetime import datetime, timezone
import re
from boto3 import Session
from moto.core import BaseBackend, BaseModel
from moto.core.utils import get_random_hex
from moto.ds.exceptions import (
ClientException,
DirectoryLimitExceededException,
EntityDoesNotExistException,
DsValidationException,
InvalidParameterException,
TagLimitExceededException,
ValidationException,
)
from moto.ec2.exceptions import InvalidSubnetIdError
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService
from .utils import PAGINATION_MODEL
class Directory(BaseModel): # pylint: disable=too-many-instance-attributes
"""Representation of a Simple AD Directory."""
# The assumption here is that the limits are the same for all regions.
CLOUDONLY_DIRECTORIES_LIMIT = 10
CLOUDONLY_MICROSOFT_AD_LIMIT = 20
CONNECTED_DIRECTORIES_LIMIT = 10
MAX_TAGS_PER_DIRECTORY = 50
def __init__(
self,
name,
password,
size,
vpc_settings,
directory_type,
short_name=None,
description=None,
): # pylint: disable=too-many-arguments
self.name = name
self.password = password
self.size = size
self.vpc_settings = vpc_settings
self.directory_type = directory_type
self.short_name = short_name
self.description = description
# Calculated or default values for the directory attributes.
self.directory_id = f"d-{get_random_hex(10)}"
self.access_url = f"{self.directory_id}.awsapps.com"
self.alias = self.directory_id
self.desired_number_of_domain_controllers = 0
self.sso_enabled = False
self.stage = "Active"
self.launch_time = datetime.now(timezone.utc).isoformat()
self.stage_last_updated_date_time = datetime.now(timezone.utc).isoformat()
def to_json(self):
"""Convert the attributes into json with CamelCase tags."""
replacement_keys = {"directory_type": "Type"}
exclude_items = ["password"]
json_result = {}
for item, value in self.__dict__.items():
# Discard empty strings, but allow values set to False or zero.
if value == "" or item in exclude_items:
continue
if item in replacement_keys:
json_result[replacement_keys[item]] = value
else:
parts = item.split("_")
new_tag = "".join(x.title() for x in parts)
json_result[new_tag] = value
return json_result
class DirectoryServiceBackend(BaseBackend):
"""Implementation of DirectoryService APIs."""
def __init__(self, region_name=None):
self.region_name = region_name
self.directories = {}
self.tagger = TaggingService()
def reset(self):
"""Re-initialize all attributes for this instance."""
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
"""List of dicts representing default VPC endpoints for this service."""
return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "ds"
)
@staticmethod
def _validate_create_directory_args(
name, passwd, size, vpc_settings, description, short_name,
): # pylint: disable=too-many-arguments
"""Raise exception if create_directory() args don't meet constraints.
The error messages are accumulated before the exception is raised.
"""
error_tuples = []
passwd_pattern = (
r"(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*"
)
if not re.match(passwd_pattern, passwd):
# Can't have an odd number of backslashes in a literal.
json_pattern = passwd_pattern.replace("\\", r"\\")
error_tuples.append(
(
"password",
passwd,
fr"satisfy regular expression pattern: {json_pattern}",
)
)
if size.lower() not in ["small", "large"]:
error_tuples.append(
("size", size, "satisfy enum value set: [Small, Large]")
)
name_pattern = r"^([a-zA-Z0-9]+[\\.-])+([a-zA-Z0-9])+$"
if not re.match(name_pattern, name):
error_tuples.append(
("name", name, fr"satisfy regular expression pattern: {name_pattern}")
)
subnet_id_pattern = r"^(subnet-[0-9a-f]{8}|subnet-[0-9a-f]{17})$"
for subnet in vpc_settings["SubnetIds"]:
if not re.match(subnet_id_pattern, subnet):
error_tuples.append(
(
"vpcSettings.subnetIds",
subnet,
fr"satisfy regular expression pattern: {subnet_id_pattern}",
)
)
if description and len(description) > 128:
error_tuples.append(
("description", description, "have length less than or equal to 128")
)
short_name_pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
if short_name and not re.match(short_name_pattern, short_name):
json_pattern = short_name_pattern.replace("\\", r"\\").replace('"', r"\"")
error_tuples.append(
(
"shortName",
short_name,
fr"satisfy regular expression pattern: {json_pattern}",
)
)
if error_tuples:
raise DsValidationException(error_tuples)
@staticmethod
def _validate_vpc_setting_values(region, vpc_settings):
"""Raise exception if vpc_settings are invalid.
If settings are valid, add AvailabilityZones to vpc_settings.
"""
if len(vpc_settings["SubnetIds"]) != 2:
raise InvalidParameterException(
"Invalid subnet ID(s). They must correspond to two subnets "
"in different Availability Zones."
)
from moto.ec2 import ec2_backends # pylint: disable=import-outside-toplevel
# Subnet IDs are checked before the VPC ID. The Subnet IDs must
# be valid and in different availability zones.
try:
subnets = ec2_backends[region].get_all_subnets(
subnet_ids=vpc_settings["SubnetIds"]
)
except InvalidSubnetIdError as exc:
raise InvalidParameterException(
"Invalid subnet ID(s). They must correspond to two subnets "
"in different Availability Zones."
) from exc
regions = [subnet.availability_zone for subnet in subnets]
if regions[0] == regions[1]:
raise ClientException(
"Invalid subnet ID(s). The two subnets must be in "
"different Availability Zones."
)
vpcs = ec2_backends[region].describe_vpcs()
if vpc_settings["VpcId"] not in [x.id for x in vpcs]:
raise ClientException("Invalid VPC ID.")
vpc_settings["AvailabilityZones"] = regions
def create_directory(
self, region, name, short_name, password, description, size, vpc_settings, tags
): # pylint: disable=too-many-arguments
"""Create a fake Simple Ad Directory."""
if len(self.directories) > Directory.CLOUDONLY_DIRECTORIES_LIMIT:
raise DirectoryLimitExceededException(
f"Directory limit exceeded. A maximum of "
f"{Directory.CLOUDONLY_DIRECTORIES_LIMIT} directories may be created"
)
# botocore doesn't look for missing vpc_settings, but boto3 does.
if not vpc_settings:
raise InvalidParameterException("VpcSettings must be specified.")
self._validate_create_directory_args(
name, password, size, vpc_settings, description, short_name,
)
self._validate_vpc_setting_values(region, vpc_settings)
errmsg = self.tagger.validate_tags(tags or [])
if errmsg:
raise ValidationException(errmsg)
if len(tags) > Directory.MAX_TAGS_PER_DIRECTORY:
raise DirectoryLimitExceededException("Tag Limit is exceeding")
directory = Directory(
name,
password,
size,
vpc_settings,
directory_type="SimpleAD",
short_name=short_name,
description=description,
)
self.directories[directory.directory_id] = directory
self.tagger.tag_resource(directory.directory_id, tags or [])
return directory.directory_id
def _validate_directory_id(self, directory_id):
"""Raise an exception if the directory id is invalid or unknown."""
# Validation of ID takes precedence over a check for its existence.
id_pattern = r"^d-[0-9a-f]{10}$"
if not re.match(id_pattern, directory_id):
raise DsValidationException(
[
(
"directoryId",
directory_id,
fr"satisfy regular expression pattern: {id_pattern}",
)
]
)
if directory_id not in self.directories:
raise EntityDoesNotExistException(
f"Directory {directory_id} does not exist"
)
def delete_directory(self, directory_id):
"""Delete directory with the matching ID."""
self._validate_directory_id(directory_id)
self.tagger.delete_all_tags_for_resource(directory_id)
self.directories.pop(directory_id)
return directory_id
@paginate(pagination_model=PAGINATION_MODEL)
def describe_directories(
self, directory_ids=None, next_token=None, limit=0
): # pylint: disable=unused-argument
"""Return info on all directories or directories with matching IDs."""
for directory_id in directory_ids or self.directories:
self._validate_directory_id(directory_id)
directories = list(self.directories.values())
if directory_ids:
directories = [x for x in directories if x.directory_id in directory_ids]
return sorted(directories, key=lambda x: x.launch_time)
def get_directory_limits(self):
"""Return hard-coded limits for the directories."""
counts = {"SimpleAD": 0, "MicrosoftAD": 0, "ConnectedAD": 0}
for directory in self.directories.values():
if directory.directory_type == "SimpleAD":
counts["SimpleAD"] += 1
elif directory.directory_type in ["MicrosoftAD", "SharedMicrosoftAD"]:
counts["MicrosoftAD"] += 1
elif directory.directory_type == "ADConnector":
counts["ConnectedAD"] += 1
return {
"CloudOnlyDirectoriesLimit": Directory.CLOUDONLY_DIRECTORIES_LIMIT,
"CloudOnlyDirectoriesCurrentCount": counts["SimpleAD"],
"CloudOnlyDirectoriesLimitReached": counts["SimpleAD"]
== Directory.CLOUDONLY_DIRECTORIES_LIMIT,
"CloudOnlyMicrosoftADLimit": Directory.CLOUDONLY_MICROSOFT_AD_LIMIT,
"CloudOnlyMicrosoftADCurrentCount": counts["MicrosoftAD"],
"CloudOnlyMicrosoftADLimitReached": counts["MicrosoftAD"]
== Directory.CLOUDONLY_MICROSOFT_AD_LIMIT,
"ConnectedDirectoriesLimit": Directory.CONNECTED_DIRECTORIES_LIMIT,
"ConnectedDirectoriesCurrentCount": counts["ConnectedAD"],
"ConnectedDirectoriesLimitReached": counts["ConnectedAD"]
== Directory.CONNECTED_DIRECTORIES_LIMIT,
}
def add_tags_to_resource(self, resource_id, tags):
"""Add or overwrite one or more tags for specified directory."""
self._validate_directory_id(resource_id)
errmsg = self.tagger.validate_tags(tags)
if errmsg:
raise ValidationException(errmsg)
if len(tags) > Directory.MAX_TAGS_PER_DIRECTORY:
raise TagLimitExceededException("Tag limit exceeded")
self.tagger.tag_resource(resource_id, tags)
def remove_tags_from_resource(self, resource_id, tag_keys):
"""Removes tags from a directory."""
self._validate_directory_id(resource_id)
self.tagger.untag_resource_using_names(resource_id, tag_keys)
@paginate(pagination_model=PAGINATION_MODEL)
def list_tags_for_resource(
self, resource_id, next_token=None, limit=None,
): # pylint: disable=unused-argument
"""List all tags on a directory."""
self._validate_directory_id(resource_id)
return self.tagger.list_tags_for_resource(resource_id).get("Tags")
ds_backends = {}
for available_region in Session().get_available_regions("ds"):
ds_backends[available_region] = DirectoryServiceBackend(available_region)
for available_region in Session().get_available_regions(
"ds", partition_name="aws-us-gov"
):
ds_backends[available_region] = DirectoryServiceBackend(available_region)
for available_region in Session().get_available_regions("ds", partition_name="aws-cn"):
ds_backends[available_region] = DirectoryServiceBackend(available_region)

98
moto/ds/responses.py Normal file
View File

@ -0,0 +1,98 @@
"""Handles Directory Service requests, invokes methods, returns responses."""
import json
from moto.core.exceptions import InvalidToken
from moto.core.responses import BaseResponse
from moto.ds.exceptions import InvalidNextTokenException
from moto.ds.models import ds_backends
class DirectoryServiceResponse(BaseResponse):
"""Handler for DirectoryService requests and responses."""
@property
def ds_backend(self):
"""Return backend instance specific for this region."""
return ds_backends[self.region]
def create_directory(self):
"""Create a Simple AD directory."""
name = self._get_param("Name")
short_name = self._get_param("ShortName")
password = self._get_param("Password")
description = self._get_param("Description")
size = self._get_param("Size")
vpc_settings = self._get_param("VpcSettings")
tags = self._get_param("Tags")
directory_id = self.ds_backend.create_directory(
region=self.region,
name=name,
short_name=short_name,
password=password,
description=description,
size=size,
vpc_settings=vpc_settings,
tags=tags,
)
return json.dumps({"DirectoryId": directory_id})
def delete_directory(self):
"""Delete a Directory Service directory."""
directory_id_arg = self._get_param("DirectoryId")
directory_id = self.ds_backend.delete_directory(directory_id_arg)
return json.dumps({"DirectoryId": directory_id})
def describe_directories(self):
"""Return directory info for the given IDs or all IDs."""
directory_ids = self._get_param("DirectoryIds")
next_token = self._get_param("NextToken")
limit = self._get_int_param("Limit")
try:
(descriptions, next_token) = self.ds_backend.describe_directories(
directory_ids, next_token=next_token, limit=limit
)
except InvalidToken as exc:
raise InvalidNextTokenException() from exc
response = {"DirectoryDescriptions": [x.to_json() for x in descriptions]}
if next_token:
response["NextToken"] = next_token
return json.dumps(response)
def get_directory_limits(self):
"""Return directory limit information for the current region."""
limits = self.ds_backend.get_directory_limits()
return json.dumps({"DirectoryLimits": limits})
def add_tags_to_resource(self):
"""Add or overwrite on or more tags for specified directory."""
resource_id = self._get_param("ResourceId")
tags = self._get_param("Tags")
self.ds_backend.add_tags_to_resource(resource_id=resource_id, tags=tags)
return ""
def remove_tags_from_resource(self):
"""Removes tags from a directory."""
resource_id = self._get_param("ResourceId")
tag_keys = self._get_param("TagKeys")
self.ds_backend.remove_tags_from_resource(
resource_id=resource_id, tag_keys=tag_keys
)
return ""
def list_tags_for_resource(self):
"""Lists all tags on a directory."""
resource_id = self._get_param("ResourceId")
next_token = self._get_param("NextToken")
limit = self._get_param("Limit")
try:
(tags, next_token) = self.ds_backend.list_tags_for_resource(
resource_id=resource_id, next_token=next_token, limit=limit
)
except InvalidToken as exc:
raise InvalidNextTokenException() from exc
response = {"Tags": tags}
if next_token:
response["NextToken"] = next_token
return json.dumps(response)

11
moto/ds/urls.py Normal file
View File

@ -0,0 +1,11 @@
"""ds base URL and path."""
from .responses import DirectoryServiceResponse
url_bases = [
r"https?://ds\.(.+)\.amazonaws\.com",
]
url_paths = {
"{0}/$": DirectoryServiceResponse.dispatch,
}

16
moto/ds/utils.py Normal file
View File

@ -0,0 +1,16 @@
"""Pagination control model for DirectoryService."""
PAGINATION_MODEL = {
"describe_directories": {
"input_token": "next_token",
"limit_key": "limit",
"limit_default": 100, # This should be the sum of the directory limits
"page_ending_range_keys": ["directory_id"],
},
"list_tags_for_resource": {
"input_token": "next_token",
"limit_key": "limit",
"limit_default": 50,
"page_ending_range_keys": ["Key"],
},
}

View File

@ -239,6 +239,13 @@ class FirehoseBackend(BaseBackend):
if errmsg:
raise ValidationException(errmsg)
if tags and len(tags) > MAX_TAGS_PER_DELIVERY_STREAM:
raise ValidationException(
f"1 validation error detected: Value '{tags}' at 'tags' "
f"failed to satisify contstraint: Member must have length "
f"less than or equal to {MAX_TAGS_PER_DELIVERY_STREAM}"
)
# Create a DeliveryStream instance that will be stored and indexed
# by delivery stream name. This instance will update the state and
# create the ARN.
@ -510,7 +517,7 @@ class FirehoseBackend(BaseBackend):
f"not found."
)
if len(tags) >= MAX_TAGS_PER_DELIVERY_STREAM:
if len(tags) > MAX_TAGS_PER_DELIVERY_STREAM:
raise ValidationException(
f"1 validation error detected: Value '{tags}' at 'tags' "
f"failed to satisify contstraint: Member must have length "

View File

@ -1,20 +1,23 @@
#!/usr/bin/env python
"""Generates template code and response body for specified boto3's operation.
You only have to select service and operation that you want to add.
To execute:
cd moto # top-level directory; script will not work from scripts dir
./scripts/scaffold.py
This script looks at the botocore's definition file of specified service and
operation, and auto-generates codes and reponses.
When prompted, select the service and operation that you want to add.
This script will look at the botocore's definition file for the selected
service and operation, then auto-generate the code and responses.
Basically, this script supports almost all services, as long as its
protocol is `query`, `json` or `rest-json`. Even if aws adds new
services, this script will work as long as the protocol is known.
Almost all services are supported, as long as the service's protocol is
`query`, `json` or `rest-json`. Even if aws adds new services, this script
will work if the protocol is known.
TODO:
- This script doesn't generates functions in `responses.py` for
`rest-json`. Someone will need to add this logic.
- In some services's operations, this script might crash. Create an
issue on github for the problem.
- This script doesn't generate functions in `responses.py` for
`rest-json`. That logic needs to be added.
- Some services's operations might cause this script to crash. If that
should happen, please create an issue for the problem.
"""
import os
import re
@ -43,7 +46,7 @@ OUTPUT_IGNORED_IN_BACKEND = ["NextMarker"]
def print_progress(title, body, color):
click.secho("\t{}\t".format(title), fg=color, nl=False)
click.secho(f"\t{title}\t", fg=color, nl=False)
click.echo(body)
@ -52,7 +55,7 @@ def select_service_and_operation():
service_completer = WordCompleter(service_names)
service_name = prompt("Select service: ", completer=service_completer)
if service_name not in service_names:
click.secho("{} is not valid service".format(service_name), fg="red")
click.secho(f"{service_name} is not valid service", fg="red")
raise click.Abort()
moto_client = get_moto_implementation(service_name)
real_client = boto3.client(service_name, region_name="us-east-1")
@ -72,16 +75,16 @@ def select_service_and_operation():
click.echo("==Current Implementation Status==")
for operation_name in operation_names:
check = "X" if operation_name in implemented else " "
click.secho("[{}] {}".format(check, operation_name))
click.secho(f"[{check}] {operation_name}")
click.echo("=================================")
operation_name = prompt("Select Operation: ", completer=operation_completer)
if operation_name not in operation_names:
click.secho("{} is not valid operation".format(operation_name), fg="red")
click.secho(f"{operation_name} is not valid operation", fg="red")
raise click.Abort()
if operation_name in implemented:
click.secho("{} is already implemented".format(operation_name), fg="red")
click.secho(f"{operation_name} is already implemented", fg="red")
raise click.Abort()
return service_name, operation_name
@ -95,7 +98,7 @@ def get_lib_dir(service):
def get_test_dir(service):
return os.path.join("tests", "test_{}".format(get_escaped_service(service)))
return os.path.join("tests", f"test_{get_escaped_service(service)}")
def render_template(tmpl_dir, tmpl_filename, context, service, alt_filename=None):
@ -123,17 +126,13 @@ def append_mock_to_init_py(service):
with open(path) as fhandle:
lines = [_.replace("\n", "") for _ in fhandle.readlines()]
if any(_ for _ in lines if re.match("^mock_{}.*lazy_load(.*)$".format(service), _)):
if any(_ for _ in lines if re.match(f"^mock_{service}.*lazy_load(.*)$", _)):
return
filtered_lines = [_ for _ in lines if re.match("^mock_.*lazy_load(.*)$", _)]
last_import_line_index = lines.index(filtered_lines[-1])
new_line = 'mock_{} = lazy_load(".{}", "mock_{}", boto3_name="{}")'.format(
get_escaped_service(service),
get_escaped_service(service),
get_escaped_service(service),
service,
)
escaped_service = get_escaped_service(service)
new_line = f'mock_{escaped_service} = lazy_load(".{escaped_service}", "mock_{escaped_service}", boto3_name="{service}")'
lines.insert(last_import_line_index + 1, new_line)
body = "\n".join(lines) + "\n"
@ -180,7 +179,7 @@ def initialize_service(service, api_protocol):
tmpl_dir = os.path.join(TEMPLATE_DIR, "test")
for tmpl_filename in os.listdir(tmpl_dir):
alt_filename = (
"test_{}.py".format(get_escaped_service(service))
f"test_{get_escaped_service(service)}.py"
if tmpl_filename == "test_service.py.j2"
else None
)
@ -213,6 +212,7 @@ def get_function_in_responses(service, operation, protocol):
You can see example of elbv2 from link below.
https://github.com/boto/botocore/blob/develop/botocore/data/elbv2/2015-12-01/service-2.json
"""
escaped_service = get_escaped_service(service)
client = boto3.client(service)
aws_operation_name = get_operation_name_in_keys(
@ -232,7 +232,7 @@ def get_function_in_responses(service, operation, protocol):
output_names = [
to_snake_case(_) for _ in outputs.keys() if _ not in OUTPUT_IGNORED_IN_BACKEND
]
body = "\ndef {}(self):\n".format(operation)
body = f"\ndef {operation}(self):\n"
for input_name, input_type in inputs.items():
type_name = input_type.type_name
@ -244,29 +244,21 @@ def get_function_in_responses(service, operation, protocol):
arg_line_tmpl = ' {}=self._get_param("{}")\n'
body += arg_line_tmpl.format(to_snake_case(input_name), input_name)
if output_names:
body += " {} = self.{}_backend.{}(\n".format(
", ".join(output_names), get_escaped_service(service), operation
)
body += f" {', '.join(output_names)} = self.{escaped_service}_backend.{operation}(\n"
else:
body += " self.{}_backend.{}(\n".format(
get_escaped_service(service), operation
)
body += f" self.{escaped_service}_backend.{operation}(\n"
for input_name in input_names:
body += f" {input_name}={input_name},\n"
body += " )\n"
if protocol == "query":
body += " template = self.response_template({}_TEMPLATE)\n".format(
operation.upper()
)
body += " return template.render({})\n".format(
", ".join([f"{n}={n}" for n in output_names])
)
body += f" template = self.response_template({operation.upper()}_TEMPLATE)\n"
names = ", ".join([f"{n}={n}" for n in output_names])
body += f" return template.render({names})\n"
elif protocol in ["json", "rest-json"]:
body += " # TODO: adjust response\n"
body += " return json.dumps(dict({}))\n".format(
", ".join(["{}={}".format(to_lower_camel_case(_), _) for _ in output_names])
)
names = ", ".join([f"{to_lower_camel_case(_)}={_}" for _ in output_names])
body += f" return json.dumps(dict({names}))\n"
return body
@ -293,11 +285,11 @@ def get_function_in_models(service, operation):
to_snake_case(_) for _ in outputs.keys() if _ not in OUTPUT_IGNORED_IN_BACKEND
]
if input_names:
body = "def {}(self, {}):\n".format(operation, ", ".join(input_names))
body = f"def {operation}(self, {', '.join(input_names)}):\n"
else:
body = "def {}(self)\n"
body += " # implement here\n"
body += " return {}\n\n".format(", ".join(output_names))
body += f" return {', '.join(output_names)}\n\n"
return body
@ -308,14 +300,15 @@ def _get_subtree(name, shape, replace_list, name_prefix=None):
class_name = shape.__class__.__name__
if class_name in ("StringShape", "Shape"):
tree = etree.Element(name)
tree = etree.Element(name) # pylint: disable=c-extension-no-member
if name_prefix:
tree.text = "{{ %s.%s }}" % (name_prefix[-1], to_snake_case(name))
tree.text = f"{{{{ {name_prefix[-1]}.{to_snake_case(name)} }}}}"
else:
tree.text = "{{ %s }}" % to_snake_case(name)
tree.text = f"{{{{ {to_snake_case(name)} }}}}"
return tree
if class_name in ("ListShape",):
# pylint: disable=c-extension-no-member
replace_list.append((name, name_prefix))
tree = etree.Element(name)
t_member = etree.Element("member")
@ -353,6 +346,7 @@ def get_response_query_template(service, operation):
xml_namespace = metadata["xmlNamespace"]
# build xml tree
# pylint: disable=c-extension-no-member
t_root = etree.Element(response_wrapper, xmlns=xml_namespace)
# build metadata
@ -376,25 +370,25 @@ def get_response_query_template(service, operation):
prefix = replace[1]
singular_name = singularize(name)
start_tag = "<%s>" % name
iter_name = "{}.{}".format(prefix[-1], name.lower()) if prefix else name.lower()
loop_start = "{%% for %s in %s %%}" % (singular_name.lower(), iter_name)
end_tag = "</%s>" % name
start_tag = f"<{name}>"
iter_name = f"{prefix[-1]}.{name.lower()}" if prefix else name.lower()
loop_start = f"{{%% for {singular_name.lower()} in {iter_name} %%}}"
end_tag = f"</{name}>"
loop_end = "{{ endfor }}"
start_tag_indexes = [i for i, l in enumerate(xml_body_lines) if start_tag in l]
if len(start_tag_indexes) != 1:
raise Exception("tag %s not found in response body" % start_tag)
raise Exception(f"tag {start_tag} not found in response body")
start_tag_index = start_tag_indexes[0]
xml_body_lines.insert(start_tag_index + 1, loop_start)
end_tag_indexes = [i for i, l in enumerate(xml_body_lines) if end_tag in l]
if len(end_tag_indexes) != 1:
raise Exception("tag %s not found in response body" % end_tag)
raise Exception(f"tag {end_tag} not found in response body")
end_tag_index = end_tag_indexes[0]
xml_body_lines.insert(end_tag_index, loop_end)
xml_body = "\n".join(xml_body_lines)
body = '\n{}_TEMPLATE = """{}"""'.format(operation.upper(), xml_body)
body = f'\n{operation.upper()}_TEMPLATE = """{xml_body}"""'
return body
@ -454,9 +448,9 @@ def insert_url(service, operation, api_protocol):
# generate url pattern
if api_protocol == "rest-json":
new_line = " '{0}/.*$': response.dispatch,"
new_line = ' "{0}/.*$": response.dispatch,'
else:
new_line = " '{0}%s$': %sResponse.dispatch," % (uri, service_class)
new_line = f' "{{0}}{uri}$": {service_class}Response.dispatch,'
if new_line in lines:
return
lines.insert(last_elem_line_index + 1, new_line)
@ -467,10 +461,11 @@ def insert_url(service, operation, api_protocol):
def insert_codes(service, operation, api_protocol):
escaped_service = get_escaped_service(service)
func_in_responses = get_function_in_responses(service, operation, api_protocol)
func_in_models = get_function_in_models(service, operation)
# edit responses.py
responses_path = "moto/{}/responses.py".format(get_escaped_service(service))
responses_path = f"moto/{escaped_service}/responses.py"
print_progress("inserting code", responses_path, "green")
insert_code_to_class(responses_path, BaseResponse, func_in_responses)
@ -484,7 +479,7 @@ def insert_codes(service, operation, api_protocol):
fhandle.write("\n".join(lines))
# edit models.py
models_path = "moto/{}/models.py".format(get_escaped_service(service))
models_path = f"moto/{escaped_service}/models.py"
print_progress("inserting code", models_path, "green")
insert_code_to_class(models_path, BaseBackend, func_in_models)
@ -503,15 +498,15 @@ def main():
else:
print_progress(
"skip inserting code",
'api protocol "{}" is not supported'.format(api_protocol),
f'api protocol "{api_protocol}" is not supported',
"yellow",
)
click.echo(
'Remaining setup:\n'
"Remaining setup:\n"
'- Add the mock into "docs/index.rst",\n'
'- Add the mock into "IMPLEMENTATION_COVERAGE.md",\n'
'- Run scripts/update_backend_index.py.'
"- Run scripts/update_backend_index.py."
)

View File

@ -40,7 +40,7 @@ install_requires = [
"MarkupSafe!=2.0.0a1", # This is a Jinja2 dependency, 2.0.0a1 currently seems broken
"Jinja2>=2.10.1",
"more-itertools",
"importlib_metadata ; python_version < '3.8'"
"importlib_metadata ; python_version < '3.8'",
]
_dep_PyYAML = "PyYAML>=5.1"
@ -72,31 +72,38 @@ all_extra_deps = [
all_server_deps = all_extra_deps + ["flask", "flask-cors"]
extras_per_service = {}
for service_name in [service[5:] for service in dir(service_list) if service.startswith("mock_")]:
for service_name in [
service[5:] for service in dir(service_list) if service.startswith("mock_")
]:
extras_per_service[service_name] = []
extras_per_service.update(
{
"apigateway": [_dep_python_jose, _dep_python_jose_ecdsa_pin],
"awslambda": [_dep_docker],
"batch": [_dep_docker],
"cloudformation": [_dep_docker, _dep_PyYAML, _dep_cfn_lint],
"cognitoidp": [_dep_python_jose, _dep_python_jose_ecdsa_pin],
"ec2": [_dep_sshpubkeys],
"iotdata": [_dep_jsondiff],
"s3": [_dep_PyYAML],
"ses": [],
"sns": [],
"sqs": [],
"ssm": [_dep_PyYAML, _dep_dataclasses],
# XRay module uses pkg_resources, but doesn't have an explicit dependency listed
# This should be fixed in the next version: https://github.com/aws/aws-xray-sdk-python/issues/305
"xray": [_dep_aws_xray_sdk, _setuptools],
})
{
"apigateway": [_dep_python_jose, _dep_python_jose_ecdsa_pin],
"awslambda": [_dep_docker],
"batch": [_dep_docker],
"cloudformation": [_dep_docker, _dep_PyYAML, _dep_cfn_lint],
"cognitoidp": [_dep_python_jose, _dep_python_jose_ecdsa_pin],
"ec2": [_dep_sshpubkeys],
"iotdata": [_dep_jsondiff],
"s3": [_dep_PyYAML],
"ses": [],
"sns": [],
"sqs": [],
"ssm": [_dep_PyYAML, _dep_dataclasses],
# XRay module uses pkg_resources, but doesn't have an explicit
# dependency listed. This should be fixed in the next version:
# https://github.com/aws/aws-xray-sdk-python/issues/305
"xray": [_dep_aws_xray_sdk, _setuptools],
}
)
# When a Table has a Stream, we'll always need to import AWSLambda to search for a corresponding function to send the table data to
extras_per_service["dynamodb2"] = extras_per_service["awslambda"]
extras_per_service["dynamodbstreams"] = extras_per_service["awslambda"]
# EFS depends on EC2 to find subnets etc
extras_per_service["efs"] = extras_per_service["ec2"]
# DirectoryService needs EC2 to verify VPCs and subnets.
extras_per_service["ds"] = extras_per_service["ec2"]
extras_require = {
"all": all_extra_deps,
"server": all_server_deps,
@ -115,11 +122,7 @@ setup(
author="Steve Pulec",
author_email="spulec@gmail.com",
url="https://github.com/spulec/moto",
entry_points={
"console_scripts": [
"moto_server = moto.server:main",
],
},
entry_points={"console_scripts": ["moto_server = moto.server:main"]},
packages=find_packages(exclude=("tests", "tests.*")),
install_requires=install_requires,
extras_require=extras_require,
@ -136,7 +139,5 @@ setup(
"License :: OSI Approved :: Apache Software License",
"Topic :: Software Development :: Testing",
],
project_urls={
"Documentation": "http://docs.getmoto.org/en/latest/",
},
project_urls={"Documentation": "http://docs.getmoto.org/en/latest/"},
)

View File

152
tests/test_ds/test_ds.py Normal file
View File

@ -0,0 +1,152 @@
"""Directory-related unit tests common to different directory types.
Simple AD directories are used for test data, but the operations are
common to the other directory types.
"""
from datetime import datetime, timezone
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_ds
from moto.core.utils import get_random_hex
from moto.ec2 import mock_ec2
from .test_ds_simple_ad_directory import create_test_directory, TEST_REGION
@mock_ec2
@mock_ds
def test_ds_delete_directory():
"""Test good and bad invocations of delete_directory()."""
client = boto3.client("ds", region_name=TEST_REGION)
# Delete a directory when there are none.
random_directory_id = f"d-{get_random_hex(10)}"
with pytest.raises(ClientError) as exc:
client.delete_directory(DirectoryId=random_directory_id)
err = exc.value.response["Error"]
assert err["Code"] == "EntityDoesNotExistException"
assert f"Directory {random_directory_id} does not exist" in err["Message"]
# Delete an existing directory.
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
directory_id = create_test_directory(client, ec2_client)
result = client.delete_directory(DirectoryId=directory_id)
assert result["DirectoryId"] == directory_id
# Attempt to delete a non-existent directory.
nonexistent_id = f"d-{get_random_hex(10)}"
with pytest.raises(ClientError) as exc:
client.delete_directory(DirectoryId=nonexistent_id)
err = exc.value.response["Error"]
assert err["Code"] == "EntityDoesNotExistException"
assert f"Directory {nonexistent_id} does not exist" in err["Message"]
# Attempt to use an invalid directory ID.
bad_id = get_random_hex(3)
with pytest.raises(ClientError) as exc:
client.delete_directory(DirectoryId=bad_id)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{bad_id}' at 'directoryId' failed to satisfy constraint: "
f"Member must satisfy regular expression pattern: ^d-[0-9a-f]{{10}}$"
) in err["Message"]
@mock_ec2
@mock_ds
def test_ds_get_directory_limits():
"""Test return value for directory limits."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
limits = client.get_directory_limits()["DirectoryLimits"]
assert limits["CloudOnlyDirectoriesCurrentCount"] == 0
assert limits["CloudOnlyDirectoriesLimit"] > 0
assert not limits["CloudOnlyDirectoriesLimitReached"]
# Create a bunch of directories and verify the current count has been
# updated.
for _ in range(limits["CloudOnlyDirectoriesLimit"]):
create_test_directory(client, ec2_client)
limits = client.get_directory_limits()["DirectoryLimits"]
assert (
limits["CloudOnlyDirectoriesLimit"]
== limits["CloudOnlyDirectoriesCurrentCount"]
)
assert limits["CloudOnlyDirectoriesLimitReached"]
@mock_ec2
@mock_ds
def test_ds_describe_directories():
"""Test good and bad invocations of describe_directories()."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
expected_ids = set()
limit = 10
for _ in range(limit):
expected_ids.add(create_test_directory(client, ec2_client))
# Test that if no directory IDs are specified, all are returned.
result = client.describe_directories()
directories = result["DirectoryDescriptions"]
directory_ids = [x["DirectoryId"] for x in directories]
assert len(directories) == limit
assert set(directory_ids) == expected_ids
for idx, dir_info in enumerate(directories):
assert dir_info["DesiredNumberOfDomainControllers"] == 0
assert not dir_info["SsoEnabled"]
assert dir_info["DirectoryId"] == directory_ids[idx]
assert dir_info["Name"].startswith("test-")
assert dir_info["Size"] == "Large"
assert dir_info["Alias"] == directory_ids[idx]
assert dir_info["AccessUrl"] == f"{directory_ids[idx]}.awsapps.com"
assert dir_info["Stage"] == "Active"
assert dir_info["LaunchTime"] <= datetime.now(timezone.utc)
assert dir_info["StageLastUpdatedDateTime"] <= datetime.now(timezone.utc)
assert dir_info["Type"] == "SimpleAD"
assert dir_info["VpcSettings"]["VpcId"].startswith("vpc-")
assert len(dir_info["VpcSettings"]["SubnetIds"]) == 2
assert "NextToken" not in result
# Test with a specific directory ID.
result = client.describe_directories(DirectoryIds=[directory_ids[5]])
assert len(result["DirectoryDescriptions"]) == 1
assert result["DirectoryDescriptions"][0]["DirectoryId"] == directory_ids[5]
# Test with a bad directory ID.
bad_id = get_random_hex(3)
with pytest.raises(ClientError) as exc:
client.describe_directories(DirectoryIds=[bad_id])
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
f"Value '{bad_id}' at 'directoryId' failed to satisfy constraint: "
f"Member must satisfy regular expression pattern: ^d-[0-9a-f]{{10}}$"
) in err["Message"]
# Test with an invalid next token.
with pytest.raises(ClientError) as exc:
client.describe_directories(NextToken="bogus")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidNextTokenException"
assert "Invalid value passed for the NextToken parameter" in err["Message"]
# Test with a limit.
result = client.describe_directories(Limit=5)
assert len(result["DirectoryDescriptions"]) == 5
directories = result["DirectoryDescriptions"]
for idx in range(5):
assert directories[idx]["DirectoryId"] == directory_ids[idx]
assert result["NextToken"]
result = client.describe_directories(Limit=1, NextToken=result["NextToken"])
assert len(result["DirectoryDescriptions"]) == 1
assert result["DirectoryDescriptions"][0]["DirectoryId"] == directory_ids[5]

View File

@ -0,0 +1,246 @@
"""Directory-related unit tests for Simple AD Directory Services."""
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_ds
from moto import settings
from moto.core.utils import get_random_hex
from moto.ec2 import mock_ec2
TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
def create_vpc(ec2_client):
"""Return the ID for a valid VPC."""
return ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
def create_subnets(
ec2_client, vpc_id, region1=TEST_REGION + "a", region2=TEST_REGION + "b"
):
"""Return list of two subnets IDs."""
subnet_ids = []
for cidr_block, region in [("10.0.1.0/24", region1), ("10.0.0.0/24", region2)]:
subnet_ids.append(
ec2_client.create_subnet(
VpcId=vpc_id, CidrBlock=cidr_block, AvailabilityZone=region,
)["Subnet"]["SubnetId"]
)
return subnet_ids
def create_test_directory(ds_client, ec2_client, vpc_settings=None, tags=None):
"""Return ID of a newly created valid directory."""
if not vpc_settings:
good_vpc_id = create_vpc(ec2_client)
good_subnet_ids = create_subnets(ec2_client, good_vpc_id)
vpc_settings = {"VpcId": good_vpc_id, "SubnetIds": good_subnet_ids}
if not tags:
tags = []
result = ds_client.create_directory(
Name=f"test-{get_random_hex(6)}.test",
Password="Password4TheAges",
Size="Large",
VpcSettings=vpc_settings,
Tags=tags,
)
return result["DirectoryId"]
@mock_ds
def test_ds_create_directory_validations():
"""Test validation errs that aren't caught by botocore."""
client = boto3.client("ds", region_name=TEST_REGION)
random_num = get_random_hex(6)
# Verify ValidationException error messages are accumulated properly.
bad_name = f"bad_name_{random_num}"
bad_password = "bad_password"
bad_size = "big"
ok_vpc_settings = {
"VpcId": f"vpc-{random_num}",
"SubnetIds": [f"subnet-{random_num}01", f"subnet-{random_num}02"],
}
with pytest.raises(ClientError) as exc:
client.create_directory(
Name=bad_name,
Password=bad_password,
Size=bad_size,
VpcSettings=ok_vpc_settings,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "3 validation errors detected" in err["Message"]
assert (
r"Value at 'password' failed to satisfy constraint: "
r"Member must satisfy regular expression pattern: "
r"(?=^.{8,64}$)((?=.*\d)(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[^A-Za-z0-9\s])(?=.*[a-z])|"
r"(?=.*[^A-Za-z0-9\s])(?=.*[A-Z])(?=.*[a-z])|"
r"(?=.*\d)(?=.*[A-Z])(?=.*[^A-Za-z0-9\s]))^.*;" in err["Message"]
)
assert (
f"Value '{bad_size}' at 'size' failed to satisfy constraint: "
f"Member must satisfy enum value set: [Small, Large];" in err["Message"]
)
assert (
fr"Value '{bad_name}' at 'name' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: "
fr"^([a-zA-Z0-9]+[\.-])+([a-zA-Z0-9])+$" in err["Message"]
)
too_long = (
"Test of directory service 0123456789 0123456789 0123456789 "
"0123456789 0123456789 0123456789 0123456789 0123456789 0123456789 "
"0123456789 0123456789"
)
short_name = "a:b.c"
with pytest.raises(ClientError) as exc:
client.create_directory(
Name=f"test{random_num}.test",
Password="TESTfoobar1",
Size="Large",
VpcSettings=ok_vpc_settings,
Description=too_long,
ShortName=short_name,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "2 validation errors detected" in err["Message"]
assert (
f"Value '{too_long}' at 'description' failed to satisfy constraint: "
f"Member must have length less than or equal to 128" in err["Message"]
)
pattern = r'^[^\/:*?"<>|.]+[^\/:*?"<>|]*$'
assert (
f"Value '{short_name}' at 'shortName' failed to satisfy constraint: "
f"Member must satisfy regular expression pattern: " + pattern
) in err["Message"]
bad_vpc_settings = {"VpcId": f"vpc-{random_num}", "SubnetIds": ["foo"]}
with pytest.raises(ClientError) as exc:
client.create_directory(
Name=f"test{random_num}.test",
Password="TESTfoobar1",
Size="Large",
VpcSettings=bad_vpc_settings,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
fr"Value '{bad_vpc_settings['SubnetIds'][0]}' at "
fr"'vpcSettings.subnetIds' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: "
fr"^(subnet-[0-9a-f]{{8}}|subnet-[0-9a-f]{{17}})$" in err["Message"]
)
@mock_ec2
@mock_ds
def test_ds_create_directory_bad_vpc_settings():
"""Test validation of bad vpc that doesn't raise ValidationException."""
client = boto3.client("ds", region_name=TEST_REGION)
# Error if no VpcSettings argument.
with pytest.raises(ClientError) as exc:
client.create_directory(
Name=f"test-{get_random_hex(6)}.test", Password="TESTfoobar1", Size="Small",
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "VpcSettings must be specified" in err["Message"]
# Error if VPC is bogus.
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
good_subnet_ids = create_subnets(ec2_client, create_vpc(ec2_client))
with pytest.raises(ClientError) as exc:
create_test_directory(
client, ec2_client, {"VpcId": "vpc-12345678", "SubnetIds": good_subnet_ids},
)
err = exc.value.response["Error"]
assert err["Code"] == "ClientException"
assert "Invalid VPC ID" in err["Message"]
@mock_ec2
@mock_ds
def test_ds_create_directory_bad_subnets():
"""Test validation of VPC subnets."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Error if VPC subnets are bogus.
good_vpc_id = create_vpc(ec2_client)
with pytest.raises(ClientError) as exc:
create_test_directory(
client,
ec2_client,
{"VpcId": good_vpc_id, "SubnetIds": ["subnet-12345678", "subnet-87654321"]},
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert (
"Invalid subnet ID(s). They must correspond to two subnets in "
"different Availability Zones."
) in err["Message"]
# Error if both VPC subnets are in the same region.
subnets_same_region = create_subnets(
ec2_client, good_vpc_id, region1=TEST_REGION + "a", region2=TEST_REGION + "a"
)
with pytest.raises(ClientError) as exc:
create_test_directory(
client,
ec2_client,
{"VpcId": good_vpc_id, "SubnetIds": subnets_same_region},
)
err = exc.value.response["Error"]
assert err["Code"] == "ClientException"
assert (
"Invalid subnet ID(s). The two subnets must be in different "
"Availability Zones."
) in err["Message"]
ec2_client.delete_subnet(SubnetId=subnets_same_region[0])
ec2_client.delete_subnet(SubnetId=subnets_same_region[1])
# Error if only one VPC subnet.
good_subnet_ids = create_subnets(ec2_client, good_vpc_id)
with pytest.raises(ClientError) as exc:
create_test_directory(
client,
ec2_client,
{"VpcId": good_vpc_id, "SubnetIds": [good_subnet_ids[0]]},
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "Invalid subnet ID(s). They must correspond to two subnets" in err["Message"]
@mock_ec2
@mock_ds
def test_ds_create_directory_good_args():
"""Test creation of AD directory using good arguments."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Verify a good call to create_directory()
directory_id = create_test_directory(client, ec2_client)
assert directory_id.startswith("d-")
# Verify that too many directories can't be created.
limits = client.get_directory_limits()["DirectoryLimits"]
for _ in range(limits["CloudOnlyDirectoriesLimit"]):
create_test_directory(client, ec2_client)
with pytest.raises(ClientError) as exc:
create_test_directory(client, ec2_client)
err = exc.value.response["Error"]
assert err["Code"] == "DirectoryLimitExceededException"
assert (
f"Directory limit exceeded. A maximum of "
f"{limits['CloudOnlyDirectoriesLimit']} "
f"directories may be created" in err["Message"]
)

View File

@ -0,0 +1,133 @@
"""Directory-related unit tests focusing on tag-related functionality
Simple AD directories are used for test data, but the operations are
common to the other directory types.
"""
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_ds
from moto.ds.models import Directory
from moto.ec2 import mock_ec2
from .test_ds_simple_ad_directory import create_test_directory, TEST_REGION
@mock_ec2
@mock_ds
def test_ds_add_tags_to_resource():
"""Test the addition of tags to a resource."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
directory_id = create_test_directory(client, ec2_client)
# Unknown directory ID.
bad_id = "d-0123456789"
with pytest.raises(ClientError) as exc:
client.add_tags_to_resource(
ResourceId=bad_id, Tags=[{"Key": "foo", "Value": "bar"}]
)
err = exc.value.response["Error"]
assert err["Code"] == "EntityDoesNotExistException"
assert f"Directory {bad_id} does not exist" in err["Message"]
# Too many tags.
tags = [
{"Key": f"{x}", "Value": f"{x}"}
for x in range(Directory.MAX_TAGS_PER_DIRECTORY + 1)
]
with pytest.raises(ClientError) as exc:
client.add_tags_to_resource(ResourceId=directory_id, Tags=tags)
err = exc.value.response["Error"]
assert err["Code"] == "TagLimitExceededException"
assert "Tag limit exceeded" in err["Message"]
# Bad tags.
with pytest.raises(ClientError) as exc:
client.add_tags_to_resource(
ResourceId=directory_id, Tags=[{"Key": "foo!", "Value": "bar"}],
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"1 validation error detected: Value 'foo!' at 'tags.1.member.key' "
"failed to satisfy constraint: Member must satisfy regular "
"expression pattern"
) in err["Message"]
# Successful addition of tags.
added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)]
client.add_tags_to_resource(ResourceId=directory_id, Tags=added_tags)
result = client.list_tags_for_resource(ResourceId=directory_id)
assert len(result["Tags"]) == 10
assert result["Tags"] == added_tags
@mock_ec2
@mock_ds
def test_ds_remove_tags_from_resource():
"""Test the removal of tags to a resource."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a directory for testing purposes.
tag_list = [
{"Key": "one", "Value": "1"},
{"Key": "two", "Value": "2"},
{"Key": "three", "Value": "3"},
]
directory_id = create_test_directory(client, ec2_client, tags=tag_list)
# Untag all of the tags. Verify there are no more tags.
client.remove_tags_from_resource(
ResourceId=directory_id, TagKeys=[x["Key"] for x in tag_list]
)
result = client.list_tags_for_resource(ResourceId=directory_id)
assert not result["Tags"]
assert "NextToken" not in result
@mock_ec2
@mock_ds
def test_ds_list_tags_for_resource():
"""Test ability to list all tags for a resource."""
client = boto3.client("ds", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a directory to work with.
tags = [
{"Key": f"{x}_k", "Value": f"{x}_v"}
for x in range(1, Directory.MAX_TAGS_PER_DIRECTORY + 1)
]
directory_id = create_test_directory(client, ec2_client, tags=tags)
# Verify limit and next token works.
result = client.list_tags_for_resource(ResourceId=directory_id, Limit=1)
assert len(result["Tags"]) == 1
assert result["Tags"] == [{"Key": "1_k", "Value": "1_v"}]
assert result["NextToken"]
result = client.list_tags_for_resource(
ResourceId=directory_id, Limit=10, NextToken=result["NextToken"]
)
assert len(result["Tags"]) == 10
assert result["Tags"] == [
{"Key": f"{x}_k", "Value": f"{x}_v"} for x in range(2, 12)
]
assert result["NextToken"]
# Bad directory ID.
bad_id = "d-0123456789"
with pytest.raises(ClientError) as exc:
client.list_tags_for_resource(ResourceId=bad_id)
err = exc.value.response["Error"]
assert err["Code"] == "EntityDoesNotExistException"
assert f"Directory {bad_id} does not exist" in err["Message"]
# Bad next token.
with pytest.raises(ClientError) as exc:
client.list_tags_for_resource(ResourceId=directory_id, NextToken="foo")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidNextTokenException"
assert "Invalid value passed for the NextToken parameter" in err["Message"]

View File

@ -125,10 +125,12 @@ def test_tag_delivery_stream():
) in err["Message"]
# Successful addition of tags.
added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)]
added_tags = [
{"Key": f"{x}", "Value": f"{x}"} for x in range(MAX_TAGS_PER_DELIVERY_STREAM)
]
client.tag_delivery_stream(DeliveryStreamName=stream_name, Tags=added_tags)
results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)
assert len(results["Tags"]) == 10
assert len(results["Tags"]) == MAX_TAGS_PER_DELIVERY_STREAM
assert results["Tags"] == added_tags