Feature: OpenSearch (#6128)

This commit is contained in:
Bert Blommers 2023-03-26 12:43:28 +00:00 committed by GitHub
parent 638171a9e1
commit e0726f2d63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1003 additions and 4 deletions

View File

@ -4509,6 +4509,64 @@
- [ ] stop_db_cluster
</details>
## opensearch
<details>
<summary>17% implemented</summary>
- [ ] accept_inbound_connection
- [X] add_tags
- [ ] associate_package
- [ ] authorize_vpc_endpoint_access
- [ ] cancel_service_software_update
- [X] create_domain
- [ ] create_outbound_connection
- [ ] create_package
- [ ] create_vpc_endpoint
- [X] delete_domain
- [ ] delete_inbound_connection
- [ ] delete_outbound_connection
- [ ] delete_package
- [ ] delete_vpc_endpoint
- [X] describe_domain
- [ ] describe_domain_auto_tunes
- [ ] describe_domain_change_progress
- [X] describe_domain_config
- [ ] describe_domains
- [ ] describe_dry_run_progress
- [ ] describe_inbound_connections
- [ ] describe_instance_type_limits
- [ ] describe_outbound_connections
- [ ] describe_packages
- [ ] describe_reserved_instance_offerings
- [ ] describe_reserved_instances
- [ ] describe_vpc_endpoints
- [ ] dissociate_package
- [X] get_compatible_versions
- [ ] get_package_version_history
- [ ] get_upgrade_history
- [ ] get_upgrade_status
- [ ] list_domain_names
- [ ] list_domains_for_package
- [ ] list_instance_type_details
- [ ] list_packages_for_domain
- [ ] list_scheduled_actions
- [X] list_tags
- [ ] list_versions
- [ ] list_vpc_endpoint_access
- [ ] list_vpc_endpoints
- [ ] list_vpc_endpoints_for_domain
- [ ] purchase_reserved_instance_offering
- [ ] reject_inbound_connection
- [X] remove_tags
- [ ] revoke_vpc_endpoint_access
- [ ] start_service_software_update
- [X] update_domain_config
- [ ] update_package
- [ ] update_scheduled_action
- [ ] update_vpc_endpoint
- [ ] upgrade_domain
</details>
## opsworks
<details>
<summary>12% implemented</summary>
@ -6918,7 +6976,6 @@
- nimble
- oam
- omics
- opensearch
- opensearchserverless
- opsworkscm
- outposts

View File

@ -0,0 +1,82 @@
.. _implementedservice_opensearch:
.. |start-h3| raw:: html
<h3>
.. |end-h3| raw:: html
</h3>
==========
opensearch
==========
.. autoclass:: moto.opensearch.models.OpenSearchServiceBackend
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@mock_opensearch
def test_opensearch_behaviour:
boto3.client("opensearch")
...
|start-h3| Implemented features for this service |end-h3|
- [ ] accept_inbound_connection
- [X] add_tags
- [ ] associate_package
- [ ] authorize_vpc_endpoint_access
- [ ] cancel_service_software_update
- [X] create_domain
- [ ] create_outbound_connection
- [ ] create_package
- [ ] create_vpc_endpoint
- [X] delete_domain
- [ ] delete_inbound_connection
- [ ] delete_outbound_connection
- [ ] delete_package
- [ ] delete_vpc_endpoint
- [X] describe_domain
- [ ] describe_domain_auto_tunes
- [ ] describe_domain_change_progress
- [X] describe_domain_config
- [ ] describe_domains
- [ ] describe_dry_run_progress
- [ ] describe_inbound_connections
- [ ] describe_instance_type_limits
- [ ] describe_outbound_connections
- [ ] describe_packages
- [ ] describe_reserved_instance_offerings
- [ ] describe_reserved_instances
- [ ] describe_vpc_endpoints
- [ ] dissociate_package
- [X] get_compatible_versions
- [ ] get_package_version_history
- [ ] get_upgrade_history
- [ ] get_upgrade_status
- [ ] list_domain_names
- [ ] list_domains_for_package
- [ ] list_instance_type_details
- [ ] list_packages_for_domain
- [ ] list_scheduled_actions
- [X] list_tags
- [ ] list_versions
- [ ] list_vpc_endpoint_access
- [ ] list_vpc_endpoints
- [ ] list_vpc_endpoints_for_domain
- [ ] purchase_reserved_instance_offering
- [ ] reject_inbound_connection
- [X] remove_tags
- [ ] revoke_vpc_endpoint_access
- [ ] start_service_software_update
- [X] update_domain_config
- [ ] update_package
- [ ] update_scheduled_action
- [ ] update_vpc_endpoint
- [ ] upgrade_domain

View File

@ -114,6 +114,7 @@ mock_mediastoredata = lazy_load(
mock_meteringmarketplace = lazy_load(".meteringmarketplace", "mock_meteringmarketplace")
mock_mq = lazy_load(".mq", "mock_mq")
mock_neptune = lazy_load(".rds", "mock_rds", boto3_name="neptune")
mock_opensearch = lazy_load(".opensearch", "mock_opensearch")
mock_opsworks = lazy_load(".opsworks", "mock_opsworks")
mock_organizations = lazy_load(".organizations", "mock_organizations")
mock_personalize = lazy_load(".personalize", "mock_personalize")

View File

@ -1,4 +1,5 @@
from .responses import ElasticsearchServiceResponse
from moto.opensearch.responses import OpenSearchServiceResponse
url_bases = [
r"https?://es\.(.+)\.amazonaws\.com",
@ -9,4 +10,10 @@ url_paths = {
"{0}/2015-01-01/domain$": ElasticsearchServiceResponse.list_domains,
"{0}/2015-01-01/es/domain$": ElasticsearchServiceResponse.domains,
"{0}/2015-01-01/es/domain/(?P<domainname>[^/]+)": ElasticsearchServiceResponse.domain,
"{0}/2021-01-01/opensearch/compatibleVersions": OpenSearchServiceResponse.dispatch,
"{0}/2021-01-01/opensearch/domain": OpenSearchServiceResponse.dispatch,
"{0}/2021-01-01/opensearch/domain/(?P<domainname>[^/]+)": OpenSearchServiceResponse.dispatch,
"{0}/2021-01-01/opensearch/domain/(?P<domainname>[^/]+)/config": OpenSearchServiceResponse.dispatch,
"{0}/2021-01-01/tags/": OpenSearchServiceResponse.dispatch,
"{0}/2021-01-01/tags-removal/": OpenSearchServiceResponse.dispatch,
}

View File

@ -1788,6 +1788,8 @@ class IAMBackend(BaseBackend):
self.tagger = TaggingService()
self.initialize_service_roles()
def _init_aws_policies(self) -> List[ManagedPolicy]:
# AWS defines some of its own managed policies and we periodically
# import them via `make aws_managed_policies`
@ -1808,6 +1810,16 @@ class IAMBackend(BaseBackend):
self.__dict__ = {}
self.__init__(region_name, account_id, aws_policies) # type: ignore[misc]
def initialize_service_roles(self) -> None:
pass
# TODO: This role is required for some TF tests to work
# Enabling it breaks an assumption that no roles exist unless created by the user
# Our tests, and probably users' tests, rely on this assumption
# Maybe we can enable this (and roles for other services) as part of a major release
# self.create_service_linked_role(
# service_name="opensearchservice.amazonaws.com", suffix="", description=""
# )
def attach_role_policy(self, policy_arn: str, role_name: str) -> None:
arns = dict((p.arn, p) for p in self.managed_policies.values())
policy = arns[policy_arn]
@ -3244,7 +3256,7 @@ class IAMBackend(BaseBackend):
permissions_boundary=None,
description=description,
tags=[],
max_session_duration=None,
max_session_duration="3600",
linked_service=service_name,
)

View File

@ -0,0 +1,5 @@
"""opensearch module initialization; sets value for base decorator."""
from .models import opensearch_backends
from ..core.models import base_decorator
mock_opensearch = base_decorator(opensearch_backends)

155
moto/opensearch/data.py Normal file
View File

@ -0,0 +1,155 @@
compatible_versions = [
{
"SourceVersion": "Elasticsearch_7.7",
"TargetVersions": [
"Elasticsearch_7.8",
"Elasticsearch_7.9",
"Elasticsearch_7.10",
"OpenSearch_1.0",
"OpenSearch_1.1",
"OpenSearch_1.2",
"OpenSearch_1.3",
],
},
{
"SourceVersion": "Elasticsearch_6.8",
"TargetVersions": [
"Elasticsearch_7.1",
"Elasticsearch_7.4",
"Elasticsearch_7.7",
"Elasticsearch_7.8",
"Elasticsearch_7.9",
"Elasticsearch_7.10",
"OpenSearch_1.0",
"OpenSearch_1.1",
"OpenSearch_1.2",
"OpenSearch_1.3",
],
},
{
"SourceVersion": "Elasticsearch_7.8",
"TargetVersions": [
"Elasticsearch_7.9",
"Elasticsearch_7.10",
"OpenSearch_1.0",
"OpenSearch_1.1",
"OpenSearch_1.2",
"OpenSearch_1.3",
],
},
{
"SourceVersion": "Elasticsearch_7.9",
"TargetVersions": [
"Elasticsearch_7.10",
"OpenSearch_1.0",
"OpenSearch_1.1",
"OpenSearch_1.2",
"OpenSearch_1.3",
],
},
{
"SourceVersion": "Elasticsearch_7.10",
"TargetVersions": [
"OpenSearch_1.0",
"OpenSearch_1.1",
"OpenSearch_1.2",
"OpenSearch_1.3",
],
},
{"SourceVersion": "OpenSearch_2.3", "TargetVersions": ["OpenSearch_2.5"]},
{
"SourceVersion": "OpenSearch_1.0",
"TargetVersions": ["OpenSearch_1.1", "OpenSearch_1.2", "OpenSearch_1.3"],
},
{
"SourceVersion": "OpenSearch_1.1",
"TargetVersions": ["OpenSearch_1.2", "OpenSearch_1.3"],
},
{"SourceVersion": "OpenSearch_1.2", "TargetVersions": ["OpenSearch_1.3"]},
{
"SourceVersion": "OpenSearch_1.3",
"TargetVersions": ["OpenSearch_2.3", "OpenSearch_2.5"],
},
{
"SourceVersion": "Elasticsearch_6.0",
"TargetVersions": [
"Elasticsearch_6.3",
"Elasticsearch_6.4",
"Elasticsearch_6.5",
"Elasticsearch_6.7",
"Elasticsearch_6.8",
],
},
{"SourceVersion": "Elasticsearch_5.1", "TargetVersions": ["Elasticsearch_5.6"]},
{
"SourceVersion": "Elasticsearch_7.1",
"TargetVersions": [
"Elasticsearch_7.4",
"Elasticsearch_7.7",
"Elasticsearch_7.8",
"Elasticsearch_7.9",
"Elasticsearch_7.10",
"OpenSearch_1.0",
"OpenSearch_1.1",
"OpenSearch_1.2",
"OpenSearch_1.3",
],
},
{
"SourceVersion": "Elasticsearch_6.2",
"TargetVersions": [
"Elasticsearch_6.3",
"Elasticsearch_6.4",
"Elasticsearch_6.5",
"Elasticsearch_6.7",
"Elasticsearch_6.8",
],
},
{"SourceVersion": "Elasticsearch_5.3", "TargetVersions": ["Elasticsearch_5.6"]},
{
"SourceVersion": "Elasticsearch_6.3",
"TargetVersions": [
"Elasticsearch_6.4",
"Elasticsearch_6.5",
"Elasticsearch_6.7",
"Elasticsearch_6.8",
],
},
{
"SourceVersion": "Elasticsearch_6.4",
"TargetVersions": [
"Elasticsearch_6.5",
"Elasticsearch_6.7",
"Elasticsearch_6.8",
],
},
{"SourceVersion": "Elasticsearch_5.5", "TargetVersions": ["Elasticsearch_5.6"]},
{
"SourceVersion": "Elasticsearch_7.4",
"TargetVersions": [
"Elasticsearch_7.7",
"Elasticsearch_7.8",
"Elasticsearch_7.9",
"Elasticsearch_7.10",
"OpenSearch_1.0",
"OpenSearch_1.1",
"OpenSearch_1.2",
"OpenSearch_1.3",
],
},
{
"SourceVersion": "Elasticsearch_6.5",
"TargetVersions": ["Elasticsearch_6.7", "Elasticsearch_6.8"],
},
{
"SourceVersion": "Elasticsearch_5.6",
"TargetVersions": [
"Elasticsearch_6.3",
"Elasticsearch_6.4",
"Elasticsearch_6.5",
"Elasticsearch_6.7",
"Elasticsearch_6.8",
],
},
{"SourceVersion": "Elasticsearch_6.7", "TargetVersions": ["Elasticsearch_6.8"]},
]

View File

@ -0,0 +1,7 @@
"""Exceptions raised by the opensearch service."""
from moto.core.exceptions import JsonRESTError
class ResourceNotFoundException(JsonRESTError):
def __init__(self, name: str):
super().__init__("ResourceNotFoundException", f"Domain not found: {name}")

331
moto/opensearch/models.py Normal file
View File

@ -0,0 +1,331 @@
from typing import Any, Dict, List, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.utilities.tagging_service import TaggingService
from .data import compatible_versions
from .exceptions import ResourceNotFoundException
default_cluster_config = {
"InstanceType": "t3.small.search",
"InstanceCount": 1,
"DedicatedMasterEnabled": False,
"ZoneAwarenessEnabled": False,
"WarmEnabled": False,
"ColdStorageOptions": {"Enabled": False},
}
default_advanced_security_options = {
"Enabled": False,
"InternalUserDatabaseEnabled": False,
"AnonymousAuthEnabled": False,
}
default_domain_endpoint_options = {
"EnforceHTTPS": False,
"TLSSecurityPolicy": "Policy-Min-TLS-1-0-2019-07",
"CustomEndpointEnabled": False,
}
default_software_update_options = {
"CurrentVersion": "",
"NewVersion": "",
"UpdateAvailable": False,
"Cancellable": False,
"UpdateStatus": "COMPLETED",
"Description": "There is no software update available for this domain.",
"AutomatedUpdateDate": "1969-12-31T23:00:00-01:00",
"OptionalDeployment": True,
}
default_advanced_options = {
"override_main_response_version": "false",
"rest.action.multi.allow_explicit_index": "true",
}
class OpenSearchDomain(BaseModel):
def __init__(
self,
account_id: str,
region: str,
domain_name: str,
engine_version: str,
cluster_config: Dict[str, Any],
ebs_options: Dict[str, Any],
access_policies: str,
snapshot_options: Dict[str, int],
vpc_options: Dict[str, List[str]],
cognito_options: Dict[str, Any],
encryption_at_rest_options: Dict[str, Any],
node_to_node_encryption_options: Dict[str, bool],
advanced_options: Dict[str, str],
log_publishing_options: Dict[str, Any],
domain_endpoint_options: Dict[str, Any],
advanced_security_options: Dict[str, Any],
auto_tune_options: Dict[str, Any],
off_peak_window_options: Dict[str, Any],
software_update_options: Dict[str, bool],
):
self.domain_id = f"{account_id}/{domain_name}"
self.domain_name = domain_name
self.arn = f"arn:aws:es:{region}:{account_id}:domain/{domain_name}"
self.engine_version = engine_version or "OpenSearch 2.5"
self.cluster_config = cluster_config or {}
self.ebs_options = ebs_options or {"EBSEnabled": False}
self.access_policies = access_policies or ""
self.snapshot_options = snapshot_options or {"AutomatedSnapshotStartHour": 0}
self.vpc_options = vpc_options
self.cognito_options = cognito_options or {"Enabled": False}
self.encryption_at_rest_options = encryption_at_rest_options or {
"Enabled": False
}
self.node_to_node_encryption_options = node_to_node_encryption_options or {
"Enabled": False
}
self.advanced_options = advanced_options or default_advanced_options
self.log_publishing_options = log_publishing_options
self.domain_endpoint_options = (
domain_endpoint_options or default_domain_endpoint_options
)
self.advanced_security_options = (
advanced_security_options or default_advanced_security_options
)
self.auto_tune_options = auto_tune_options or {"State": "ENABLE_IN_PROGRESS"}
self.off_peak_windows_options = off_peak_window_options
self.software_update_options = (
software_update_options or default_software_update_options
)
self.deleted = False
self.processing = False
# Defaults
for key, value in default_cluster_config.items():
if key not in self.cluster_config:
self.cluster_config[key] = value
if self.vpc_options is None:
self.endpoint: Optional[str] = f"{domain_name}.{region}.es.amazonaws.com"
self.endpoints: Optional[Dict[str, str]] = None
else:
self.endpoint = None
self.endpoints = {"vpc": f"{domain_name}.{region}.es.amazonaws.com"}
def delete(self) -> None:
self.deleted = True
self.processing = True
def dct_options(self) -> Dict[str, Any]:
return {
"Endpoint": self.endpoint,
"Endpoints": self.endpoints,
"EngineVersion": self.engine_version,
"ClusterConfig": self.cluster_config,
"EBSOptions": self.ebs_options,
"AccessPolicies": self.access_policies,
"SnapshotOptions": self.snapshot_options,
"VPCOptions": self.vpc_options,
"CognitoOptions": self.cognito_options,
"EncryptionAtRestOptions": self.encryption_at_rest_options,
"NodeToNodeEncryptionOptions": self.node_to_node_encryption_options,
"AdvancedOptions": self.advanced_options,
"LogPublishingOptions": self.log_publishing_options,
"DomainEndpointOptions": self.domain_endpoint_options,
"AdvancedSecurityOptions": self.advanced_security_options,
"AutoTuneOptions": self.auto_tune_options,
"OffPeakWindowsOptions": self.off_peak_windows_options,
"SoftwareUpdateOptions": self.software_update_options,
}
def to_dict(self) -> Dict[str, Any]:
dct = {
"DomainId": self.domain_id,
"DomainName": self.domain_name,
"ARN": self.arn,
"Created": True,
"Deleted": self.deleted,
"Processing": self.processing,
"UpgradeProcessing": False,
}
for key, value in self.dct_options().items():
if value is not None:
dct[key] = value
return dct
def to_config_dict(self) -> Dict[str, Any]:
dct: Dict[str, Any] = dict()
for key, value in self.dct_options().items():
if value is not None:
dct[key] = {"Options": value}
return dct
def update(
self,
cluster_config: Dict[str, Any],
ebs_options: Dict[str, Any],
access_policies: str,
snapshot_options: Dict[str, int],
vpc_options: Dict[str, List[str]],
cognito_options: Dict[str, Any],
encryption_at_rest_options: Dict[str, Any],
node_to_node_encryption_options: Dict[str, bool],
advanced_options: Dict[str, str],
log_publishing_options: Dict[str, Any],
domain_endpoint_options: Dict[str, Any],
advanced_security_options: Dict[str, Any],
auto_tune_options: Dict[str, Any],
off_peak_window_options: Dict[str, Any],
software_update_options: Dict[str, bool],
) -> None:
self.cluster_config = cluster_config or self.cluster_config
self.ebs_options = ebs_options or self.ebs_options
self.access_policies = access_policies or self.access_policies
self.snapshot_options = snapshot_options or self.snapshot_options
self.vpc_options = vpc_options or self.vpc_options
self.cognito_options = cognito_options or self.cognito_options
self.encryption_at_rest_options = (
encryption_at_rest_options or self.encryption_at_rest_options
)
self.node_to_node_encryption_options = (
node_to_node_encryption_options or self.node_to_node_encryption_options
)
self.advanced_options = advanced_options or self.advanced_options
self.log_publishing_options = (
log_publishing_options or self.log_publishing_options
)
self.domain_endpoint_options = (
domain_endpoint_options or self.domain_endpoint_options
)
self.advanced_security_options = (
advanced_security_options or self.advanced_security_options
)
self.auto_tune_options = auto_tune_options or self.auto_tune_options
self.off_peak_windows_options = (
off_peak_window_options or self.off_peak_windows_options
)
self.software_update_options = (
software_update_options or self.software_update_options
)
class OpenSearchServiceBackend(BaseBackend):
"""Implementation of OpenSearchService APIs."""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.domains: Dict[str, OpenSearchDomain] = dict()
self.tagger = TaggingService()
def create_domain(
self,
domain_name: str,
engine_version: str,
cluster_config: Dict[str, Any],
ebs_options: Dict[str, Any],
access_policies: str,
snapshot_options: Dict[str, Any],
vpc_options: Dict[str, Any],
cognito_options: Dict[str, Any],
encryption_at_rest_options: Dict[str, Any],
node_to_node_encryption_options: Dict[str, Any],
advanced_options: Dict[str, Any],
log_publishing_options: Dict[str, Any],
domain_endpoint_options: Dict[str, Any],
advanced_security_options: Dict[str, Any],
tag_list: List[Dict[str, str]],
auto_tune_options: Dict[str, Any],
off_peak_window_options: Dict[str, Any],
software_update_options: Dict[str, Any],
) -> OpenSearchDomain:
domain = OpenSearchDomain(
account_id=self.account_id,
region=self.region_name,
domain_name=domain_name,
engine_version=engine_version,
cluster_config=cluster_config,
ebs_options=ebs_options,
access_policies=access_policies,
snapshot_options=snapshot_options,
vpc_options=vpc_options,
cognito_options=cognito_options,
encryption_at_rest_options=encryption_at_rest_options,
node_to_node_encryption_options=node_to_node_encryption_options,
advanced_options=advanced_options,
log_publishing_options=log_publishing_options,
domain_endpoint_options=domain_endpoint_options,
advanced_security_options=advanced_security_options,
auto_tune_options=auto_tune_options,
off_peak_window_options=off_peak_window_options,
software_update_options=software_update_options,
)
self.domains[domain_name] = domain
if tag_list:
self.add_tags(domain.arn, tag_list)
return domain
def get_compatible_versions(self, domain_name: str) -> List[Dict[str, Any]]:
if domain_name not in self.domains:
raise ResourceNotFoundException(domain_name)
return compatible_versions
def delete_domain(self, domain_name: str) -> OpenSearchDomain:
if domain_name not in self.domains:
raise ResourceNotFoundException(domain_name)
self.domains[domain_name].delete()
return self.domains.pop(domain_name)
def describe_domain(self, domain_name: str) -> OpenSearchDomain:
if domain_name not in self.domains:
raise ResourceNotFoundException(domain_name)
return self.domains[domain_name]
def describe_domain_config(self, domain_name: str) -> OpenSearchDomain:
return self.describe_domain(domain_name)
def update_domain_config(
self,
domain_name: str,
cluster_config: Dict[str, Any],
ebs_options: Dict[str, Any],
access_policies: str,
snapshot_options: Dict[str, Any],
vpc_options: Dict[str, Any],
cognito_options: Dict[str, Any],
encryption_at_rest_options: Dict[str, Any],
node_to_node_encryption_options: Dict[str, Any],
advanced_options: Dict[str, Any],
log_publishing_options: Dict[str, Any],
domain_endpoint_options: Dict[str, Any],
advanced_security_options: Dict[str, Any],
auto_tune_options: Dict[str, Any],
off_peak_window_options: Dict[str, Any],
software_update_options: Dict[str, Any],
) -> OpenSearchDomain:
domain = self.describe_domain(domain_name)
domain.update(
cluster_config=cluster_config,
ebs_options=ebs_options,
access_policies=access_policies,
snapshot_options=snapshot_options,
vpc_options=vpc_options,
cognito_options=cognito_options,
encryption_at_rest_options=encryption_at_rest_options,
node_to_node_encryption_options=node_to_node_encryption_options,
advanced_options=advanced_options,
log_publishing_options=log_publishing_options,
domain_endpoint_options=domain_endpoint_options,
advanced_security_options=advanced_security_options,
auto_tune_options=auto_tune_options,
off_peak_window_options=off_peak_window_options,
software_update_options=software_update_options,
)
return domain
def add_tags(self, arn: str, tags: List[Dict[str, str]]) -> None:
self.tagger.tag_resource(arn, tags)
def list_tags(self, arn: str) -> List[Dict[str, str]]:
return self.tagger.list_tags_for_resource(arn)["Tags"]
def remove_tags(self, arn: str, tag_keys: List[str]) -> None:
self.tagger.untag_resource_using_names(arn, tag_keys)
opensearch_backends = BackendDict(OpenSearchServiceBackend, "opensearch")

View File

@ -0,0 +1,140 @@
"""Handles incoming opensearch requests, invokes methods, returns responses."""
import json
from moto.core.responses import BaseResponse
from .models import opensearch_backends, OpenSearchServiceBackend
class OpenSearchServiceResponse(BaseResponse):
"""Handler for OpenSearchService requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="opensearch")
@property
def opensearch_backend(self) -> OpenSearchServiceBackend:
"""Return backend instance specific for this region."""
return opensearch_backends[self.current_account][self.region]
def create_domain(self) -> str:
domain_name = self._get_param("DomainName")
engine_version = self._get_param("EngineVersion")
cluster_config = self._get_param("ClusterConfig")
ebs_options = self._get_param("EBSOptions")
access_policies = self._get_param("AccessPolicies")
snapshot_options = self._get_param("SnapshotOptions")
vpc_options = self._get_param("VPCOptions")
cognito_options = self._get_param("CognitoOptions")
encryption_at_rest_options = self._get_param("EncryptionAtRestOptions")
node_to_node_encryption_options = self._get_param("NodeToNodeEncryptionOptions")
advanced_options = self._get_param("AdvancedOptions")
log_publishing_options = self._get_param("LogPublishingOptions")
domain_endpoint_options = self._get_param("DomainEndpointOptions")
advanced_security_options = self._get_param("AdvancedSecurityOptions")
tag_list = self._get_param("TagList")
auto_tune_options = self._get_param("AutoTuneOptions")
off_peak_window_options = self._get_param("OffPeakWindowOptions")
software_update_options = self._get_param("SoftwareUpdateOptions")
domain = self.opensearch_backend.create_domain(
domain_name=domain_name,
engine_version=engine_version,
cluster_config=cluster_config,
ebs_options=ebs_options,
access_policies=access_policies,
snapshot_options=snapshot_options,
vpc_options=vpc_options,
cognito_options=cognito_options,
encryption_at_rest_options=encryption_at_rest_options,
node_to_node_encryption_options=node_to_node_encryption_options,
advanced_options=advanced_options,
log_publishing_options=log_publishing_options,
domain_endpoint_options=domain_endpoint_options,
advanced_security_options=advanced_security_options,
tag_list=tag_list,
auto_tune_options=auto_tune_options,
off_peak_window_options=off_peak_window_options,
software_update_options=software_update_options,
)
return json.dumps(dict(DomainStatus=domain.to_dict()))
def get_compatible_versions(self) -> str:
domain_name = self._get_param("domainName")
compatible_versions = self.opensearch_backend.get_compatible_versions(
domain_name=domain_name,
)
return json.dumps(dict(CompatibleVersions=compatible_versions))
def delete_domain(self) -> str:
domain_name = self._get_param("DomainName")
domain = self.opensearch_backend.delete_domain(
domain_name=domain_name,
)
return json.dumps(dict(DomainStatus=domain.to_dict()))
def describe_domain(self) -> str:
domain_name = self._get_param("DomainName")
domain = self.opensearch_backend.describe_domain(
domain_name=domain_name,
)
return json.dumps(dict(DomainStatus=domain.to_dict()))
def describe_domain_config(self) -> str:
domain_name = self._get_param("DomainName")
domain = self.opensearch_backend.describe_domain_config(
domain_name=domain_name,
)
return json.dumps(dict(DomainConfig=domain.to_config_dict()))
def update_domain_config(self) -> str:
domain_name = self._get_param("DomainName")
cluster_config = self._get_param("ClusterConfig")
ebs_options = self._get_param("EBSOptions")
access_policies = self._get_param("AccessPolicies")
snapshot_options = self._get_param("SnapshotOptions")
vpc_options = self._get_param("VPCOptions")
cognito_options = self._get_param("CognitoOptions")
encryption_at_rest_options = self._get_param("EncryptionAtRestOptions")
node_to_node_encryption_options = self._get_param("NodeToNodeEncryptionOptions")
advanced_options = self._get_param("AdvancedOptions")
log_publishing_options = self._get_param("LogPublishingOptions")
domain_endpoint_options = self._get_param("DomainEndpointOptions")
advanced_security_options = self._get_param("AdvancedSecurityOptions")
auto_tune_options = self._get_param("AutoTuneOptions")
off_peak_window_options = self._get_param("OffPeakWindowOptions")
software_update_options = self._get_param("SoftwareUpdateOptions")
domain = self.opensearch_backend.update_domain_config(
domain_name=domain_name,
cluster_config=cluster_config,
ebs_options=ebs_options,
access_policies=access_policies,
snapshot_options=snapshot_options,
vpc_options=vpc_options,
cognito_options=cognito_options,
encryption_at_rest_options=encryption_at_rest_options,
node_to_node_encryption_options=node_to_node_encryption_options,
advanced_options=advanced_options,
log_publishing_options=log_publishing_options,
domain_endpoint_options=domain_endpoint_options,
advanced_security_options=advanced_security_options,
auto_tune_options=auto_tune_options,
off_peak_window_options=off_peak_window_options,
software_update_options=software_update_options,
)
return json.dumps(dict(DomainConfig=domain.to_config_dict()))
def list_tags(self) -> str:
arn = self._get_param("arn")
tags = self.opensearch_backend.list_tags(arn)
return json.dumps({"TagList": tags})
def add_tags(self) -> str:
arn = self._get_param("ARN")
tags = self._get_param("TagList")
self.opensearch_backend.add_tags(arn, tags)
return "{}"
def remove_tags(self) -> str:
arn = self._get_param("ARN")
tag_keys = self._get_param("TagKeys")
self.opensearch_backend.remove_tags(arn, tag_keys)
return "{}"

10
moto/opensearch/urls.py Normal file
View File

@ -0,0 +1,10 @@
"""opensearch base URL and path."""
from .responses import OpenSearchServiceResponse
url_bases = [r"https?://es\.(.+)\.amazonaws\.com"]
response = OpenSearchServiceResponse()
url_paths = {"{0}/.*$": response.dispatch}

View File

@ -18,7 +18,9 @@ output_path = os.path.join(script_dir, "..", output_file)
# Ignore the MotoAPI and InstanceMetadata backend, as they do not represent AWS services
# Ignore the APIGatewayV2, as it's URL's are managed by APIGateway
# Ignore S3bucket_path, as the functionality is covered in the S3 service
IGNORE_BACKENDS = ["moto_api", "instance_metadata", "apigatewayv2", "s3bucket_path", "neptune"]
# Ignore neptune, as it shares a URL with RDS
# Ignore OpenSearch, as it shares a URL with ElasticSearch
IGNORE_BACKENDS = ["moto_api", "instance_metadata", "apigatewayv2", "s3bucket_path", "neptune", "opensearch"]
def iter_backend_url_patterns():

View File

@ -235,7 +235,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/moto_api,moto/neptune
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/moto_api,moto/neptune,moto/opensearch
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract

View File

@ -368,6 +368,22 @@ neptune:
- TestAccNeptuneCluster_tags
- TestAccNeptuneCluster_disappears
- TestAccNeptuneGlobalCluster_basic
opensearch:
- TestAccOpenSearchDomain_basic
- TestAccOpenSearchDomain_LogPublishingOptions_searchSlowLogs
- TestAccOpenSearchDomain_AdvancedSecurityOptions
- TestAccOpenSearchDomain_autoTuneOptions
- TestAccOpenSearchDomain_v23
- TestAccOpenSearchDomain_Cluster_
- TestAccOpenSearchDomain_disappears
- TestAccOpenSearchDomain_VolumeType
- TestAccOpenSearchDomain_VPC_
- TestAccOpenSearchDomain_Encryption_
- TestAccOpenSearchDomain_LogPublishingOptions_
- TestAccOpenSearchDomain_Policy_
- TestAccOpenSearchDomain_requireHTTPS
- TestAccOpenSearchDomain_tags
- TestAccOpenSearchDomain_customEndpoint
quicksight:
- TestAccQuickSightUser
- TestAccQuickSightGroup_

View File

View File

@ -0,0 +1,33 @@
import boto3
from moto import mock_opensearch
@mock_opensearch
def test_create_without_tags():
client = boto3.client("opensearch", region_name="eu-north-1")
arn = client.create_domain(DomainName="testdn")["DomainStatus"]["ARN"]
assert client.list_tags(ARN=arn)["TagList"] == []
@mock_opensearch
def test_create_with_tags():
client = boto3.client("opensearch", region_name="eu-north-1")
domain = client.create_domain(
DomainName="testdn", TagList=[{"Key": "k1", "Value": "v1"}]
)
arn = domain["DomainStatus"]["ARN"]
assert client.list_tags(ARN=arn)["TagList"] == [{"Key": "k1", "Value": "v1"}]
client.add_tags(ARN=arn, TagList=[{"Key": "k2", "Value": "v2"}])
assert client.list_tags(ARN=arn)["TagList"] == [
{"Key": "k1", "Value": "v1"},
{"Key": "k2", "Value": "v2"},
]
client.remove_tags(ARN=arn, TagKeys=["k1"])
assert client.list_tags(ARN=arn)["TagList"] == [{"Key": "k2", "Value": "v2"}]

View File

@ -0,0 +1,141 @@
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_opensearch
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_opensearch
def test_create_domain__minimal_options():
client = boto3.client("opensearch", region_name="eu-west-1")
status = client.create_domain(DomainName="testdn")["DomainStatus"]
assert "DomainId" in status
assert "DomainName" in status
assert status["DomainName"] == "testdn"
assert status["Endpoint"] is not None
assert "Endpoints" not in status
@mock_opensearch
def test_create_domain_in_vpc():
client = boto3.client("opensearch", region_name="eu-west-1")
status = client.create_domain(
DomainName="testdn", VPCOptions={"SubnetIds": ["sub1"]}
)["DomainStatus"]
assert "DomainId" in status
assert "DomainName" in status
assert status["DomainName"] == "testdn"
assert "Endpoint" not in status
assert status["Endpoints"] is not None
@mock_opensearch
def test_create_domain_with_some_options():
client = boto3.client("opensearch", region_name="eu-north-1")
status = client.create_domain(
DomainName="testdn",
DomainEndpointOptions={
"CustomEndpointEnabled": False,
"EnforceHTTPS": True,
"TLSSecurityPolicy": "Policy-Min-TLS-1-0-2019-07",
},
EBSOptions={"EBSEnabled": True, "VolumeSize": 10},
SnapshotOptions={"AutomatedSnapshotStartHour": 20},
EngineVersion="OpenSearch_1.1",
)["DomainStatus"]
assert status["Created"]
assert status["EngineVersion"] == "OpenSearch_1.1"
assert status["DomainEndpointOptions"] == {
"EnforceHTTPS": True,
"TLSSecurityPolicy": "Policy-Min-TLS-1-0-2019-07",
"CustomEndpointEnabled": False,
}
assert status["EBSOptions"] == {"EBSEnabled": True, "VolumeSize": 10}
assert status["SnapshotOptions"] == {"AutomatedSnapshotStartHour": 20}
@mock_opensearch
def test_get_compatible_versions():
client = boto3.client("opensearch", region_name="us-east-2")
client.create_domain(DomainName="testdn")
versions = client.get_compatible_versions(DomainName="testdn")["CompatibleVersions"]
assert len(versions) == 22
@mock_opensearch
def test_get_compatible_versions_unknown_domain():
client = boto3.client("opensearch", region_name="us-east-2")
with pytest.raises(ClientError) as exc:
client.get_compatible_versions(DomainName="testdn")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("Domain not found: testdn")
@mock_opensearch
def test_describe_unknown_domain():
client = boto3.client("opensearch", region_name="eu-west-1")
with pytest.raises(ClientError) as exc:
client.describe_domain(DomainName="testdn")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("Domain not found: testdn")
@mock_opensearch
def test_describe_domain():
client = boto3.client("opensearch", region_name="eu-west-1")
client.create_domain(DomainName="testdn")
status = client.describe_domain(DomainName="testdn")["DomainStatus"]
assert "DomainId" in status
assert "DomainName" in status
assert status["DomainName"] == "testdn"
@mock_opensearch
def test_delete_domain():
client = boto3.client("opensearch", region_name="eu-west-1")
client.create_domain(DomainName="testdn")
client.delete_domain(DomainName="testdn")
with pytest.raises(ClientError) as exc:
client.describe_domain(DomainName="testdn")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("Domain not found: testdn")
@mock_opensearch
def test_update_domain_config():
client = boto3.client("opensearch", region_name="eu-north-1")
client.create_domain(
DomainName="testdn",
DomainEndpointOptions={
"CustomEndpointEnabled": False,
"EnforceHTTPS": True,
"TLSSecurityPolicy": "Policy-Min-TLS-1-0-2019-07",
},
EBSOptions={"EBSEnabled": True, "VolumeSize": 10},
EngineVersion="OpenSearch 1.1",
)
config = client.update_domain_config(
DomainName="testdn",
EBSOptions={"EBSEnabled": False},
)["DomainConfig"]
assert config["EBSOptions"] == {"Options": {"EBSEnabled": False}}
assert config["DomainEndpointOptions"] == {
"Options": {
"EnforceHTTPS": True,
"TLSSecurityPolicy": "Policy-Min-TLS-1-0-2019-07",
"CustomEndpointEnabled": False,
}
}