From 8123e6d71f82ddda349af6f699fd7e577dd7d73d Mon Sep 17 00:00:00 2001 From: archinksagar <68829863+archinksagar@users.noreply.github.com> Date: Thu, 21 Mar 2024 06:30:00 -0400 Subject: [PATCH] feat: Workspaceapi (#7493) --- moto/backend_index.py | 1 + moto/backends.py | 4 + moto/resourcegroupstaggingapi/models.py | 56 +- moto/workspaces/__init__.py | 1 + moto/workspaces/exceptions.py | 30 + moto/workspaces/models.py | 613 ++++++++++++++++ moto/workspaces/responses.py | 179 +++++ moto/workspaces/urls.py | 10 + .../test_resourcegroupstagging_glue.py | 6 +- .../test_resourcegroupstaggingapi.py | 127 +++- tests/test_workspaces/__init__.py | 0 tests/test_workspaces/test_workspaces.py | 686 ++++++++++++++++++ 12 files changed, 1706 insertions(+), 7 deletions(-) create mode 100644 moto/workspaces/__init__.py create mode 100644 moto/workspaces/exceptions.py create mode 100644 moto/workspaces/models.py create mode 100644 moto/workspaces/responses.py create mode 100644 moto/workspaces/urls.py create mode 100644 tests/test_workspaces/__init__.py create mode 100644 tests/test_workspaces/test_workspaces.py diff --git a/moto/backend_index.py b/moto/backend_index.py index fd17dfc70..23bbae341 100644 --- a/moto/backend_index.py +++ b/moto/backend_index.py @@ -192,5 +192,6 @@ backend_url_patterns = [ ), ("transcribe", re.compile("https?://transcribe\\.(.+)\\.amazonaws\\.com")), ("wafv2", re.compile("https?://wafv2\\.(.+)\\.amazonaws.com")), + ("workspaces", re.compile("https?://workspaces\\.(.+)\\.amazonaws\\.com")), ("xray", re.compile("https?://xray\\.(.+)\\.amazonaws.com")), ] diff --git a/moto/backends.py b/moto/backends.py index 64179c190..9ebedd0e9 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -135,6 +135,7 @@ if TYPE_CHECKING: from moto.timestreamwrite.models import TimestreamWriteBackend from moto.transcribe.models import TranscribeBackend from moto.wafv2.models import WAFV2Backend + from moto.workspaces.models import WorkSpacesBackend from moto.xray.models import XRayBackend @@ -295,6 +296,7 @@ SERVICE_NAMES = Union[ "Literal['timestream-write']", "Literal['transcribe']", "Literal['wafv2']", + "Literal['workspaces']", "Literal['xray']", ] @@ -564,6 +566,8 @@ def get_backend(name: "Literal['transcribe']") -> "BackendDict[TranscribeBackend @overload def get_backend(name: "Literal['wafv2']") -> "BackendDict[WAFV2Backend]": ... @overload +def get_backend(name: "Literal['workspaces']") -> "BackendDict[WorkSpacesBackend]": ... +@overload def get_backend(name: "Literal['xray']") -> "BackendDict[XRayBackend]": ... # fmt: on diff --git a/moto/resourcegroupstaggingapi/models.py b/moto/resourcegroupstaggingapi/models.py index db3a0eb3c..fcdb8e489 100644 --- a/moto/resourcegroupstaggingapi/models.py +++ b/moto/resourcegroupstaggingapi/models.py @@ -23,9 +23,10 @@ from moto.s3.models import S3Backend, s3_backends from moto.sns.models import SNSBackend, sns_backends from moto.sqs.models import SQSBackend, sqs_backends from moto.ssm.models import SimpleSystemManagerBackend, ssm_backends +from moto.workspaces.models import WorkSpacesBackend, workspaces_backends from moto.utilities.tagging_service import TaggingService -# Left: EC2 ElastiCache RDS ELB CloudFront WorkSpaces Lambda EMR Glacier Kinesis Redshift Route53 +# Left: EC2 ElastiCache RDS ELB CloudFront Lambda EMR Glacier Kinesis Redshift Route53 # StorageGateway DynamoDB MachineLearning ACM DirectConnect DirectoryService CloudHSM # Inspector Elasticsearch @@ -120,6 +121,10 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): def dynamodb_backend(self) -> DynamoDBBackend: return dynamodb_backends[self.account_id][self.region_name] + @property + def workspaces_backend(self) -> WorkSpacesBackend: + return workspaces_backends[self.account_id][self.region_name] + def _get_resources_generator( self, tag_filters: Optional[List[Dict[str, Any]]] = None, @@ -533,6 +538,48 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): } + # Workspaces + if not resource_type_filters or "workspaces" in resource_type_filters: + for ws in self.workspaces_backend.workspaces.values(): + tags = format_tag_keys(ws.tags, ["Key", "Value"]) + if not tags or not tag_filter( + tags + ): # Skip if no tags, or invalid filter + continue + + yield { + "ResourceARN": f"arn:aws:workspaces:{self.region_name}:{self.account_id}:workspace/{ws.workspace_id}", + "Tags": tags, + } + + # Workspace Directories + if not resource_type_filters or "workspaces-directory" in resource_type_filters: + for wd in self.workspaces_backend.workspace_directories.values(): + tags = format_tag_keys(wd.tags, ["Key", "Value"]) + if not tags or not tag_filter( + tags + ): # Skip if no tags, or invalid filter + continue + + yield { + "ResourceARN": f"arn:aws:workspaces:{self.region_name}:{self.account_id}:directory/{wd.directory_id}", + "Tags": tags, + } + + # Workspace Images + if not resource_type_filters or "workspaces-image" in resource_type_filters: + for wi in self.workspaces_backend.workspace_images.values(): + tags = format_tag_keys(wi.tags, ["Key", "Value"]) + if not tags or not tag_filter( + tags + ): # Skip if no tags, or invalid filter + continue + + yield { + "ResourceARN": f"arn:aws:workspaces:{self.region_name}:{self.account_id}:workspaceimage/{wi.image_id}", + "Tags": tags, + } + # VPC if ( not resource_type_filters @@ -885,7 +932,12 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend): self.rds_backend.add_tags_to_resource( arn, TaggingService.convert_dict_to_tags_input(tags) ) - if arn.startswith("arn:aws:logs:"): + elif arn.startswith("arn:aws:workspaces:"): + resource_id = arn.split("/")[-1] + self.workspaces_backend.create_tags( + resource_id, TaggingService.convert_dict_to_tags_input(tags) + ) + elif arn.startswith("arn:aws:logs:"): self.logs_backend.tag_resource(arn, tags) if arn.startswith("arn:aws:dynamodb"): self.dynamodb_backend.tag_resource( diff --git a/moto/workspaces/__init__.py b/moto/workspaces/__init__.py new file mode 100644 index 000000000..e622cbb68 --- /dev/null +++ b/moto/workspaces/__init__.py @@ -0,0 +1 @@ +from .models import workspaces_backends # noqa: F401 diff --git a/moto/workspaces/exceptions.py b/moto/workspaces/exceptions.py new file mode 100644 index 000000000..299b32c46 --- /dev/null +++ b/moto/workspaces/exceptions.py @@ -0,0 +1,30 @@ +"""Exceptions raised by the workspaces service.""" +from moto.core.exceptions import JsonRESTError + + +class ValidationException(JsonRESTError): + code = 400 + + def __init__(self, message: str): + super().__init__("ValidationException", message) + + +class InvalidParameterValuesException(JsonRESTError): + code = 400 + + def __init__(self, message: str): + super().__init__("InvalidParameterValuesException", message) + + +class ResourceAlreadyExistsException(JsonRESTError): + code = 400 + + def __init__(self, message: str): + super().__init__("ResourceAlreadyExistsException", message) + + +class ResourceNotFoundException(JsonRESTError): + code = 400 + + def __init__(self, message: str): + super().__init__("ResourceNotFoundException", message) diff --git a/moto/workspaces/models.py b/moto/workspaces/models.py new file mode 100644 index 000000000..200b4b237 --- /dev/null +++ b/moto/workspaces/models.py @@ -0,0 +1,613 @@ +"""WorkSpacesBackend class with methods for supported APIs.""" +import re +from collections.abc import Mapping +from typing import Any, Dict, List, Optional, Tuple + +from moto.core.base_backend import BackendDict, BaseBackend +from moto.core.common_models import BaseModel +from moto.core.utils import unix_time +from moto.ds import ds_backends +from moto.ds.models import Directory +from moto.ec2 import ec2_backends +from moto.moto_api._internal import mock_random +from moto.workspaces.exceptions import ( + InvalidParameterValuesException, + ResourceAlreadyExistsException, + ResourceNotFoundException, + ValidationException, +) + + +class Workspace(BaseModel): + def __init__( + self, + workspace: Dict[str, Any], + running_mode: str, + error_code: str, + error_msg: str, + ): + self.workspace_properties: Dict[str, Any] + self.workspace = workspace + self.workspace_id = f"ws-{mock_random.get_random_hex(9)}" + # Create_workspaces operation is asynchronous and returns before the WorkSpaces are created. + # Initially the 'state' is 'PENDING', but here the 'state' will be set as 'AVAILABLE' since + # this operation is being mocked. + self.directory_id = workspace["DirectoryId"] + self.bundle_id = workspace["BundleId"] + self.user_name = workspace["UserName"] + self.state = "AVAILABLE" + self.error_message = error_msg or "" + self.error_code = error_code or "" + self.volume_encryption_key = workspace.get("VolumeEncryptionKey", "") + self.user_volume_encryption_enabled = workspace.get( + "UserVolumeEncryptionEnabled", "" + ) + self.root_volume_encryption_enabled = workspace.get( + "RootVolumeEncryptionEnabled", "" + ) + workspace_properties = {"RunningMode": running_mode} + self.workspace_properties = workspace.get("WorkspaceProperties", "") + + if self.workspace_properties: + self.workspace_properties["RunningMode"] = running_mode + else: + self.workspace_properties = workspace_properties + + self.computer_name = "" # Workspace Bundle + self.modification_states: List[ + Dict[str, str] + ] = [] # modify_workspace_properties + self.related_workspaces: List[Dict[str, str]] = [] # create_standy_workspace + self.data_replication_settings: Dict[str, Any] = {} + # The properties of the standby WorkSpace related to related_workspaces + self.standby_workspaces_properties: List[Dict[str, Any]] = [] + self.tags = workspace.get("Tags", []) + + def to_dict_pending(self) -> Dict[str, Any]: + dct = { + "WorkspaceId": self.workspace_id, + "DirectoryId": self.directory_id, + "UserName": self.user_name, + "IpAddress": "", # UnKnown + "State": self.state, + "BundleId": self.bundle_id, + "SubnetId": "", # UnKnown + "ErrorMessage": self.error_message, + "ErrorCode": self.error_code, + "ComputerName": self.computer_name, + "VolumeEncryptionKey": self.volume_encryption_key, + "UserVolumeEncryptionEnabled": self.user_volume_encryption_enabled, + "RootVolumeEncryptionEnabled": self.root_volume_encryption_enabled, + "WorkspaceProperties": self.workspace_properties, + "ModificationStates": self.modification_states, + "RelatedWorkspaces": self.related_workspaces, + "DataReplicationSettings": self.data_replication_settings, + "StandbyWorkspacesProperties": self.standby_workspaces_properties, + } + return {k: v for k, v in dct.items() if v} + + def filter_empty_values(self, d: Dict[str, Any]) -> Dict[str, Any]: + if isinstance(d, Mapping): + return dict((k, self.filter_empty_values(v)) for k, v, in d.items() if v) + else: + return d + + def to_dict_failed(self) -> Dict[str, Any]: + dct = { + "WorkspaceRequest": { + "DirectoryId": self.workspace["DirectoryId"], + "UserName": self.workspace["UserName"], + "BundleId": self.workspace["BundleId"], + "SubnetId": "", # UnKnown + "VolumeEncryptionKey": self.volume_encryption_key, + "UserVolumeEncryptionEnabled": self.user_volume_encryption_enabled, + "RootVolumeEncryptionEnabled": self.root_volume_encryption_enabled, + "WorkspaceProperties": self.workspace_properties, + "Tags": self.tags, + }, + "ErrorCode": self.error_code, + "ErrorMessage": self.error_message, + } + return self.filter_empty_values(dct) + + +class WorkSpaceDirectory(BaseModel): + def __init__( + self, + account_id: str, + region: str, + directory: Directory, + registration_code: str, + security_group_id: str, + subnet_ids: List[str], + enable_work_docs: bool, + enable_self_service: bool, + tenancy: str, + tags: List[Dict[str, str]], + ): + self.account_id = account_id + self.region = region + self.directory_id = directory.directory_id + self.alias = directory.alias + self.directory_name = directory.name + self.launch_time = directory.launch_time + self.registration_code = registration_code + if directory.directory_type == "ADConnector": + dir_subnet_ids = directory.connect_settings["SubnetIds"] # type: ignore[index] + else: + dir_subnet_ids = directory.vpc_settings["SubnetIds"] # type: ignore[index] + self.subnet_ids = subnet_ids or dir_subnet_ids + self.dns_ip_addresses = directory.dns_ip_addrs + self.customer_username = "Administrator" + self.iam_rold_id = f"arn:aws:iam::{account_id}:role/workspaces_DefaultRole" + self.directory_type = directory.directory_type + self.workspace_security_group_id = security_group_id + self.state = "REGISTERED" + # Default values for workspace_creation_properties + workspace_creation_properties = { + "EnableWorkDocs": enable_work_docs, + "EnableInternetAccess": False, + "DefaultOu": "", + "CustomSecurityGroupId": "", + "UserEnabledAsLocalAdministrator": ( + True if self.customer_username == "Administrator" else False + ), + "EnableMaintenanceMode": True, + } + # modify creation properites + self.workspace_creation_properties = workspace_creation_properties + self.ip_group_ids = "" # create_ip_group + # Default values for workspace access properties + workspace_access_properties = { + "DeviceTypeWindows": ( + "DENY" if self.directory_type == "AD_CONNECTOR" else "ALLOW" + ), + "DeviceTypeOsx": "ALLOW", + "DeviceTypeWeb": "DENY", + "DeviceTypeIos": "ALLOW", + "DeviceTypeAndroid": "ALLOW", + "DeviceTypeChromeOs": "ALLOW", + "DeviceTypeZeroClient": ( + "DENY" if self.directory_type == "AD_CONNECTOR" else "ALLOW" + ), + "DeviceTypeLinux": "DENY", + } + # modify_workspace_access_properties + self.workspace_access_properties = workspace_access_properties + self.tenancy = tenancy or "SHARED" + + # Default values for self service permissions + mode = "DISABLED" + if enable_self_service: + mode = "ENABLED" + self_service_permissions = { + "RestartWorkspace": "ENABLED", + "IncreaseVolumeSize": mode, + "ChangeComputeType": mode, + "SwitchRunningMode": mode, + "RebuildWorkspace": mode, + } + self.self_service_permissions = self_service_permissions + # Default values for saml properties + saml_properties = { + "Status": "DISABLED", + "UserAccessUrl": "", + "RelayStateParameterName": "RelayState", + } + self.saml_properties = saml_properties + # Default values for certificate bases auth properties + self.certificate_based_auth_properties = { + "Status": "DISABLED", + } + # ModifyCertificateBasedAuthProperties + self.tags = tags or [] + client_properties = { + # Log uploading is enabled by default. + "ReconnectEnabled": "ENABLED", + "LogUploadEnabled": "ENABLED", # Remember me is enabled by default + } + self.client_properties = client_properties + + def delete_security_group(self) -> None: + """Delete the given security group.""" + ec2_backends[self.account_id][self.region].delete_security_group( + group_id=self.workspace_security_group_id + ) + + def to_dict(self) -> Dict[str, Any]: + dct = { + "DirectoryId": self.directory_id, + "Alias": self.alias, + "DirectoryName": self.directory_name, + "RegistrationCode": self.registration_code, + "SubnetIds": self.subnet_ids, + "DnsIpAddresses": self.dns_ip_addresses, + "CustomerUserName": self.customer_username, + "IamRoleId": self.iam_rold_id, + "DirectoryType": self.directory_type, + "WorkspaceSecurityGroupId": self.workspace_security_group_id, + "State": self.state, + "WorkspaceCreationProperties": self.workspace_creation_properties, + "ipGroupIds": self.ip_group_ids, + "WorkspaceAccessProperties": self.workspace_access_properties, + "Tenancy": self.tenancy, + "SelfservicePermissions": self.self_service_permissions, + "SamlProperties": self.saml_properties, + "CertificateBasedAuthProperties": self.certificate_based_auth_properties, + } + return {k: v for k, v in dct.items() if v} + + +class WorkspaceImage(BaseModel): + def __init__( + self, + name: str, + description: str, + tags: List[Dict[str, str]], + account_id: str, + ): + self.image_id = f"wsi-{mock_random.get_random_hex(9)}" + self.name = name + self.description = description + self.operating_system: Dict[str, str] = {} # Unknown + # Initially the 'state' is 'PENDING', but here the 'state' will be set as 'AVAILABLE' since + # this operation is being mocked. + self.state = "AVAILABLE" + self.required_tenancy = "DEFAULT" + self.created = unix_time() + self.owner_account = account_id + self.error_code = "" + self.error_message = "" + self.image_permissions: List[Dict[str, str]] = [] + self.tags = tags + + # Default updates + self.updates = { + "UpdateAvailable": False, + "Description": "This WorkSpace image does not have updates available", + } + self.error_details: List[Dict[str, str]] = [] + + def to_dict(self) -> Dict[str, Any]: + dct = { + "ImageId": self.image_id, + "Name": self.name, + "Description": self.description, + "OperatingSystem": self.operating_system, + "State": self.state, + "RequiredTenancy": self.required_tenancy, + "Created": self.created, + "OwnerAccountId": self.owner_account, + } + return {k: v for k, v in dct.items() if v} + + def to_desc_dict(self) -> Dict[str, Any]: + dct = self.to_dict() + dct_options = { + "ErrorCode": self.error_code, + "ErrorMessage": self.error_message, + "Updates": self.updates, + "ErrorDetails": self.error_details, + } + for key, value in dct_options.items(): + if value is not None: + dct[key] = value + return dct + + +class WorkSpacesBackend(BaseBackend): + """Implementation of WorkSpaces APIs.""" + + # The assumption here is that the limits are the same for all regions. + DIRECTORIES_LIMIT = 50 + + def __init__(self, region_name: str, account_id: str): + super().__init__(region_name, account_id) + self.workspaces: Dict[str, Workspace] = dict() + self.workspace_directories: Dict[str, WorkSpaceDirectory] = dict() + self.workspace_images: Dict[str, WorkspaceImage] = dict() + self.directories: List[Directory] + + def validate_directory_id(self, value: str, msg: str) -> None: + """Raise exception if the directory id is invalid.""" + id_pattern = r"^d-[0-9a-f]{10}$" + if not re.match(id_pattern, value): + raise ValidationException(msg) + + def validate_image_id(self, value: str, msg: str) -> None: + """Raise exception if the image id is invalid.""" + id_pattern = r"^wsi-[0-9a-z]{9}$" + if not re.match(id_pattern, value): + raise ValidationException(msg) + + def create_security_group(self, directory_id: str, vpc_id: str) -> str: + """Create security group for the workspace directory.""" + security_group_info = ec2_backends[self.account_id][ + self.region_name + ].create_security_group( + name=f"{directory_id}_workspacesMembers", + description=("Amazon WorkSpaces Security Group"), + vpc_id=vpc_id, + ) + return security_group_info.id + + def create_workspaces( + self, workspaces: List[Dict[str, Any]] + ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + failed_requests = [] + pending_requests = [] + + for ws in workspaces: + error_code = "" + error_msg = "" + directory_id = ws["DirectoryId"] + msg = f"The Directory ID {directory_id} in the request is invalid." + self.validate_directory_id(directory_id, msg) + + # FailedRequests are created if the directory_id is unknown + if directory_id not in self.workspace_directories: + error_code = "ResourceNotFound.Directory" + error_msg = "The specified directory could not be found in the specified region." + + running_mode = "ALWAYS_ON" + workspace_properties = ws.get("WorkspaceProperties", "") + if workspace_properties: + running_mode = workspace_properties.get("RunningMode", running_mode) + auto_stop_timeout = workspace_properties.get( + "RunningModeAutoStopTimeoutInMinutes", "" + ) + + # Requests fail if AutoStopTimeout is given for an AlwaysOn Running mode + if auto_stop_timeout and running_mode == "ALWAYS_ON": + error_code = "AutoStopTimeoutIsNotApplicableForAnAlwaysOnWorkspace" + error_msg = "RunningModeAutoStopTimeoutInMinutes is not applicable for WorkSpace with running mode set to ALWAYS_ON." + + # Requests fail if AutoStopTimeout is given for an Manual Running mode + if auto_stop_timeout and running_mode == "MANUAL": + error_code = "AutoStopTimeoutIsNotDefaultForManualWorkspace" + + workspace = Workspace( + workspace=ws, + running_mode=running_mode, + error_code=error_code, + error_msg=error_msg, + ) + if error_code: + failed_requests.append(workspace.to_dict_failed()) + else: + pending_requests.append(workspace.to_dict_pending()) + self.workspaces[workspace.workspace_id] = workspace + + return failed_requests, pending_requests + + def describe_workspaces( + self, + workspace_ids: List[str], + directory_id: str, + user_name: str, + bundle_id: str, + ) -> List[Workspace]: + # Pagination not yet implemented + + # Only one of the following are allowed to be specified: BundleId, DirectoryId, WorkSpaceIds. + if ( + (workspace_ids and directory_id) + or (directory_id and bundle_id) + or (workspace_ids and bundle_id) + ): + msg = "An invalid number of parameters provided with DescribeWorkspaces. Only one of the following are allowed to be specified: BundleId, DirectoryId, WorkSpaceIds, Filters." + raise InvalidParameterValuesException(msg) + + # Directory_id parameter is required when Username is given. + if user_name and not directory_id: + msg = "The DirectoryId parameter is required when UserName is used." + raise InvalidParameterValuesException(msg) + + workspaces = list(self.workspaces.values()) + if workspace_ids: + workspaces = [x for x in workspaces if x.workspace_id in workspace_ids] + if directory_id: + workspaces = [x for x in workspaces if x.directory_id == directory_id] + if directory_id and user_name: + workspaces = [ + x + for x in workspaces + if (x.directory_id == directory_id) and (x.user_name == user_name) + ] + if bundle_id: + workspaces = [x for x in workspaces if x.bundle_id == bundle_id] + # workspaces = [w.to_dict_pending() for w in workspaces] + return workspaces + + def register_workspace_directory( + self, + directory_id: str, + subnet_ids: List[str], + enable_work_docs: bool, + enable_self_service: bool, + tenancy: str, + tags: List[Dict[str, str]], + ) -> None: + ran_str = mock_random.get_random_string(length=6) + registration_code = f"SLiad+{ran_str.upper()}" + + (self.directories, _) = ds_backends[self.account_id][ + self.region_name + ].describe_directories(directory_ids=[directory_id]) + directory = self.directories[0] + + if directory.directory_type == "ADConnector": + vpc_id = directory.connect_settings["VpcId"] # type: ignore[index] + else: + vpc_id = directory.vpc_settings["VpcId"] # type: ignore[index] + + security_group_id = self.create_security_group(directory_id, vpc_id) + + workspace_directory = WorkSpaceDirectory( + account_id=self.account_id, + region=self.region_name, + directory=directory, + registration_code=registration_code, + security_group_id=security_group_id, + subnet_ids=subnet_ids, + enable_work_docs=enable_work_docs, + enable_self_service=enable_self_service, + tenancy=tenancy, + tags=tags, + ) + self.workspace_directories[ + workspace_directory.directory_id + ] = workspace_directory + + def describe_workspace_directories( + self, directory_ids: Optional[List[str]] = None + ) -> List[WorkSpaceDirectory]: + """Return info on all directories or directories with matching IDs.""" + # Pagination not yet implemented + + workspace_directories = list(self.workspace_directories.values()) + if directory_ids: + for d in directory_ids: + msg = "The request is invalid." + self.validate_directory_id(d, msg) + workspace_directories = [ + x for x in workspace_directories if x.directory_id in directory_ids + ] + + return sorted(workspace_directories, key=lambda x: x.launch_time) + + def modify_workspace_creation_properties( + self, resource_id: str, workspace_creation_properties: Dict[str, Any] + ) -> None: + + # Raise Exception if Directory doesnot exist. + if resource_id not in self.workspace_directories: + raise ValidationException("The request is invalid.") + + res = self.workspace_directories[resource_id] + res.workspace_creation_properties = workspace_creation_properties + + def create_tags(self, resource_id: str, tags: List[Dict[str, str]]) -> None: + if resource_id.startswith("d-"): + ds = self.workspace_directories[resource_id] + ds.tags.extend(tags) + if resource_id.startswith("ws-"): + ws = self.workspaces[resource_id] + ws.tags.extend(tags) + + def describe_tags(self, resource_id: str) -> List[Dict[str, str]]: + if resource_id.startswith("d-"): + ds = self.workspace_directories[resource_id] + tag_list = ds.tags + if resource_id.startswith("ws-"): + ws = self.workspaces[resource_id] + tag_list = ws.tags + if resource_id.startswith("wsi-"): + wsi = self.workspace_images[resource_id] + tag_list = wsi.tags + return tag_list + + def describe_client_properties(self, resource_ids: str) -> List[Dict[str, Any]]: + workspace_directories = list(self.workspace_directories.values()) + workspace_directories = [ + x for x in workspace_directories if x.directory_id in resource_ids + ] + client_properties_list = [] + for wd in workspace_directories: + cpl = { + "ResourceId": wd.directory_id, + "ClientProperties": wd.client_properties, + } + client_properties_list.append(cpl) + return client_properties_list + + def modify_client_properties( + self, resource_id: str, client_properties: Dict[str, str] + ) -> None: + res = self.workspace_directories[resource_id] + res.client_properties = client_properties + + def create_workspace_image( + self, name: str, description: str, workspace_id: str, tags: List[Dict[str, str]] + ) -> Dict[str, Any]: + + # Check if workspace exists. + if workspace_id not in self.workspaces: + raise ResourceNotFoundException( + "The specified WorkSpace cannot be found. Confirm that the workspace exists in your AWS account, and try again." + ) + # Check if image name already exists. + if name in [x.name for x in self.workspace_images.values()]: + raise ResourceAlreadyExistsException( + "A WorkSpace image with the same name exists in the destination Region. Provide a unique destination image name, and try again." + ) + + workspace_image = WorkspaceImage( + name=name, + description=description, + tags=tags, + account_id=self.account_id, + ) + self.workspace_images[workspace_image.image_id] = workspace_image + return workspace_image.to_dict() + + def describe_workspace_images( + self, image_ids: Optional[List[str]], image_type: Optional[str] + ) -> List[Dict[str, Any]]: + # Pagination not yet implemented + workspace_images = list(self.workspace_images.values()) + if image_type == "OWNED": + workspace_images = [ + i for i in workspace_images if i.owner_account == self.account_id + ] + elif image_type == "SHARED": + workspace_images = [ + i for i in workspace_images if i.owner_account != self.account_id + ] + if image_ids: + workspace_images = [i for i in workspace_images if i.image_id in image_ids] + return [w.to_desc_dict() for w in workspace_images] + + def update_workspace_image_permission( + self, image_id: str, allow_copy_image: bool, shared_account_id: str + ) -> None: + shared_account = {"SharedAccountId": shared_account_id} + res = self.workspace_images[image_id] + shared_accounts = [] + shared_accounts = res.image_permissions + + if shared_account not in shared_accounts and allow_copy_image: + shared_accounts.append(shared_account) + if shared_account in shared_accounts and not allow_copy_image: + shared_accounts.remove(shared_account) + + res.image_permissions = shared_accounts + + def describe_workspace_image_permissions( + self, image_id: str + ) -> Tuple[str, List[Dict[str, str]]]: + # Pagination not yet implemented + + msg = f"The Image ID {image_id} in the request is invalid" + self.validate_image_id(image_id, msg) + + image_permissions = [] + if image_id in self.workspace_images: + res = self.workspace_images[image_id] + image_permissions = res.image_permissions + return image_id, image_permissions + + def deregister_workspace_directory(self, directory_id: str) -> None: + """Deregister Workspace Directory with the matching ID.""" + # self._validate_directory_id(directory_id) + self.workspace_directories[directory_id].delete_security_group() + self.workspace_directories.pop(directory_id) + + def modify_selfservice_permissions( + self, resource_id: str, selfservice_permissions: Dict[str, str] + ) -> None: + res = self.workspace_directories[resource_id] + res.self_service_permissions = selfservice_permissions + + +workspaces_backends = BackendDict(WorkSpacesBackend, "workspaces") diff --git a/moto/workspaces/responses.py b/moto/workspaces/responses.py new file mode 100644 index 000000000..cbf6e5752 --- /dev/null +++ b/moto/workspaces/responses.py @@ -0,0 +1,179 @@ +"""Handles incoming workspaces requests, invokes methods, returns responses.""" +import json + +from moto.core.responses import BaseResponse + +from .models import WorkSpacesBackend, workspaces_backends + + +class WorkSpacesResponse(BaseResponse): + """Handler for WorkSpaces requests and responses.""" + + def __init__(self) -> None: + super().__init__(service_name="workspaces") + + @property + def workspaces_backend(self) -> WorkSpacesBackend: + """Return backend instance specific for this region.""" + return workspaces_backends[self.current_account][self.region] + + def create_workspaces(self) -> str: + params = json.loads(self.body) + workspaces = params.get("Workspaces") + failed_requests, pending_requests = self.workspaces_backend.create_workspaces( + workspaces=workspaces, + ) + return json.dumps( + dict(FailedRequests=failed_requests, PendingRequests=pending_requests) + ) + + def describe_workspaces(self) -> str: + params = json.loads(self.body) + workspace_ids = params.get("WorkspaceIds") + directory_id = params.get("DirectoryId") + user_name = params.get("UserName") + bundle_id = params.get("BundleId") + workspaces = self.workspaces_backend.describe_workspaces( + workspace_ids=workspace_ids, + directory_id=directory_id, + user_name=user_name, + bundle_id=bundle_id, + ) + return json.dumps(dict(Workspaces=[x.to_dict_pending() for x in workspaces])) + + def describe_workspace_directories(self) -> str: + params = json.loads(self.body) + directory_ids = params.get("DirectoryIds") + directories = self.workspaces_backend.describe_workspace_directories( + directory_ids=directory_ids, + ) + return json.dumps(dict(Directories=[d.to_dict() for d in directories])) + + def register_workspace_directory(self) -> str: + params = json.loads(self.body) + directory_id = params.get("DirectoryId") + subnet_ids = params.get("SubnetIds") + enable_work_docs = params.get("EnableWorkDocs") + enable_self_service = params.get("EnableSelfService") + tenancy = params.get("Tenancy") + tags = params.get("Tags") + self.workspaces_backend.register_workspace_directory( + directory_id=directory_id, + subnet_ids=subnet_ids, + enable_work_docs=enable_work_docs, + enable_self_service=enable_self_service, + tenancy=tenancy, + tags=tags, + ) + return json.dumps(dict()) + + def modify_workspace_creation_properties(self) -> str: + params = json.loads(self.body) + resource_id = params.get("ResourceId") + workspace_creation_properties = params.get("WorkspaceCreationProperties") + self.workspaces_backend.modify_workspace_creation_properties( + resource_id=resource_id, + workspace_creation_properties=workspace_creation_properties, + ) + return "{}" + + def create_tags(self) -> str: + params = json.loads(self.body) + resource_id = params.get("ResourceId") + tags = params.get("Tags") + self.workspaces_backend.create_tags( + resource_id=resource_id, + tags=tags, + ) + return "{}" + + def describe_tags(self) -> str: + params = json.loads(self.body) + resource_id = params.get("ResourceId") + tag_list = self.workspaces_backend.describe_tags( + resource_id=resource_id, + ) + return json.dumps(dict(TagList=tag_list)) + + def describe_client_properties(self) -> str: + params = json.loads(self.body) + resource_ids = params.get("ResourceIds") + client_properties_list = self.workspaces_backend.describe_client_properties( + resource_ids=resource_ids, + ) + return json.dumps(dict(ClientPropertiesList=client_properties_list)) + + def modify_client_properties(self) -> str: + params = json.loads(self.body) + resource_id = params.get("ResourceId") + client_properties = params.get("ClientProperties") + self.workspaces_backend.modify_client_properties( + resource_id=resource_id, + client_properties=client_properties, + ) + return "{}" + + def create_workspace_image(self) -> str: + params = json.loads(self.body) + name = params.get("Name") + description = params.get("Description") + workspace_id = params.get("WorkspaceId") + tags = params.get("Tags") + workspace_image = self.workspaces_backend.create_workspace_image( + name=name, + description=description, + workspace_id=workspace_id, + tags=tags, + ) + return json.dumps(workspace_image) + + def describe_workspace_images(self) -> str: + params = json.loads(self.body) + image_ids = params.get("ImageIds") + image_type = params.get("ImageType") + images = self.workspaces_backend.describe_workspace_images( + image_ids=image_ids, + image_type=image_type, + ) + return json.dumps(dict(Images=images)) + + def update_workspace_image_permission(self) -> str: + params = json.loads(self.body) + image_id = params.get("ImageId") + allow_copy_image = params.get("AllowCopyImage") + shared_account_id = params.get("SharedAccountId") + self.workspaces_backend.update_workspace_image_permission( + image_id=image_id, + allow_copy_image=allow_copy_image, + shared_account_id=shared_account_id, + ) + return "{}" + + def describe_workspace_image_permissions(self) -> str: + params = json.loads(self.body) + image_id = params.get("ImageId") + ( + image_id, + image_permissions, + ) = self.workspaces_backend.describe_workspace_image_permissions( + image_id=image_id, + ) + return json.dumps(dict(ImageId=image_id, ImagePermissions=image_permissions)) + + def deregister_workspace_directory(self) -> str: + params = json.loads(self.body) + directory_id = params.get("DirectoryId") + self.workspaces_backend.deregister_workspace_directory( + directory_id=directory_id, + ) + return "{}" + + def modify_selfservice_permissions(self) -> str: + params = json.loads(self.body) + resource_id = params.get("ResourceId") + selfservice_permissions = params.get("SelfservicePermissions") + self.workspaces_backend.modify_selfservice_permissions( + resource_id=resource_id, + selfservice_permissions=selfservice_permissions, + ) + return "{}" diff --git a/moto/workspaces/urls.py b/moto/workspaces/urls.py new file mode 100644 index 000000000..7bf27b8ac --- /dev/null +++ b/moto/workspaces/urls.py @@ -0,0 +1,10 @@ +"""workspaces base URL and path.""" +from .responses import WorkSpacesResponse + +url_bases = [ + r"https?://workspaces\.(.+)\.amazonaws\.com", +] + +url_paths = { + "{0}/$": WorkSpacesResponse.dispatch, +} diff --git a/tests/test_resourcegroupstaggingapi/test_resourcegroupstagging_glue.py b/tests/test_resourcegroupstaggingapi/test_resourcegroupstagging_glue.py index 627a2cd31..b6165fa97 100644 --- a/tests/test_resourcegroupstaggingapi/test_resourcegroupstagging_glue.py +++ b/tests/test_resourcegroupstaggingapi/test_resourcegroupstagging_glue.py @@ -8,7 +8,7 @@ from moto.core import DEFAULT_ACCOUNT_ID @mock_aws def test_glue_jobs(): - glue = boto3.client("glue", region_name="us-west-1") + glue = boto3.client("glue", region_name="us-west-2") tag_key = str(uuid4())[0:6] tag_val = str(uuid4())[0:6] job_name = glue.create_job( @@ -17,9 +17,9 @@ def test_glue_jobs(): Command={"Name": "test_command"}, Tags={tag_key: tag_val}, )["Name"] - job_arn = f"arn:aws:glue:us-west-1:{DEFAULT_ACCOUNT_ID}:job/{job_name}" + job_arn = f"arn:aws:glue:us-west-2:{DEFAULT_ACCOUNT_ID}:job/{job_name}" - rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-1") + rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2") resources = rtapi.get_resources(ResourceTypeFilters=["glue"])[ "ResourceTagMappingList" ] diff --git a/tests/test_resourcegroupstaggingapi/test_resourcegroupstaggingapi.py b/tests/test_resourcegroupstaggingapi/test_resourcegroupstaggingapi.py index 5b410c563..7ffbe5459 100644 --- a/tests/test_resourcegroupstaggingapi/test_resourcegroupstaggingapi.py +++ b/tests/test_resourcegroupstaggingapi/test_resourcegroupstaggingapi.py @@ -5,6 +5,7 @@ from botocore.client import ClientError from moto import mock_aws from tests import EXAMPLE_AMI_ID, EXAMPLE_AMI_ID2 +from tests.test_ds.test_ds_simple_ad_directory import create_test_directory @mock_aws @@ -330,7 +331,7 @@ def test_get_resources_ec2(): @mock_aws def test_get_resources_ec2_vpc(): - ec2 = boto3.resource("ec2", region_name="us-west-1") + ec2 = boto3.resource("ec2", region_name="us-west-2") vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") ec2.create_tags(Resources=[vpc.id], Tags=[{"Key": "test", "Value": "test"}]) @@ -339,7 +340,7 @@ def test_get_resources_ec2_vpc(): assert len(results) == 1 assert vpc.id in results[0]["ResourceARN"] - rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-1") + rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2") resp = rtapi.get_resources(ResourceTypeFilters=["ec2"]) assert_response(resp) resp = rtapi.get_resources(ResourceTypeFilters=["ec2:vpc"]) @@ -744,6 +745,128 @@ def test_get_resources_sqs(): assert len(resp["ResourceTagMappingList"]) == 1 assert {"Key": "Test", "Value": "1"} in resp["ResourceTagMappingList"][0]["Tags"] +def create_directory(): + ec2_client = boto3.client("ec2", region_name="eu-central-1") + ds_client = boto3.client("ds", region_name="eu-central-1") + directory_id = create_test_directory(ds_client, ec2_client) + return directory_id + +@mock_aws +def test_get_resources_workspaces(): + workspaces = boto3.client("workspaces", region_name="eu-central-1") + + # Create two tagged Workspaces + directory_id = create_directory() + workspaces.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=False + ) + workspaces.create_workspaces( + Workspaces=[ + { + 'DirectoryId': directory_id, + 'UserName': 'Administrator', + 'BundleId': 'wsb-bh8rsxt14', + 'Tags': [ + {"Key": "Test", "Value": "1"}, + ], + }, + { + 'DirectoryId': directory_id, + 'UserName': 'Administrator', + 'BundleId': 'wsb-bh8rsxt14', + 'Tags': [ + {"Key": "Test", "Value": "2"}, + ], + }, + ] + ) + + rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1") + + # Basic test + resp = rtapi.get_resources(ResourceTypeFilters=["workspaces"]) + assert len(resp["ResourceTagMappingList"]) == 2 + + # Test tag filtering + resp = rtapi.get_resources( + ResourceTypeFilters=["workspaces"], + TagFilters=[{"Key": "Test", "Values": ["1"]}], + ) + assert len(resp["ResourceTagMappingList"]) == 1 + assert {"Key": "Test", "Value": "1"} in resp["ResourceTagMappingList"][0]["Tags"] + +@mock_aws +def test_get_resources_workspace_directories(): + workspaces = boto3.client("workspaces", region_name="eu-central-1") + + # Create two tagged Workspaces Directories + for i in range(1, 3): + i_str = str(i) + directory_id = create_directory() + workspaces.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=False, + Tags=[ + {"Key": "Test", "Value": i_str}, + ], + ) + + rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1") + + # Basic test + resp = rtapi.get_resources(ResourceTypeFilters=["workspaces-directory"]) + assert len(resp["ResourceTagMappingList"]) == 2 + + # Test tag filtering + resp = rtapi.get_resources( + ResourceTypeFilters=["workspaces-directory"], + TagFilters=[{"Key": "Test", "Values": ["1"]}], + ) + assert len(resp["ResourceTagMappingList"]) == 1 + assert {"Key": "Test", "Value": "1"} in resp["ResourceTagMappingList"][0]["Tags"] + +@mock_aws +def test_get_resources_workspace_images(): + workspaces = boto3.client("workspaces", region_name="eu-central-1") + + # Create two tagged Workspace Images + directory_id = create_directory() + workspaces.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=False + ) + resp = workspaces.create_workspaces( + Workspaces=[ + { + 'DirectoryId': directory_id, + 'UserName': 'Administrator', + 'BundleId': 'wsb-bh8rsxt14', + }, + ] + ) + workspace_id = resp["PendingRequests"][0]["WorkspaceId"] + for i in range(1, 3): + i_str = str(i) + image = workspaces.create_workspace_image( + Name=f'test-image-{i_str}', + Description='Test workspace image', + WorkspaceId=workspace_id, + Tags=[{"Key": "Test", "Value": i_str}] + ) + rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1") + + # Basic test + resp = rtapi.get_resources(ResourceTypeFilters=["workspaces-image"]) + assert len(resp["ResourceTagMappingList"]) == 2 + + # Test tag filtering + resp = rtapi.get_resources( + ResourceTypeFilters=["workspaces-image"], + TagFilters=[{"Key": "Test", "Values": ["1"]}], + ) + assert len(resp["ResourceTagMappingList"]) == 1 + assert {"Key": "Test", "Value": "1"} in resp["ResourceTagMappingList"][0]["Tags"] @mock_aws def test_get_resources_sns(): diff --git a/tests/test_workspaces/__init__.py b/tests/test_workspaces/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_workspaces/test_workspaces.py b/tests/test_workspaces/test_workspaces.py new file mode 100644 index 000000000..99eff463b --- /dev/null +++ b/tests/test_workspaces/test_workspaces.py @@ -0,0 +1,686 @@ +"""Unit tests for workspaces-supported APIs.""" +import boto3 +import pytest +from botocore.exceptions import ClientError + +from moto import mock_aws +from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID +from tests.test_ds.test_ds_simple_ad_directory import create_test_directory + + +def create_directory(): + """Create a Directory""" + ec2_client = boto3.client("ec2", region_name="eu-west-1") + ds_client = boto3.client("ds", region_name="eu-west-1") + directory_id = create_test_directory(ds_client, ec2_client) + return directory_id + + +def create_security_group(client): + """Return the ID for a valid Security group.""" + return client.create_security_group( + GroupName="custom-sg", Description="Custom SG for workspaces" + ) + + +@mock_aws +def test_create_workspaces(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + resp = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + "VolumeEncryptionKey": f"arn:aws:kms:eu-west-1:{ACCOUNT_ID}:key/51d81fab-b138-4bd2-8a09-07fd6d37224d", + "UserVolumeEncryptionEnabled": True, + "RootVolumeEncryptionEnabled": True, + "WorkspaceProperties": { + "RunningMode": "ALWAYS_ON", + "RootVolumeSizeGib": 10, + "UserVolumeSizeGib": 10, + "ComputeTypeName": "VALUE", + "Protocols": [ + "PCOIP", + ], + }, + "Tags": [ + {"Key": "foo", "Value": "bar"}, + ], + }, + ] + ) + pending_requests = resp["PendingRequests"] + assert len(pending_requests) > 0 + assert "WorkspaceId" in pending_requests[0] + assert "DirectoryId" in pending_requests[0] + assert "UserName" in pending_requests[0] + assert "State" in pending_requests[0] + assert "BundleId" in pending_requests[0] + assert "VolumeEncryptionKey" in pending_requests[0] + assert "UserVolumeEncryptionEnabled" in pending_requests[0] + assert "RootVolumeEncryptionEnabled" in pending_requests[0] + assert "WorkspaceProperties" in pending_requests[0] + + +@mock_aws +def test_create_workspaces_with_invalid_directory_id(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = "d-906787e2cx" + with pytest.raises(ClientError) as exc: + client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + }, + ] + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + + +@mock_aws +def test_create_workspaces_with_unknown_directory_id(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = "d-906787e2ce" + resp = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + }, + ] + ) + failed_requests = resp["FailedRequests"] + assert len(failed_requests) > 0 + assert "WorkspaceRequest" in failed_requests[0] + assert "ErrorCode" in failed_requests[0] + assert "ErrorMessage" in failed_requests[0] + + +@mock_aws +def test_create_workspaces_with_auto_stop_timeout_and_alwayson(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + resp = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + "WorkspaceProperties": { + "RunningMode": "ALWAYS_ON", + "RunningModeAutoStopTimeoutInMinutes": 123, + }, + }, + ] + ) + failed_requests = resp["FailedRequests"] + assert len(failed_requests) > 0 + assert ( + failed_requests[0]["ErrorCode"] + == "AutoStopTimeoutIsNotApplicableForAnAlwaysOnWorkspace" + ) + + +@mock_aws +def test_create_workspaces_with_auto_stop_timeout_and_manual(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + resp = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + "WorkspaceProperties": { + "RunningMode": "MANUAL", + "RunningModeAutoStopTimeoutInMinutes": 123, + }, + }, + ] + ) + failed_requests = resp["FailedRequests"] + assert len(failed_requests) > 0 + assert ( + failed_requests[0]["ErrorCode"] + == "AutoStopTimeoutIsNotDefaultForManualWorkspace" + ) + + +@mock_aws +def test_describe_workspaces(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + for _ in range(2): + client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + } + ] + ) + resp = client.describe_workspaces() + assert len(resp["Workspaces"]) == 2 + workspace = resp["Workspaces"][0] + assert "WorkspaceId" in workspace + assert "DirectoryId" in workspace + assert "UserName" in workspace + assert "State" in workspace + assert "BundleId" in workspace + + +@mock_aws +def test_describe_workspaces_with_directory_and_username(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + } + ] + ) + resp = client.describe_workspaces( + DirectoryId=directory_id, UserName="Administrator" + ) + + workspace = resp["Workspaces"][0] + assert workspace["DirectoryId"] == directory_id + assert workspace["UserName"] == "Administrator" + + +@mock_aws +def test_describe_workspaces_invalid_parameters(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + response = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + } + ] + ) + workspace_id = response["PendingRequests"][0]["WorkspaceId"] + with pytest.raises(ClientError) as exc: + client.describe_workspaces( + WorkspaceIds=[workspace_id], DirectoryId=directory_id + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterValuesException" + + with pytest.raises(ClientError) as exc: + client.describe_workspaces( + WorkspaceIds=[workspace_id], BundleId="wsb-bh8rsxt14" + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterValuesException" + + with pytest.raises(ClientError) as exc: + client.describe_workspaces(DirectoryId=directory_id, BundleId="wsb-bh8rsxt14") + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterValuesException" + + +@mock_aws +def test_describe_workspaces_only_user_name_used(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + } + ] + ) + with pytest.raises(ClientError) as exc: + client.describe_workspaces( + UserName="user1", + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterValuesException" + + +@mock_aws +def test_register_workspace_directory(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + assert "RegistrationCode" in resp["Directories"][0] + assert ( + resp["Directories"][0]["WorkspaceCreationProperties"]["EnableWorkDocs"] is False + ) + assert resp["Directories"][0]["Tenancy"] == "SHARED" + + +@mock_aws +def test_register_workspace_directory_enable_self_service(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=True, + EnableSelfService=True, + Tenancy="DEDICATED", + ) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + self_service_permissions = resp["Directories"][0]["SelfservicePermissions"] + assert "RegistrationCode" in resp["Directories"][0] + assert ( + resp["Directories"][0]["WorkspaceCreationProperties"]["EnableWorkDocs"] is True + ) + assert self_service_permissions["IncreaseVolumeSize"] == "ENABLED" + assert self_service_permissions["ChangeComputeType"] == "ENABLED" + assert self_service_permissions["SwitchRunningMode"] == "ENABLED" + assert self_service_permissions["RebuildWorkspace"] == "ENABLED" + assert resp["Directories"][0]["Tenancy"] == "DEDICATED" + + +@mock_aws +def test_register_workspace_directory_with_subnets(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + assert "RegistrationCode" in resp["Directories"][0] + assert ( + resp["Directories"][0]["WorkspaceCreationProperties"]["EnableWorkDocs"] is False + ) + assert resp["Directories"][0]["Tenancy"] == "SHARED" + + +@mock_aws +def test_describe_workspace_directories(): + client = boto3.client("workspaces", region_name="eu-west-1") + for _ in range(2): + directory_id = create_directory() + client.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=True, + ) + resp = client.describe_workspace_directories() + assert len(resp["Directories"]) == 2 + directory = resp["Directories"][0] + assert "DirectoryId" in directory + assert "DirectoryName" in directory + assert "RegistrationCode" in directory + assert "SubnetIds" in directory + assert "DnsIpAddresses" in directory + assert "CustomerUserName" in directory + assert "IamRoleId" in directory + assert "DirectoryType" in directory + assert "WorkspaceSecurityGroupId" in directory + assert "State" in directory + assert "WorkspaceCreationProperties" in directory + assert "WorkspaceAccessProperties" in directory + assert "Tenancy" in directory + assert "SelfservicePermissions" in directory + assert "SamlProperties" in directory + assert "CertificateBasedAuthProperties" in directory + + +@mock_aws +def test_describe_workspace_directories_with_directory_id(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=True, + ) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + assert len(resp["Directories"]) == 1 + directory = resp["Directories"][0] + assert directory["DirectoryId"] == directory_id + + +@mock_aws +def test_describe_workspace_directories_with_invalid_directory_id(): + client = boto3.client("workspaces", region_name="eu-west-1") + with pytest.raises(ClientError) as exc: + client.describe_workspace_directories(DirectoryIds=["d-9067f997cx"]) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + + +@mock_aws +def test_modify_workspace_creation_properties(): + client = boto3.client("workspaces", region_name="eu-west-1") + ec2_client = boto3.client("ec2", region_name="eu-west-1") + directory_id = create_directory() + sg = create_security_group(client=ec2_client) + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + client.modify_workspace_creation_properties( + ResourceId=directory_id, + WorkspaceCreationProperties={ + "EnableWorkDocs": False, + "CustomSecurityGroupId": sg["GroupId"], + }, + ) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + directory = resp["Directories"][0] + assert ( + directory["WorkspaceCreationProperties"]["CustomSecurityGroupId"] + == sg["GroupId"] + ) + + +@mock_aws +def test_modify_workspace_creation_properties_invalid_request(): + client = boto3.client("workspaces", region_name="eu-west-1") + ec2_client = boto3.client("ec2", region_name="eu-west-1") + sg = create_security_group(client=ec2_client) + with pytest.raises(ClientError) as exc: + client.modify_workspace_creation_properties( + ResourceId="d-9067f6c44b", # Invalid DirectoryID + WorkspaceCreationProperties={ + "EnableWorkDocs": False, + "CustomSecurityGroupId": sg["GroupId"], + }, + ) + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + + +@mock_aws +def test_create_tags(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=True, + Tags=[ + {"Key": "foo1", "Value": "bar1"}, + ], + ) + client.create_tags( + ResourceId=directory_id, + Tags=[ + {"Key": "foo2", "Value": "bar2"}, + ], + ) + resp = client.describe_tags(ResourceId=directory_id) + assert resp["TagList"][1]["Key"] == "foo2" + + +@mock_aws +def test_describe_tags(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=True, + Tags=[ + {"Key": "foo", "Value": "bar"}, + ], + ) + resp = client.describe_tags(ResourceId=directory_id) + assert resp["TagList"][0]["Key"] == "foo" + + +@mock_aws +def test_describe_client_properties(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=True, + ) + resp = client.describe_client_properties(ResourceIds=[directory_id]) + assert "ClientProperties" in resp["ClientPropertiesList"][0] + + +@mock_aws +def test_modify_client_properties(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=True, + ) + client.modify_client_properties( + ResourceId=directory_id, + ClientProperties={ + "ReconnectEnabled": "DISABLED", + "LogUploadEnabled": "DISABLED", + }, + ) + resp = client.describe_client_properties(ResourceIds=[directory_id]) + client_properties_list = resp["ClientPropertiesList"][0]["ClientProperties"] + assert client_properties_list["ReconnectEnabled"] == "DISABLED" + assert client_properties_list["LogUploadEnabled"] == "DISABLED" + + +@mock_aws +def test_create_workspace_image(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + workspace = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + }, + ] + ) + workspace_id = workspace["PendingRequests"][0]["WorkspaceId"] + resp = client.create_workspace_image( + Name="test-image", + Description="Test Description for workspace images", + WorkspaceId=workspace_id, + ) + assert "ImageId" in resp + assert "Name" in resp + assert "Description" in resp + assert "State" in resp + assert "RequiredTenancy" in resp + assert "Created" in resp + assert "OwnerAccountId" in resp + + +@mock_aws +def test_create_workspace_image_invalid_workspace(): + client = boto3.client("workspaces", region_name="eu-west-1") + with pytest.raises(ClientError) as exc: + client.create_workspace_image( + Name="test-image", + Description="Invalid workspace id", + WorkspaceId="ws-hbfljyz9x", + ) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + + +@mock_aws +def test_create_workspace_image_already_exists(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + workspace = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + }, + ] + ) + workspace_id = workspace["PendingRequests"][0]["WorkspaceId"] + client.create_workspace_image( + Name="test-image", + Description="Test Description for workspace images", + WorkspaceId=workspace_id, + ) + with pytest.raises(ClientError) as exc: + client.create_workspace_image( + Name="test-image", + Description="Image with same name", + WorkspaceId=workspace_id, + ) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceAlreadyExistsException" + + +@mock_aws +def test_describe_workspace_images(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + workspace = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + }, + ] + ) + workspace_id = workspace["PendingRequests"][0]["WorkspaceId"] + image = client.create_workspace_image( + Name="test-image", + Description="Test Description for workspace images", + WorkspaceId=workspace_id, + ) + resp = client.describe_workspace_images(ImageIds=[image["ImageId"]]) + assert "ImageId" in resp["Images"][0] + assert "Name" in resp["Images"][0] + assert "Description" in resp["Images"][0] + assert "State" in resp["Images"][0] + assert "RequiredTenancy" in resp["Images"][0] + assert "Created" in resp["Images"][0] + assert "OwnerAccountId" in resp["Images"][0] + assert "Updates" in resp["Images"][0] + + +@mock_aws +def test_update_workspace_image_permission(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + workspace = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + }, + ] + ) + workspace_id = workspace["PendingRequests"][0]["WorkspaceId"] + image = client.create_workspace_image( + Name="test-image", + Description="Test Description for workspace images", + WorkspaceId=workspace_id, + ) + client.update_workspace_image_permission( + ImageId=image["ImageId"], AllowCopyImage=True, SharedAccountId="111111111111" + ) + resp = client.describe_workspace_image_permissions(ImageId=image["ImageId"]) + assert resp["ImagePermissions"][0]["SharedAccountId"] == "111111111111" + + client.update_workspace_image_permission( + ImageId=image["ImageId"], AllowCopyImage=False, SharedAccountId="111111111111" + ) + resp = client.describe_workspace_image_permissions(ImageId=image["ImageId"]) + assert len(resp["ImagePermissions"]) == 0 + + +@mock_aws +def test_describe_workspace_image_permissions(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + workspace = client.create_workspaces( + Workspaces=[ + { + "DirectoryId": directory_id, + "UserName": "Administrator", + "BundleId": "wsb-bh8rsxt14", + }, + ] + ) + workspace_id = workspace["PendingRequests"][0]["WorkspaceId"] + image = client.create_workspace_image( + Name="test-image", + Description="Test Description for workspace images", + WorkspaceId=workspace_id, + ) + client.update_workspace_image_permission( + ImageId=image["ImageId"], AllowCopyImage=True, SharedAccountId="111111111111" + ) + resp = client.describe_workspace_image_permissions(ImageId=image["ImageId"]) + assert resp["ImageId"] == image["ImageId"] + assert resp["ImagePermissions"][0]["SharedAccountId"] == "111111111111" + + +@mock_aws +def test_describe_workspace_image_permissions_with_invalid_image_id(): + client = boto3.client("workspaces", region_name="eu-west-1") + with pytest.raises(ClientError) as exc: + client.describe_workspace_image_permissions(ImageId="foo") + err = exc.value.response["Error"] + assert err["Code"] == "ValidationException" + + +@mock_aws +def test_deregister_workspace_directory(): + client = boto3.client("workspaces", region_name="eu-west-1") + directory_id = create_directory() + client.register_workspace_directory(DirectoryId=directory_id, EnableWorkDocs=False) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + assert len(resp["Directories"]) > 0 + client.deregister_workspace_directory(DirectoryId=directory_id) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + assert len(resp["Directories"]) == 0 + + +@mock_aws +def test_modify_selfservice_permissions(): + client = boto3.client("workspaces", region_name="eu-west-1") + + directory_id = create_directory() + client.register_workspace_directory( + DirectoryId=directory_id, + EnableWorkDocs=True, + ) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + assert ( + resp["Directories"][0]["SelfservicePermissions"]["IncreaseVolumeSize"] + == "DISABLED" + ) + client.modify_selfservice_permissions( + ResourceId=directory_id, + SelfservicePermissions={ + "RestartWorkspace": "ENABLED", + "IncreaseVolumeSize": "ENABLED", + "ChangeComputeType": "ENABLED", + "SwitchRunningMode": "ENABLED", + "RebuildWorkspace": "ENABLED", + }, + ) + resp = client.describe_workspace_directories(DirectoryIds=[directory_id]) + assert ( + resp["Directories"][0]["SelfservicePermissions"]["IncreaseVolumeSize"] + == "ENABLED" + )