Service: Neptune (#5983)

This commit is contained in:
Bert Blommers 2023-02-26 15:27:08 -01:00 committed by GitHub
parent 83ba839931
commit 2d3867dbd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 11348 additions and 24 deletions

View File

@ -4419,6 +4419,81 @@
- [X] update_user
</details>
## neptune
<details>
<summary>13% implemented</summary>
- [ ] add_role_to_db_cluster
- [ ] add_source_identifier_to_subscription
- [ ] add_tags_to_resource
- [ ] apply_pending_maintenance_action
- [ ] copy_db_cluster_parameter_group
- [ ] copy_db_cluster_snapshot
- [ ] copy_db_parameter_group
- [X] create_db_cluster
- [ ] create_db_cluster_endpoint
- [ ] create_db_cluster_parameter_group
- [ ] create_db_cluster_snapshot
- [ ] create_db_instance
- [ ] create_db_parameter_group
- [ ] create_db_subnet_group
- [ ] create_event_subscription
- [X] create_global_cluster
- [X] delete_db_cluster
- [ ] delete_db_cluster_endpoint
- [ ] delete_db_cluster_parameter_group
- [ ] delete_db_cluster_snapshot
- [ ] delete_db_instance
- [ ] delete_db_parameter_group
- [ ] delete_db_subnet_group
- [ ] delete_event_subscription
- [X] delete_global_cluster
- [ ] describe_db_cluster_endpoints
- [ ] describe_db_cluster_parameter_groups
- [ ] describe_db_cluster_parameters
- [ ] describe_db_cluster_snapshot_attributes
- [ ] describe_db_cluster_snapshots
- [X] describe_db_clusters
- [ ] describe_db_engine_versions
- [ ] describe_db_instances
- [ ] describe_db_parameter_groups
- [ ] describe_db_parameters
- [ ] describe_db_subnet_groups
- [ ] describe_engine_default_cluster_parameters
- [ ] describe_engine_default_parameters
- [ ] describe_event_categories
- [ ] describe_event_subscriptions
- [ ] describe_events
- [X] describe_global_clusters
- [X] describe_orderable_db_instance_options
- [ ] describe_pending_maintenance_actions
- [ ] describe_valid_db_instance_modifications
- [ ] failover_db_cluster
- [ ] failover_global_cluster
- [ ] list_tags_for_resource
- [X] modify_db_cluster
- [ ] modify_db_cluster_endpoint
- [ ] modify_db_cluster_parameter_group
- [ ] modify_db_cluster_snapshot_attribute
- [ ] modify_db_instance
- [ ] modify_db_parameter_group
- [ ] modify_db_subnet_group
- [ ] modify_event_subscription
- [ ] modify_global_cluster
- [ ] promote_read_replica_db_cluster
- [ ] reboot_db_instance
- [ ] remove_from_global_cluster
- [ ] remove_role_from_db_cluster
- [ ] remove_source_identifier_from_subscription
- [ ] remove_tags_from_resource
- [ ] reset_db_cluster_parameter_group
- [ ] reset_db_parameter_group
- [ ] restore_db_cluster_from_snapshot
- [ ] restore_db_cluster_to_point_in_time
- [X] start_db_cluster
- [ ] stop_db_cluster
</details>
## opsworks
<details>
<summary>12% implemented</summary>
@ -4937,7 +5012,7 @@
## rds
<details>
<summary>29% implemented</summary>
<summary>30% implemented</summary>
- [ ] add_role_to_db_cluster
- [ ] add_role_to_db_instance
@ -5020,7 +5095,7 @@
- [ ] describe_global_clusters
- [X] describe_option_group_options
- [X] describe_option_groups
- [ ] describe_orderable_db_instance_options
- [X] describe_orderable_db_instance_options
- [ ] describe_pending_maintenance_actions
- [ ] describe_reserved_db_instances
- [ ] describe_reserved_db_instances_offerings
@ -6815,7 +6890,6 @@
- mobile
- mturk
- mwaa
- neptune
- network-firewall
- networkmanager
- nimble
@ -6881,7 +6955,6 @@
- support-app
- synthetics
- timestream-query
- tnb
- transfer
- translate
- voice-id

View File

@ -9,6 +9,7 @@ include moto/ec2/resources/amis.json
include moto/cognitoidp/resources/*.json
include moto/dynamodb/parsing/reserved_keywords.txt
include moto/moto_api/_internal/*
include moto/rds/resources/cluster_options/*.json
include moto/servicequotas/resources/*/*.json
include moto/ssm/resources/*.json
include moto/ssm/resources/ami-amazon-linux-latest/*.json

View File

@ -0,0 +1,112 @@
.. _implementedservice_neptune:
.. |start-h3| raw:: html
<h3>
.. |end-h3| raw:: html
</h3>
=======
neptune
=======
.. autoclass:: moto.neptune.models.NeptuneBackend
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@mock_neptune
def test_neptune_behaviour:
boto3.client("neptune")
...
|start-h3| Implemented features for this service |end-h3|
- [ ] add_role_to_db_cluster
- [ ] add_source_identifier_to_subscription
- [ ] add_tags_to_resource
- [ ] apply_pending_maintenance_action
- [ ] copy_db_cluster_parameter_group
- [ ] copy_db_cluster_snapshot
- [ ] copy_db_parameter_group
- [X] create_db_cluster
- [ ] create_db_cluster_endpoint
- [ ] create_db_cluster_parameter_group
- [ ] create_db_cluster_snapshot
- [ ] create_db_instance
- [ ] create_db_parameter_group
- [ ] create_db_subnet_group
- [ ] create_event_subscription
- [X] create_global_cluster
- [X] delete_db_cluster
The parameters SkipFinalSnapshot and FinalDBSnapshotIdentifier are not yet implemented.
The DeletionProtection-attribute is not yet enforced
- [ ] delete_db_cluster_endpoint
- [ ] delete_db_cluster_parameter_group
- [ ] delete_db_cluster_snapshot
- [ ] delete_db_instance
- [ ] delete_db_parameter_group
- [ ] delete_db_subnet_group
- [ ] delete_event_subscription
- [X] delete_global_cluster
- [ ] describe_db_cluster_endpoints
- [ ] describe_db_cluster_parameter_groups
- [ ] describe_db_cluster_parameters
- [ ] describe_db_cluster_snapshot_attributes
- [ ] describe_db_cluster_snapshots
- [X] describe_db_clusters
Pagination and the Filters-argument is not yet implemented
- [ ] describe_db_engine_versions
- [ ] describe_db_instances
- [ ] describe_db_parameter_groups
- [ ] describe_db_parameters
- [ ] describe_db_subnet_groups
- [ ] describe_engine_default_cluster_parameters
- [ ] describe_engine_default_parameters
- [ ] describe_event_categories
- [ ] describe_event_subscriptions
- [ ] describe_events
- [X] describe_global_clusters
- [X] describe_orderable_db_instance_options
Only the EngineVersion-parameter is currently implemented.
- [ ] describe_pending_maintenance_actions
- [ ] describe_valid_db_instance_modifications
- [ ] failover_db_cluster
- [ ] failover_global_cluster
- [ ] list_tags_for_resource
- [X] modify_db_cluster
- [ ] modify_db_cluster_endpoint
- [ ] modify_db_cluster_parameter_group
- [ ] modify_db_cluster_snapshot_attribute
- [ ] modify_db_instance
- [ ] modify_db_parameter_group
- [ ] modify_db_subnet_group
- [ ] modify_event_subscription
- [ ] modify_global_cluster
- [ ] promote_read_replica_db_cluster
- [ ] reboot_db_instance
- [ ] remove_from_global_cluster
- [ ] remove_role_from_db_cluster
- [ ] remove_source_identifier_from_subscription
- [ ] remove_tags_from_resource
- [ ] reset_db_cluster_parameter_group
- [ ] reset_db_parameter_group
- [ ] restore_db_cluster_from_snapshot
- [ ] restore_db_cluster_to_point_in_time
- [X] start_db_cluster
- [ ] stop_db_cluster

View File

@ -106,7 +106,11 @@ rds
- [ ] describe_global_clusters
- [X] describe_option_group_options
- [X] describe_option_groups
- [ ] describe_orderable_db_instance_options
- [X] describe_orderable_db_instance_options
Only the Neptune-engine is currently implemented
- [ ] describe_pending_maintenance_actions
- [ ] describe_reserved_db_instances
- [ ] describe_reserved_db_instances_offerings

View File

@ -91,6 +91,7 @@ mock_glacier = lazy_load(".glacier", "mock_glacier")
mock_glue = lazy_load(".glue", "mock_glue", boto3_name="glue")
mock_guardduty = lazy_load(".guardduty", "mock_guardduty")
mock_iam = lazy_load(".iam", "mock_iam")
mock_identitystore = lazy_load(".identitystore", "mock_identitystore")
mock_iot = lazy_load(".iot", "mock_iot")
mock_iotdata = lazy_load(".iotdata", "mock_iotdata", boto3_name="iot-data")
mock_kinesis = lazy_load(".kinesis", "mock_kinesis")
@ -111,7 +112,8 @@ mock_mediastoredata = lazy_load(
".mediastoredata", "mock_mediastoredata", boto3_name="mediastore-data"
)
mock_meteringmarketplace = lazy_load(".meteringmarketplace", "mock_meteringmarketplace")
mock_mq = lazy_load(".mq", "mock_mq", boto3_name="mq")
mock_mq = lazy_load(".mq", "mock_mq")
mock_neptune = lazy_load(".rds", "mock_rds", boto3_name="neptune")
mock_opsworks = lazy_load(".opsworks", "mock_opsworks")
mock_organizations = lazy_load(".organizations", "mock_organizations")
mock_personalize = lazy_load(".personalize", "mock_personalize")
@ -124,9 +126,7 @@ mock_redshift = lazy_load(".redshift", "mock_redshift")
mock_redshiftdata = lazy_load(
".redshiftdata", "mock_redshiftdata", boto3_name="redshift-data"
)
mock_rekognition = lazy_load(
".rekognition", "mock_rekognition", boto3_name="rekognition"
)
mock_rekognition = lazy_load(".rekognition", "mock_rekognition")
mock_resourcegroups = lazy_load(
".resourcegroups", "mock_resourcegroups", boto3_name="resource-groups"
)
@ -147,7 +147,7 @@ mock_servicequotas = lazy_load(
)
mock_ses = lazy_load(".ses", "mock_ses")
mock_servicediscovery = lazy_load(".servicediscovery", "mock_servicediscovery")
mock_signer = lazy_load(".signer", "mock_signer", boto3_name="signer")
mock_signer = lazy_load(".signer", "mock_signer")
mock_sns = lazy_load(".sns", "mock_sns")
mock_sqs = lazy_load(".sqs", "mock_sqs")
mock_ssm = lazy_load(".ssm", "mock_ssm")
@ -167,7 +167,6 @@ mock_xray = lazy_load(".xray", "mock_xray")
mock_xray_client = lazy_load(".xray", "mock_xray_client")
mock_wafv2 = lazy_load(".wafv2", "mock_wafv2")
mock_textract = lazy_load(".textract", "mock_textract")
mock_identitystore = lazy_load(".identitystore", "mock_identitystore")
class MockAll(ContextDecorator):

View File

@ -1,4 +1,4 @@
# autogenerated by /Users/dan/Sites/moto/scripts/update_backend_index.py
# autogenerated by scripts/update_backend_index.py
import re
backend_url_patterns = [

View File

@ -142,8 +142,11 @@ class convert_flask_to_responses_response(object):
return status, headers, response
def iso_8601_datetime_with_milliseconds(value: datetime.datetime) -> str:
return value.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
def iso_8601_datetime_with_milliseconds(
value: Optional[datetime.datetime] = None,
) -> str:
date_to_use = value or datetime.datetime.now()
return date_to_use.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
# Even Python does not support nanoseconds, other languages like Go do (needed for Terraform)

11
moto/neptune/__init__.py Normal file
View File

@ -0,0 +1,11 @@
"""
Neptune is a bit of an odd duck.
It shares almost everything with RDS: the endpoint URL, and the features. Only the parameters to these features can be different.
Because the endpoint URL is the same (rds.amazonaws.com), every request is intercepted by the RDS service.
RDS then has to determine whether any incoming call was meant for RDS, or for neptune.
"""
from .models import neptune_backends
from ..core.models import base_decorator
mock_neptune = base_decorator(neptune_backends)

View File

@ -0,0 +1,26 @@
from jinja2 import Template
from moto.core.exceptions import RESTError
class NeptuneClientError(RESTError):
def __init__(self, code: str, message: str):
super().__init__(error_type=code, message=message)
template = Template(
"""
<ErrorResponse>
<Error>
<Code>{{ code }}</Code>
<Message>{{ message }}</Message>
<Type>Sender</Type>
</Error>
<RequestId>6876f774-7273-11e4-85dc-39e55ca848d1</RequestId>
</ErrorResponse>"""
)
self.description = template.render(code=code, message=message)
class DBClusterNotFoundError(NeptuneClientError):
def __init__(self, cluster_identifier: str):
super().__init__(
"DBClusterNotFoundFault", f"DBCluster {cluster_identifier} not found."
)

370
moto/neptune/models.py Normal file
View File

@ -0,0 +1,370 @@
import copy
import string
from jinja2 import Template
from typing import Any, Dict, List, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.utilities.utils import load_resource
from moto.moto_api._internal import mock_random as random
from .exceptions import DBClusterNotFoundError
class GlobalCluster(BaseModel):
def __init__(
self,
account_id: str,
global_cluster_identifier: str,
engine: Optional[str],
engine_version: Optional[str],
storage_encrypted: Optional[str],
deletion_protection: Optional[str],
):
self.global_cluster_identifier = global_cluster_identifier
self.global_cluster_resource_id = "cluster-" + random.get_random_hex(8)
self.global_cluster_arn = (
f"arn:aws:rds::{account_id}:global-cluster:{global_cluster_identifier}"
)
self.engine = engine or "neptune"
self.engine_version = engine_version or "1.2.0.0"
self.storage_encrypted = (
storage_encrypted and storage_encrypted.lower() == "true"
)
self.deletion_protection = (
deletion_protection and deletion_protection.lower() == "true"
)
def to_xml(self) -> str:
template = Template(
"""
<GlobalClusterIdentifier>{{ cluster.global_cluster_identifier }}</GlobalClusterIdentifier>
<GlobalClusterResourceId>{{ cluster.global_cluster_resource_id }}</GlobalClusterResourceId>
<GlobalClusterArn>{{ cluster.global_cluster_arn }}</GlobalClusterArn>
<Engine>{{ cluster.engine }}</Engine>
<Status>available</Status>
<EngineVersion>{{ cluster.engine_version }}</EngineVersion>
<StorageEncrypted>{{ 'true' if cluster.storage_encrypted else 'false' }}</StorageEncrypted>
<DeletionProtection>{{ 'true' if cluster.deletion_protection else 'false' }}</DeletionProtection>"""
)
return template.render(cluster=self)
class DBCluster(BaseModel):
def __init__(
self,
account_id: str,
region_name: str,
db_cluster_identifier: str,
database_name: Optional[str],
tags: List[Dict[str, str]],
storage_encrypted: str,
parameter_group_name: str,
engine: str,
engine_version: str,
kms_key_id: Optional[str],
preferred_maintenance_window: Optional[str],
preferred_backup_window: Optional[str],
backup_retention_period: Optional[int],
port: Optional[int],
serverless_v2_scaling_configuration: Optional[Dict[str, int]],
):
self.account_id = account_id
self.region_name = region_name
self.db_cluster_identifier = db_cluster_identifier
self.resource_id = "cluster-" + random.get_random_hex(8)
self.tags = tags
self.storage_encrypted = storage_encrypted.lower() != "false"
self.db_cluster_parameter_group_name = parameter_group_name
self.engine = engine
self.engine_version = engine_version
self.database_name = database_name
self.db_subnet_group = "default"
self.status = "available"
self.backup_retention_period = backup_retention_period
self.cluster_create_time = iso_8601_datetime_with_milliseconds()
self.url_identifier = "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(12)
)
self.endpoint = f"{self.db_cluster_identifier}.cluster-{self.url_identifier}.{self.region_name}.neptune.amazonaws.com"
self.reader_endpoint = f"{self.db_cluster_identifier}.cluster-ro-{self.url_identifier}.{self.region_name}.neptune.amazonaws.com"
self.resource_id = "cluster-" + "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(26)
)
self.hosted_zone_id = "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(14)
)
self.kms_key_id = kms_key_id or (
"default_kms_key_id" if self.storage_encrypted else None
)
self.preferred_maintenance_window = preferred_maintenance_window
self.preferred_backup_window = preferred_backup_window
self.port = port
self.availability_zones = [
f"{self.region_name}a",
f"{self.region_name}b",
f"{self.region_name}c",
]
self.serverless_v2_scaling_configuration = serverless_v2_scaling_configuration
@property
def db_cluster_arn(self) -> str:
return f"arn:aws:rds:{self.region_name}:{self.account_id}:cluster:{self.db_cluster_identifier}"
def get_tags(self) -> List[Dict[str, str]]:
return self.tags
def add_tags(self, tags: List[Dict[str, str]]) -> List[Dict[str, str]]:
new_keys = [tag_set["Key"] for tag_set in tags]
self.tags = [tag_set for tag_set in self.tags if tag_set["Key"] not in new_keys]
self.tags.extend(tags)
return self.tags
def remove_tags(self, tag_keys: List[str]) -> None:
self.tags = [tag_set for tag_set in self.tags if tag_set["Key"] not in tag_keys]
def to_xml(self) -> str:
template = Template(
"""<DBCluster>
{% if cluster.allocated_storage %}
<AllocatedStorage>{{ cluster.allocated_storage }}</AllocatedStorage>
{% endif %}
<AvailabilityZones>
{% for zone in cluster.availability_zones %}
<AvailabilityZone>{{ zone }}</AvailabilityZone>
{% endfor %}
</AvailabilityZones>
{% if cluster.backup_retention_period %}
<BackupRetentionPeriod>{{ cluster.backup_retention_period }}</BackupRetentionPeriod>
{% endif %}
{% if cluster.character_set_name %}
<CharacterSetName>{{ cluster.character_set_name }}</CharacterSetName>
{% endif %}
{% if cluster.database_name %}
<DatabaseName>{{ cluster.database_name }}</DatabaseName>
{% endif %}
<DBClusterIdentifier>{{ cluster.db_cluster_identifier }}</DBClusterIdentifier>
<DBClusterParameterGroup>{{ cluster.db_cluster_parameter_group_name }}</DBClusterParameterGroup>
<DBSubnetGroup>{{ cluster.db_subnet_group }}</DBSubnetGroup>
<Status>{{ cluster.status }}</Status>
<PercentProgress>{{ cluster.percent_progress }}</PercentProgress>
{% if cluster.earliest_restorable_time %}
<EarliestRestorableTime>{{ cluster.earliest_restorable_time }}</EarliestRestorableTime>
{% endif %}
<Endpoint>{{ cluster.endpoint }}</Endpoint>
<ReaderEndpoint>{{ cluster.reader_endpoint }}</ReaderEndpoint>
<MultiAZ>false</MultiAZ>
<Engine>{{ cluster.engine }}</Engine>
<EngineVersion>{{ cluster.engine_version }}</EngineVersion>
{% if cluster.latest_restorable_time %}
<LatestRestorableTime>{{ cluster.latest_restorable_time }}</LatestRestorableTime>
{% endif %}
{% if cluster.port %}
<Port>{{ cluster.port }}</Port>
{% endif %}
<MasterUsername>{{ cluster.master_username }}</MasterUsername>
<DBClusterOptionGroupMemberships>
{% for dbclusteroptiongroupmembership in cluster.dbclusteroptiongroupmemberships %}
<member>
<DBClusterOptionGroupName>{{ dbclusteroptiongroupmembership.db_cluster_option_group_name }}</DBClusterOptionGroupName>
<Status>{{ dbclusteroptiongroupmembership.status }}</Status>
</member>
{% endfor %}
</DBClusterOptionGroupMemberships>
<PreferredBackupWindow>{{ cluster.preferred_backup_window }}</PreferredBackupWindow>
<PreferredMaintenanceWindow>{{ cluster.preferred_maintenance_window }}</PreferredMaintenanceWindow>
<ReplicationSourceIdentifier>{{ cluster.replication_source_identifier }}</ReplicationSourceIdentifier>
<ReadReplicaIdentifiers>
{% for readreplicaidentifier in cluster.readreplicaidentifiers %}
<member/>
{% endfor %}
</ReadReplicaIdentifiers>
<DBClusterMembers>
{% for dbclustermember in cluster.dbclustermembers %}
<member>
<DBInstanceIdentifier>{{ dbclustermember.db_instance_identifier }}</DBInstanceIdentifier>
<IsClusterWriter>{{ dbclustermember.is_cluster_writer }}</IsClusterWriter>
<DBClusterParameterGroupStatus>{{ dbclustermember.db_cluster_parameter_group_status }}</DBClusterParameterGroupStatus>
<PromotionTier>{{ dbclustermember.promotion_tier }}</PromotionTier>
</member>
{% endfor %}
</DBClusterMembers>
<VpcSecurityGroups>
{% for vpcsecuritygroup in cluster.vpcsecuritygroups %}
<member>
<VpcSecurityGroupId>{{ vpcsecuritygroup.vpc_security_group_id }}</VpcSecurityGroupId>
<Status>{{ vpcsecuritygroup.status }}</Status>
</member>
{% endfor %}
</VpcSecurityGroups>
<HostedZoneId>{{ cluster.hosted_zone_id }}</HostedZoneId>
<StorageEncrypted>{{ 'true' if cluster.storage_encrypted else 'false'}}</StorageEncrypted>
<KmsKeyId>{{ cluster.kms_key_id }}</KmsKeyId>
<DbClusterResourceId>{{ cluster.resource_id }}</DbClusterResourceId>
<DBClusterArn>{{ cluster.db_cluster_arn }}</DBClusterArn>
<AssociatedRoles>
{% for associatedrole in cluster.associatedroles %}
<member>
<RoleArn>{{ associatedrole.role_arn }}</RoleArn>
<Status>{{ associatedrole.status }}</Status>
<FeatureName>{{ associatedrole.feature_name }}</FeatureName>
</member>
{% endfor %}
</AssociatedRoles>
<IAMDatabaseAuthenticationEnabled>false</IAMDatabaseAuthenticationEnabled>
<CloneGroupId>{{ cluster.clone_group_id }}</CloneGroupId>
<ClusterCreateTime>{{ cluster.cluster_create_time }}</ClusterCreateTime>
<CopyTagsToSnapshot>false</CopyTagsToSnapshot>
<EnabledCloudwatchLogsExports>
{% for enabledcloudwatchlogsexport in cluster.enabledcloudwatchlogsexports %}
<member/>db_cluster_arn
{% endfor %}
</EnabledCloudwatchLogsExports>
<DeletionProtection>false</DeletionProtection>
<CrossAccountClone>false</CrossAccountClone>
{% if cluster.automatic_restart_time %}
<AutomaticRestartTime>{{ cluster.automatic_restart_time }}</AutomaticRestartTime>
{% endif %}
{% if cluster.serverless_v2_scaling_configuration %}
<ServerlessV2ScalingConfiguration>
<MinCapacity>{{ cluster.serverless_v2_scaling_configuration["MinCapacity"] }}</MinCapacity>
<MaxCapacity>{{ cluster.serverless_v2_scaling_configuration["MaxCapacity"] }}</MaxCapacity>
</ServerlessV2ScalingConfiguration>
{% endif %}
</DBCluster>"""
)
return template.render(cluster=self)
class NeptuneBackend(BaseBackend):
"""Implementation of Neptune APIs."""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.clusters: Dict[str, DBCluster] = dict()
self.global_clusters: Dict[str, GlobalCluster] = dict()
self._db_cluster_options: Optional[List[Dict[str, Any]]] = None
@property
def db_cluster_options(self) -> List[Dict[str, Any]]: # type: ignore[misc]
if self._db_cluster_options is None:
from moto.rds.utils import decode_orderable_db_instance
decoded_options: List[Dict[str, Any]] = load_resource( # type: ignore[assignment]
__name__, "../rds/resources/cluster_options/neptune.json"
)
self._db_cluster_options = [
decode_orderable_db_instance(option) for option in decoded_options
]
return self._db_cluster_options
def create_db_cluster(self, **kwargs: Any) -> DBCluster:
cluster = DBCluster(
account_id=self.account_id,
region_name=self.region_name,
db_cluster_identifier=kwargs["db_cluster_identifier"],
database_name=kwargs.get("database_name"),
storage_encrypted=kwargs.get("storage_encrypted", True),
parameter_group_name=kwargs.get("db_cluster_parameter_group_name") or "",
tags=kwargs.get("tags", []),
engine=kwargs.get("engine", "neptune"),
engine_version=kwargs.get("engine_version") or "1.2.0.2",
kms_key_id=kwargs.get("kms_key_id"),
preferred_maintenance_window=kwargs.get("preferred_maintenance_window")
or "none",
preferred_backup_window=kwargs.get("preferred_backup_window"),
backup_retention_period=kwargs.get("backup_retention_period") or 1,
port=kwargs.get("port") or 8192,
serverless_v2_scaling_configuration=kwargs.get(
"serverless_v2_scaling_configuration"
),
)
self.clusters[cluster.db_cluster_identifier] = cluster
return cluster
def create_global_cluster(
self,
global_cluster_identifier: str,
engine: Optional[str],
engine_version: Optional[str],
storage_encrypted: Optional[str],
deletion_protection: Optional[str],
) -> GlobalCluster:
cluster = GlobalCluster(
account_id=self.account_id,
global_cluster_identifier=global_cluster_identifier,
engine=engine,
engine_version=engine_version,
storage_encrypted=storage_encrypted,
deletion_protection=deletion_protection,
)
self.global_clusters[global_cluster_identifier] = cluster
return cluster
def delete_global_cluster(self, global_cluster_identifier: str) -> GlobalCluster:
return self.global_clusters.pop(global_cluster_identifier)
def describe_global_clusters(self) -> List[GlobalCluster]:
return list(self.global_clusters.values())
def describe_db_clusters(self, db_cluster_identifier: str) -> List[DBCluster]:
"""
Pagination and the Filters-argument is not yet implemented
"""
if db_cluster_identifier:
if db_cluster_identifier not in self.clusters:
raise DBClusterNotFoundError(db_cluster_identifier)
return [self.clusters[db_cluster_identifier]]
return list(self.clusters.values())
def delete_db_cluster(self, cluster_identifier: str) -> DBCluster:
"""
The parameters SkipFinalSnapshot and FinalDBSnapshotIdentifier are not yet implemented.
The DeletionProtection-attribute is not yet enforced
"""
if cluster_identifier in self.clusters:
return self.clusters.pop(cluster_identifier)
raise DBClusterNotFoundError(cluster_identifier)
def modify_db_cluster(self, kwargs: Any) -> DBCluster:
cluster_id = kwargs["db_cluster_identifier"]
cluster = self.clusters[cluster_id]
del self.clusters[cluster_id]
kwargs["db_cluster_identifier"] = kwargs.pop("new_db_cluster_identifier")
for k, v in kwargs.items():
if v is not None:
setattr(cluster, k, v)
cluster_id = kwargs.get("new_db_cluster_identifier", cluster_id)
self.clusters[cluster_id] = cluster
initial_state = copy.deepcopy(cluster) # Return status=creating
cluster.status = "available" # Already set the final status in the background
return initial_state
def start_db_cluster(self, cluster_identifier: str) -> DBCluster:
if cluster_identifier not in self.clusters:
raise DBClusterNotFoundError(cluster_identifier)
cluster = self.clusters[cluster_identifier]
temp_state = copy.deepcopy(cluster)
temp_state.status = "started"
cluster.status = "available" # This is the final status - already setting it in the background
return temp_state
def describe_orderable_db_instance_options(
self, engine_version: Optional[str]
) -> List[Dict[str, Any]]:
"""
Only the EngineVersion-parameter is currently implemented.
"""
if engine_version:
return [
option
for option in self.db_cluster_options
if option["EngineVersion"] == engine_version
]
return self.db_cluster_options
neptune_backends = BackendDict(NeptuneBackend, "neptune")

180
moto/neptune/responses.py Normal file
View File

@ -0,0 +1,180 @@
from moto.core.responses import BaseResponse
from .models import neptune_backends, NeptuneBackend
class NeptuneResponse(BaseResponse):
"""Handler for Neptune requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="neptune")
@property
def neptune_backend(self) -> NeptuneBackend:
"""Return backend instance specific for this region."""
return neptune_backends[self.current_account][self.region]
@property
def global_backend(self) -> NeptuneBackend:
"""Return backend instance of the region that stores Global Clusters"""
return neptune_backends[self.current_account]["us-east-1"]
def create_db_cluster(self) -> str:
params = self._get_params()
availability_zones = params.get("AvailabilityZones")
backup_retention_period = params.get("BackupRetentionPeriod")
character_set_name = params.get("CharacterSetName")
copy_tags_to_snapshot = params.get("CopyTagsToSnapshot")
database_name = params.get("DatabaseName")
db_cluster_identifier = params.get("DBClusterIdentifier")
db_cluster_parameter_group_name = params.get("DBClusterParameterGroupName")
vpc_security_group_ids = params.get("VpcSecurityGroupIds")
db_subnet_group_name = params.get("DBSubnetGroupName")
engine = params.get("Engine")
engine_version = params.get("EngineVersion")
port = params.get("Port")
master_username = params.get("MasterUsername")
master_user_password = params.get("MasterUserPassword")
option_group_name = params.get("OptionGroupName")
preferred_backup_window = params.get("PreferredBackupWindow")
preferred_maintenance_window = params.get("PreferredMaintenanceWindow")
replication_source_identifier = params.get("ReplicationSourceIdentifier")
tags = (self._get_multi_param_dict("Tags") or {}).get("Tag", [])
storage_encrypted = params.get("StorageEncrypted", "")
kms_key_id = params.get("KmsKeyId")
pre_signed_url = params.get("PreSignedUrl")
enable_iam_database_authentication = params.get(
"EnableIAMDatabaseAuthentication"
)
enable_cloudwatch_logs_exports = params.get("EnableCloudwatchLogsExports")
deletion_protection = params.get("DeletionProtection")
serverless_v2_scaling_configuration = params.get(
"ServerlessV2ScalingConfiguration"
)
global_cluster_identifier = params.get("GlobalClusterIdentifier")
source_region = params.get("SourceRegion")
db_cluster = self.neptune_backend.create_db_cluster(
availability_zones=availability_zones,
backup_retention_period=backup_retention_period,
character_set_name=character_set_name,
copy_tags_to_snapshot=copy_tags_to_snapshot,
database_name=database_name,
db_cluster_identifier=db_cluster_identifier,
db_cluster_parameter_group_name=db_cluster_parameter_group_name,
vpc_security_group_ids=vpc_security_group_ids,
db_subnet_group_name=db_subnet_group_name,
engine=engine,
engine_version=engine_version,
port=port,
master_username=master_username,
master_user_password=master_user_password,
option_group_name=option_group_name,
preferred_backup_window=preferred_backup_window,
preferred_maintenance_window=preferred_maintenance_window,
replication_source_identifier=replication_source_identifier,
tags=tags,
storage_encrypted=storage_encrypted,
kms_key_id=kms_key_id,
pre_signed_url=pre_signed_url,
enable_iam_database_authentication=enable_iam_database_authentication,
enable_cloudwatch_logs_exports=enable_cloudwatch_logs_exports,
deletion_protection=deletion_protection,
serverless_v2_scaling_configuration=serverless_v2_scaling_configuration,
global_cluster_identifier=global_cluster_identifier,
source_region=source_region,
)
template = self.response_template(CREATE_DB_CLUSTER_TEMPLATE)
return template.render(cluster=db_cluster)
def describe_db_clusters(self) -> str:
params = self._get_params()
db_cluster_identifier = params["DBClusterIdentifier"]
db_clusters = self.neptune_backend.describe_db_clusters(
db_cluster_identifier=db_cluster_identifier
)
template = self.response_template(DESCRIBE_DB_CLUSTERS_TEMPLATE)
return template.render(db_clusters=db_clusters)
def describe_global_clusters(self) -> str:
clusters = self.global_backend.describe_global_clusters()
template = self.response_template(DESCRIBE_GLOBAL_CLUSTERS_TEMPLATE)
return template.render(clusters=clusters)
def create_global_cluster(self) -> str:
params = self._get_params()
cluster = self.global_backend.create_global_cluster(
global_cluster_identifier=params["GlobalClusterIdentifier"],
engine=params.get("Engine"),
engine_version=params.get("EngineVersion"),
storage_encrypted=params.get("StorageEncrypted"),
deletion_protection=params.get("DeletionProtection"),
)
template = self.response_template(CREATE_GLOBAL_CLUSTER_TEMPLATE)
return template.render(cluster=cluster)
def delete_global_cluster(self) -> str:
params = self._get_params()
cluster = self.global_backend.delete_global_cluster(
global_cluster_identifier=params["GlobalClusterIdentifier"],
)
template = self.response_template(DELETE_GLOBAL_CLUSTER_TEMPLATE)
return template.render(cluster=cluster)
CREATE_DB_CLUSTER_TEMPLATE = """<CreateDBClusterResponse xmlns="http://rds.amazonaws.com/doc/2014-10-31/">
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<CreateDBClusterResult>
{{ cluster.to_xml() }}
</CreateDBClusterResult>
</CreateDBClusterResponse>"""
DESCRIBE_DB_CLUSTERS_TEMPLATE = """<DescribeDBClustersResponse xmlns="http://rds.amazonaws.com/doc/2014-10-31/">
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<DescribeDBClustersResult>
<DBClusters>
{% for cluster in db_clusters %}
{{ cluster.to_xml() }}
{% endfor %}
</DBClusters>
</DescribeDBClustersResult>
</DescribeDBClustersResponse>"""
CREATE_GLOBAL_CLUSTER_TEMPLATE = """<CreateGlobalClusterResponse xmlns="http://rds.amazonaws.com/doc/2014-10-31/">
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<CreateGlobalClusterResult>
<GlobalCluster>
{{ cluster.to_xml() }}
</GlobalCluster>
</CreateGlobalClusterResult>
</CreateGlobalClusterResponse>"""
DELETE_GLOBAL_CLUSTER_TEMPLATE = """<DeleteGlobalClusterResponse xmlns="http://rds.amazonaws.com/doc/2014-10-31/">
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<DeleteGlobalClusterResult>
<GlobalCluster>
{{ cluster.to_xml() }}
</GlobalCluster>
</DeleteGlobalClusterResult>
</DeleteGlobalClusterResponse>"""
DESCRIBE_GLOBAL_CLUSTERS_TEMPLATE = """<DescribeGlobalClustersResponse xmlns="http://rds.amazonaws.com/doc/2014-10-31/">
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<DescribeGlobalClustersResult>
<GlobalClusters>
{% for cluster in clusters %}
<GlobalClusterMember>
{{ cluster.to_xml() }}
</GlobalClusterMember>
{% endfor %}
</GlobalClusters>
</DescribeGlobalClustersResult>
</DescribeGlobalClustersResponse>"""

7
moto/neptune/urls.py Normal file
View File

@ -0,0 +1,7 @@
"""
All calls to this service are intercepted by RDS
"""
url_bases = [] # type: ignore[var-annotated]
url_paths = {} # type: ignore[var-annotated]

View File

@ -11,6 +11,7 @@ from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.ec2.models import ec2_backends
from moto.moto_api._internal import mock_random as random
from moto.neptune.models import neptune_backends, NeptuneBackend
from .exceptions import (
RDSClientError,
DBClusterNotFoundError,
@ -1335,6 +1336,14 @@ class RDSBackend(BaseBackend):
self.security_groups = {}
self.subnet_groups = {}
def reset(self):
self.neptune.reset()
super().reset()
@property
def neptune(self) -> NeptuneBackend:
return neptune_backends[self.account_id][self.region_name]
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
"""Default VPC endpoint service."""
@ -1868,6 +1877,9 @@ class RDSBackend(BaseBackend):
def modify_db_cluster(self, kwargs):
cluster_id = kwargs["db_cluster_identifier"]
if cluster_id in self.neptune.clusters:
return self.neptune.modify_db_cluster(kwargs)
cluster = self.clusters[cluster_id]
del self.clusters[cluster_id]
@ -1933,10 +1945,12 @@ class RDSBackend(BaseBackend):
def describe_db_clusters(self, cluster_identifier):
if cluster_identifier:
if cluster_identifier not in self.clusters:
raise DBClusterNotFoundError(cluster_identifier)
if cluster_identifier in self.clusters:
return [self.clusters[cluster_identifier]]
return self.clusters.values()
if cluster_identifier in self.neptune.clusters:
return [self.neptune.clusters[cluster_identifier]]
raise DBClusterNotFoundError(cluster_identifier)
return list(self.clusters.values()) + list(self.neptune.clusters.values())
def describe_db_cluster_snapshots(
self, db_cluster_identifier, db_snapshot_identifier, filters=None
@ -1963,10 +1977,13 @@ class RDSBackend(BaseBackend):
if snapshot_name:
self.create_db_cluster_snapshot(cluster_identifier, snapshot_name)
return self.clusters.pop(cluster_identifier)
if cluster_identifier in self.neptune.clusters:
return self.neptune.delete_db_cluster(cluster_identifier)
raise DBClusterNotFoundError(cluster_identifier)
def start_db_cluster(self, cluster_identifier):
if cluster_identifier not in self.clusters:
return self.neptune.start_db_cluster(cluster_identifier)
raise DBClusterNotFoundError(cluster_identifier)
cluster = self.clusters[cluster_identifier]
if cluster.status != "stopped":
@ -2083,6 +2100,8 @@ class RDSBackend(BaseBackend):
elif resource_type == "cluster": # Cluster
if resource_name in self.clusters:
return self.clusters[resource_name].get_tags()
if resource_name in self.neptune.clusters:
return self.neptune.clusters[resource_name].get_tags()
elif resource_type == "es": # Event Subscription
if resource_name in self.event_subscriptions:
return self.event_subscriptions[resource_name].get_tags()
@ -2142,6 +2161,8 @@ class RDSBackend(BaseBackend):
elif resource_type == "cluster":
if resource_name in self.clusters:
return self.clusters[resource_name].remove_tags(tag_keys)
if resource_name in self.neptune.clusters:
return self.neptune.clusters[resource_name].remove_tags(tag_keys)
elif resource_type == "cluster-snapshot": # DB Cluster Snapshot
if resource_name in self.cluster_snapshots:
return self.cluster_snapshots[resource_name].remove_tags(tag_keys)
@ -2181,6 +2202,8 @@ class RDSBackend(BaseBackend):
elif resource_type == "cluster":
if resource_name in self.clusters:
return self.clusters[resource_name].add_tags(tags)
if resource_name in self.neptune.clusters:
return self.neptune.clusters[resource_name].add_tags(tags)
elif resource_type == "cluster-snapshot": # DB Cluster Snapshot
if resource_name in self.cluster_snapshots:
return self.cluster_snapshots[resource_name].add_tags(tags)
@ -2211,6 +2234,14 @@ class RDSBackend(BaseBackend):
tags_dict.update({d["Key"]: d["Value"] for d in new_tags})
return [{"Key": k, "Value": v} for k, v in tags_dict.items()]
def describe_orderable_db_instance_options(self, engine, engine_version):
"""
Only the Neptune-engine is currently implemented
"""
if engine == "neptune":
return self.neptune.describe_orderable_db_instance_options(engine_version)
return []
class OptionGroup(object):
def __init__(

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +1,36 @@
from collections import defaultdict
from typing import Any
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backends
from .models import rds_backends
from moto.neptune.responses import NeptuneResponse
from .models import rds_backends, RDSBackend
from .exceptions import DBParameterGroupNotFoundError
class RDSResponse(BaseResponse):
def __init__(self):
super().__init__(service_name="rds")
# Neptune and RDS share a HTTP endpoint RDS is the lucky guy that catches all requests
# So we have to determine whether we can handle an incoming request here, or whether it needs redirecting to Neptune
self.neptune = NeptuneResponse()
@property
def backend(self):
def backend(self) -> RDSBackend:
return rds_backends[self.current_account][self.region]
def _dispatch(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE:
# Because some requests are send through to Neptune, we have to prepare the NeptuneResponse-class
self.neptune.setup_class(request, full_url, headers)
return super()._dispatch(request, full_url, headers)
def __getattribute__(self, name: str):
if name in ["create_db_cluster"]:
if self._get_param("Engine") == "neptune":
return object.__getattribute__(self.neptune, name)
return object.__getattribute__(self, name)
def _get_db_kwargs(self):
args = {
"auto_minor_version_upgrade": self._get_param("AutoMinorVersionUpgrade"),
@ -80,6 +97,9 @@ class RDSResponse(BaseResponse):
"db_instance_identifier": self._get_param("DBInstanceIdentifier"),
"db_name": self._get_param("DBName"),
"db_parameter_group_name": self._get_param("DBParameterGroupName"),
"db_cluster_parameter_group_name": self._get_param(
"DBClusterParameterGroupName"
),
"db_snapshot_identifier": self._get_param("DBSnapshotIdentifier"),
"db_subnet_group_name": self._get_param("DBSubnetGroupName"),
"engine": self._get_param("Engine"),
@ -98,8 +118,10 @@ class RDSResponse(BaseResponse):
"multi_az": self._get_bool_param("MultiAZ"),
"option_group_name": self._get_param("OptionGroupName"),
"port": self._get_param("Port"),
# PreferredBackupWindow
# PreferredMaintenanceWindow
"preferred_backup_window": self._get_param("PreferredBackupWindow"),
"preferred_maintenance_window": self._get_param(
"PreferredMaintenanceWindow"
),
"publicly_accessible": self._get_param("PubliclyAccessible"),
"account_id": self.current_account,
"region": self.region,
@ -672,6 +694,24 @@ class RDSResponse(BaseResponse):
template = self.response_template(DESCRIBE_EVENT_SUBSCRIPTIONS_TEMPLATE)
return template.render(subscriptions=subscriptions)
def describe_orderable_db_instance_options(self):
engine = self._get_param("Engine")
engine_version = self._get_param("EngineVersion")
options = self.backend.describe_orderable_db_instance_options(
engine, engine_version
)
template = self.response_template(DESCRIBE_ORDERABLE_CLUSTER_OPTIONS)
return template.render(options=options, marker=None)
def describe_global_clusters(self):
return self.neptune.describe_global_clusters()
def create_global_cluster(self):
return self.neptune.create_global_cluster()
def delete_global_cluster(self):
return self.neptune.delete_global_cluster()
CREATE_DATABASE_TEMPLATE = """<CreateDBInstanceResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CreateDBInstanceResult>
@ -1210,3 +1250,55 @@ DESCRIBE_EVENT_SUBSCRIPTIONS_TEMPLATE = """<DescribeEventSubscriptionsResponse x
</ResponseMetadata>
</DescribeEventSubscriptionsResponse>
"""
DESCRIBE_ORDERABLE_CLUSTER_OPTIONS = """<DescribeOrderableDBInstanceOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-10-31/">
<DescribeOrderableDBInstanceOptionsResult>
<OrderableDBInstanceOptions>
{% for option in options %}
<OrderableDBInstanceOption>
<OutpostCapable>option["OutpostCapable"]</OutpostCapable>
<AvailabilityZones>
{% for zone in option["AvailabilityZones"] %}
<AvailabilityZone>
<Name>{{ zone["Name"] }}</Name>
</AvailabilityZone>
{% endfor %}
</AvailabilityZones>
<SupportsStorageThroughput>{{ option["SupportsStorageThroughput"] }}</SupportsStorageThroughput>
<SupportedEngineModes>
<member>provisioned</member>
</SupportedEngineModes>
<SupportsGlobalDatabases>{{ option["SupportsGlobalDatabases"] }}</SupportsGlobalDatabases>
<SupportsClusters>{{ option["SupportsClusters"] }}</SupportsClusters>
<Engine>{{ option["Engine"] }}</Engine>
<SupportedActivityStreamModes/>
<SupportsEnhancedMonitoring>false</SupportsEnhancedMonitoring>
<EngineVersion>{{ option["EngineVersion"] }}</EngineVersion>
<ReadReplicaCapable>false</ReadReplicaCapable>
<Vpc>true</Vpc>
<DBInstanceClass>{{ option["DBInstanceClass"] }}</DBInstanceClass>
<SupportsStorageEncryption>{{ option["SupportsStorageEncryption"] }}</SupportsStorageEncryption>
<SupportsKerberosAuthentication>{{ option["SupportsKerberosAuthentication"] }}</SupportsKerberosAuthentication>
<SupportedNetworkTypes>
<member>IPV4</member>
</SupportedNetworkTypes>
<AvailableProcessorFeatures/>
<SupportsPerformanceInsights>{{ option["SupportsPerformanceInsights"] }}</SupportsPerformanceInsights>
<LicenseModel>{{ option["LicenseModel"] }}</LicenseModel>
<MultiAZCapable>{{ option["MultiAZCapable"] }}</MultiAZCapable>
<RequiresCustomProcessorFeatures>{{ option["RequiresCustomProcessorFeatures"] }}</RequiresCustomProcessorFeatures>
<StorageType>{{ option["StorageType"] }}</StorageType>
<SupportsIops>{{ option["SupportsIops"] }}</SupportsIops>
<SupportsIAMDatabaseAuthentication>{{ option["SupportsIAMDatabaseAuthentication"] }}</SupportsIAMDatabaseAuthentication>
</OrderableDBInstanceOption>
{% endfor %}
</OrderableDBInstanceOptions>
{% if marker %}
<Marker>{{ marker }}</Marker>
{% endif %}
</DescribeOrderableDBInstanceOptionsResult>
<ResponseMetadata>
<RequestId>54212dc5-16c4-4eb8-a88e-448691e877ab</RequestId>
</ResponseMetadata>
</DescribeOrderableDBInstanceOptionsResponse>"""

View File

@ -1,4 +1,6 @@
import copy
from collections import namedtuple
from typing import Any, Dict
from botocore.utils import merge_dicts
@ -288,3 +290,58 @@ def valid_preferred_maintenance_window(maintenance_window, backup_window):
return "Maintenance window must be less than 24 hours."
except Exception:
return f"Invalid day:hour:minute value: {maintenance_window}"
ORDERABLE_DB_INSTANCE_ENCODING = {
"Engine": "E",
"EngineVersion": "EV",
"DBInstanceClass": "DBIC",
"LicenseModel": "L",
"AvailabilityZones": "AZ",
"MultiAZCapable": "MC",
"ReadReplicaCapable": "RC",
"Vpc": "V",
"SupportsStorageEncryption": "SE",
"StorageType": "ST",
"SupportsIops": "SI",
"SupportsEnhancedMonitoring": "SM",
"SupportsIAMDatabaseAuthentication": "SIAM",
"SupportsPerformanceInsights": "SPI",
"AvailableProcessorFeatures": "APF",
"SupportedEngineModes": "SEM",
"SupportsKerberosAuthentication": "SK",
"OutpostCapable": "O",
"SupportedActivityStreamModes": "SSM",
"SupportsGlobalDatabases": "SGD",
"SupportsClusters": "SC",
"SupportedNetworkTypes": "SN",
"SupportsStorageThroughput": "SST",
}
ORDERABLE_DB_INSTANCE_DECODING = {
v: k for (k, v) in ORDERABLE_DB_INSTANCE_ENCODING.items()
}
def encode_orderable_db_instance(db: Dict[str, Any]) -> Dict[str, Any]:
encoded = copy.deepcopy(db)
if "AvailabilityZones" in encoded:
encoded["AvailabilityZones"] = [
az["Name"] for az in encoded["AvailabilityZones"]
]
return {
ORDERABLE_DB_INSTANCE_ENCODING.get(key, key): value
for key, value in encoded.items()
}
def decode_orderable_db_instance(db: Dict[str, Any]) -> Dict[str, Any]:
decoded = copy.deepcopy(db)
decoded_az = ORDERABLE_DB_INSTANCE_ENCODING.get(
"AvailabilityZones", "AvailabilityZones"
)
if decoded_az in decoded:
decoded["AvailabilityZones"] = [{"Name": az} for az in decoded[decoded_az]]
return {
ORDERABLE_DB_INSTANCE_DECODING.get(key, key): value
for key, value in decoded.items()
}

View File

@ -32,6 +32,10 @@ def get_moto_implementation(service_name):
backends = list(mock().backends.values())
if backends:
backend = backends[0]["us-east-1"] if "us-east-1" in backends[0] else backends[0]["global"]
# Special use-case - neptune is only reachable via the RDS backend
# RDS has an attribute called 'neptune' pointing to the actual NeptuneBackend
if service_name == "neptune":
backend = backend.neptune
return backend, mock_name

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import boto3
import json
import os
import subprocess
from moto.rds.utils import encode_orderable_db_instance
from time import sleep
PATH = "moto/rds/resources/cluster_options"
def main():
print("Getting DB Cluster Options from just neptune for now")
engines = ["neptune"]
root_dir = (
subprocess.check_output(["git", "rev-parse", "--show-toplevel"])
.decode()
.strip()
)
rds = boto3.client("rds", region_name="us-east-1")
for engine in engines:
print(f"Engine {engine}...")
dest = os.path.join(
root_dir, "{0}/{1}.json".format(PATH, engine)
)
try:
options = []
response = rds.describe_orderable_db_instance_options(Engine=engine)
options.extend(response["OrderableDBInstanceOptions"])
next_token = response.get("Marker", None)
while next_token:
response = rds.describe_orderable_db_instance_options(
Engine=engine, Marker=next_token
)
options.extend(response["OrderableDBInstanceOptions"])
next_token = response.get("Marker", None)
options = [encode_orderable_db_instance(option) for option in options]
print("Writing data to {0}".format(dest))
with open(dest, "w+") as open_file:
json.dump(options, open_file, indent=1, separators=(",", ":"))
except Exception as e:
print("Unable to write data to {0}".format(dest))
print(e)
# We don't want it to look like we're DDOS'ing AWS
sleep(1)
if __name__ == "__main__":
main()

View File

@ -18,7 +18,7 @@ output_path = os.path.join(script_dir, "..", output_file)
# Ignore the MotoAPI and InstanceMetadata backend, as they do not represent AWS services
# Ignore the APIGatewayV2, as it's URL's are managed by APIGateway
# Ignore S3bucket_path, as the functionality is covered in the S3 service
IGNORE_BACKENDS = ["moto_api", "instance_metadata", "apigatewayv2", "s3bucket_path"]
IGNORE_BACKENDS = ["moto_api", "instance_metadata", "apigatewayv2", "s3bucket_path", "neptune"]
def iter_backend_url_patterns():

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/elastictranscoder,moto/es,moto/moto_api
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/elastictranscoder,moto/es,moto/moto_api,moto/neptune
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract

View File

@ -330,6 +330,16 @@ meta:
mq:
- TestAccMQBrokerDataSource
- TestAccMQBroker_
neptune:
- TestAccNeptuneCluster_basic
- TestAccNeptuneCluster_namePrefix
- TestAccNeptuneCluster_serverlessConfiguration
- TestAccNeptuneCluster_encrypted
- TestAccNeptuneCluster_backupsUpdate
- TestAccNeptuneCluster_kmsKey
- TestAccNeptuneCluster_tags
- TestAccNeptuneCluster_disappears
- TestAccNeptuneGlobalCluster_basic
quicksight:
- TestAccQuickSightUser
- TestAccQuickSightGroup_

View File

View File

@ -0,0 +1,22 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_neptune
@mock_neptune
def test_db_cluster_options():
# Verified against AWS on 23-02-2023
# We're not checking the exact data here, that is already done in TF
client = boto3.client("neptune", region_name="us-east-1")
response = client.describe_orderable_db_instance_options(Engine="neptune")
response["OrderableDBInstanceOptions"].should.have.length_of(286)
response = client.describe_orderable_db_instance_options(
Engine="neptune", EngineVersion="1.0.2.1"
)
response["OrderableDBInstanceOptions"].should.have.length_of(0)
response = client.describe_orderable_db_instance_options(
Engine="neptune", EngineVersion="1.0.3.0"
)
response["OrderableDBInstanceOptions"].should.have.length_of(12)

View File

@ -0,0 +1,25 @@
import boto3
from moto import mock_neptune
@mock_neptune
def test_add_tags_to_cluster():
conn = boto3.client("neptune", region_name="us-west-2")
resp = conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
Engine="neptune",
Tags=[{"Key": "k1", "Value": "v1"}],
)
cluster_arn = resp["DBCluster"]["DBClusterArn"]
conn.add_tags_to_resource(
ResourceName=cluster_arn, Tags=[{"Key": "k2", "Value": "v2"}]
)
tags = conn.list_tags_for_resource(ResourceName=cluster_arn)["TagList"]
tags.should.equal([{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}])
conn.remove_tags_from_resource(ResourceName=cluster_arn, TagKeys=["k1"])
tags = conn.list_tags_for_resource(ResourceName=cluster_arn)["TagList"]
tags.should.equal([{"Key": "k2", "Value": "v2"}])

View File

@ -0,0 +1,116 @@
"""Unit tests for neptune-supported APIs."""
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_neptune
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_neptune
def test_create_db_cluster():
client = boto3.client("neptune", region_name="us-east-2")
resp = client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="neptune")[
"DBCluster"
]
resp.should.have.key("DBClusterIdentifier").equals("cluster-id")
resp.should.have.key("DbClusterResourceId")
resp.should.have.key("DBClusterArn")
resp.should.have.key("Engine").equals("neptune")
resp.should.have.key("EngineVersion").equals("1.2.0.2")
resp.should.have.key("StorageEncrypted").equals(True)
resp.should.have.key("DBClusterParameterGroup").equals("")
resp.should.have.key("Endpoint")
resp.should.have.key("DbClusterResourceId").match("cluster-")
resp.should.have.key("AvailabilityZones").equals(
["us-east-2a", "us-east-2b", "us-east-2c"]
)
resp.shouldnt.have.key("ServerlessV2ScalingConfiguration")
# Double check this cluster is not available in another region
europe_client = boto3.client("neptune", region_name="eu-west-2")
europe_client.describe_db_clusters()["DBClusters"].should.have.length_of(0)
@mock_neptune
def test_create_db_cluster__with_additional_params():
client = boto3.client("neptune", region_name="us-east-1")
resp = client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="neptune",
EngineVersion="1.1.0.1",
StorageEncrypted=False,
DBClusterParameterGroupName="myprm",
KmsKeyId="key",
ServerlessV2ScalingConfiguration={"MinCapacity": 1.0, "MaxCapacity": 2.0},
DatabaseName="sth",
)["DBCluster"]
resp.should.have.key("StorageEncrypted").equals(False)
resp.should.have.key("DBClusterParameterGroup").equals("myprm")
resp.should.have.key("EngineVersion").equals("1.1.0.1")
resp.should.have.key("KmsKeyId").equals("key")
resp.should.have.key("ServerlessV2ScalingConfiguration").equals(
{"MinCapacity": 1.0, "MaxCapacity": 2.0}
)
resp.should.have.key("DatabaseName").equals("sth")
@mock_neptune
def test_describe_db_clusters():
client = boto3.client("neptune", region_name="ap-southeast-1")
client.describe_db_clusters()["DBClusters"].should.equal([])
client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="neptune")
clusters = client.describe_db_clusters()["DBClusters"]
clusters.should.have.length_of(1)
clusters[0]["DBClusterIdentifier"].should.equal("cluster-id")
clusters[0].should.have.key("Engine").equals("neptune")
@mock_neptune
def test_delete_db_cluster():
client = boto3.client("neptune", region_name="ap-southeast-1")
client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="neptune")
client.delete_db_cluster(DBClusterIdentifier="cluster-id")
client.describe_db_clusters()["DBClusters"].should.equal([])
@mock_neptune
def test_delete_unknown_db_cluster():
client = boto3.client("neptune", region_name="ap-southeast-1")
with pytest.raises(ClientError) as exc:
client.delete_db_cluster(DBClusterIdentifier="unknown-id")
err = exc.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault")
@mock_neptune
def test_modify_db_cluster():
client = boto3.client("neptune", region_name="us-east-1")
client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="neptune")
resp = client.modify_db_cluster(
DBClusterIdentifier="cluster-id",
EngineVersion="1.1.0.1",
DBClusterParameterGroupName="myprm",
PreferredBackupWindow="window",
)["DBCluster"]
resp.should.have.key("DBClusterParameterGroup").equals("myprm")
resp.should.have.key("EngineVersion").equals("1.1.0.1")
resp.should.have.key("PreferredBackupWindow").equals("window")
@mock_neptune
def test_start_db_cluster():
client = boto3.client("neptune", region_name="us-east-2")
client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="neptune")[
"DBCluster"
]
cluster = client.start_db_cluster(DBClusterIdentifier="cluster-id")["DBCluster"]
cluster.should.have.key("Status").equals("started")

View File

@ -0,0 +1,53 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_neptune
@mock_neptune
def test_describe():
client = boto3.client("neptune", "us-east-2")
client.describe_global_clusters()["GlobalClusters"].should.equal([])
@mock_neptune
def test_create_global_cluster():
client = boto3.client("neptune", "us-east-1")
resp = client.create_global_cluster(GlobalClusterIdentifier="g-id")["GlobalCluster"]
resp.should.have.key("GlobalClusterIdentifier").equals("g-id")
resp.should.have.key("GlobalClusterResourceId")
resp.should.have.key("GlobalClusterArn")
resp.should.have.key("Engine").equals("neptune")
resp.should.have.key("EngineVersion").equals("1.2.0.0")
resp.should.have.key("StorageEncrypted").equals(False)
resp.should.have.key("DeletionProtection").equals(False)
client.describe_global_clusters()["GlobalClusters"].should.have.length_of(1)
# As a global cluster, verify it can be retrieved everywhere
europe_client = boto3.client("neptune", "eu-north-1")
europe_client.describe_global_clusters()["GlobalClusters"].should.have.length_of(1)
@mock_neptune
def test_create_global_cluster_with_additional_params():
client = boto3.client("neptune", "us-east-1")
resp = client.create_global_cluster(
GlobalClusterIdentifier="g-id",
EngineVersion="1.0",
DeletionProtection=True,
StorageEncrypted=True,
)["GlobalCluster"]
resp.should.have.key("Engine").equals("neptune")
resp.should.have.key("EngineVersion").equals("1.0")
resp.should.have.key("StorageEncrypted").equals(True)
resp.should.have.key("DeletionProtection").equals(True)
@mock_neptune
def test_delete_global_cluster():
client = boto3.client("neptune", "us-east-1")
client.create_global_cluster(GlobalClusterIdentifier="g-id2")
client.delete_global_cluster(GlobalClusterIdentifier="g-id2")
client.describe_global_clusters()["GlobalClusters"].should.equal([])

View File

@ -5,6 +5,9 @@ from moto.rds.utils import (
apply_filter,
merge_filters,
validate_filters,
encode_orderable_db_instance,
decode_orderable_db_instance,
ORDERABLE_DB_INSTANCE_ENCODING,
)
@ -134,3 +137,65 @@ class TestMergingFilters(object):
assert len(merged.keys()) == 4
for key in merged.keys():
assert merged[key] == ["value1", "value2"]
def test_encode_orderable_db_instance():
# Data from AWS comes in a specific format. Verify we can encode/decode it to something more compact
original = {
"Engine": "neptune",
"EngineVersion": "1.0.3.0",
"DBInstanceClass": "db.r4.2xlarge",
"LicenseModel": "amazon-license",
"AvailabilityZones": [
{"Name": "us-east-1a"},
{"Name": "us-east-1b"},
{"Name": "us-east-1c"},
{"Name": "us-east-1d"},
{"Name": "us-east-1e"},
{"Name": "us-east-1f"},
],
"MultiAZCapable": False,
"ReadReplicaCapable": False,
"Vpc": True,
"SupportsStorageEncryption": True,
"StorageType": "aurora",
"SupportsIops": False,
"SupportsEnhancedMonitoring": False,
"SupportsIAMDatabaseAuthentication": True,
"SupportsPerformanceInsights": False,
"AvailableProcessorFeatures": [],
"SupportedEngineModes": ["provisioned"],
"SupportsKerberosAuthentication": False,
"OutpostCapable": False,
"SupportedActivityStreamModes": [],
"SupportsGlobalDatabases": False,
"SupportsClusters": True,
"Support edNetworkTypes": ["IPV4"],
}
short = encode_orderable_db_instance(original)
decode_orderable_db_instance(short).should.equal(original)
def test_encode_orderable_db_instance__short_format():
# Verify this works in a random format. We don't know for sure what AWS returns, so it should always work regardless of the input
short = {
"Engine": "neptune",
"EngineVersion": "1.0.3.0",
"DBInstanceClass": "db.r4.2xlarge",
"LicenseModel": "amazon-license",
"SupportsKerberosAuthentication": False,
"OutpostCapable": False,
"SupportedActivityStreamModes": [],
"SupportsGlobalDatabases": False,
"SupportsClusters": True,
"SupportedNetworkTypes": ["IPV4"],
}
decode_orderable_db_instance(encode_orderable_db_instance(short)).should.equal(
short
)
def test_verify_encoding_is_unique():
len(set(ORDERABLE_DB_INSTANCE_ENCODING.values())).should.equal(
len(ORDERABLE_DB_INSTANCE_ENCODING.keys())
)