Add filtering support for RDS::DBCluster (#6144)

* Add filtering support for RDS::DBCluster

* Add support for filtering by `db-cluster-id`.
* Add support for filtering by `engine`.

Obviates the need for the recent change (#6114) because the filter definition maps
`db_cluster_id` to either `db_cluster_identifier` or `db_cluster_arn`.

* No longer required due to filter configuration.

* Account for Neptune weirdness...
This commit is contained in:
Brian Pandola 2023-03-28 00:54:41 -07:00 committed by GitHub
parent 28008b7e05
commit 93638de380
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 11 deletions

View File

@ -46,6 +46,13 @@ from .utils import (
class Cluster: class Cluster:
SUPPORTED_FILTERS = {
"db-cluster-id": FilterDef(
["db_cluster_arn", "db_cluster_identifier"], "DB Cluster Identifiers"
),
"engine": FilterDef(["engine"], "Engine Names"),
}
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.db_name = kwargs.get("db_name") self.db_name = kwargs.get("db_name")
self.db_cluster_identifier = kwargs.get("db_cluster_identifier") self.db_cluster_identifier = kwargs.get("db_cluster_identifier")
@ -1948,17 +1955,19 @@ class RDSBackend(BaseBackend):
return self.cluster_snapshots.pop(db_snapshot_identifier) return self.cluster_snapshots.pop(db_snapshot_identifier)
def describe_db_clusters(self, cluster_identifier): def describe_db_clusters(self, cluster_identifier=None, filters=None):
clusters = self.clusters
clusters_neptune = self.neptune.clusters
if cluster_identifier: if cluster_identifier:
# ARN to identifier filters = merge_filters(filters, {"db-cluster-id": [cluster_identifier]})
# arn:aws:rds:eu-north-1:123456789012:cluster:cluster --> cluster-id if filters:
cluster_identifier = cluster_identifier.split(":")[-1] clusters = self._filter_resources(clusters, filters, Cluster)
if cluster_identifier in self.clusters: clusters_neptune = self._filter_resources(
return [self.clusters[cluster_identifier]] clusters_neptune, filters, Cluster
if cluster_identifier in self.neptune.clusters: )
return [self.neptune.clusters[cluster_identifier]] if cluster_identifier and not (clusters or clusters_neptune):
raise DBClusterNotFoundError(cluster_identifier) raise DBClusterNotFoundError(cluster_identifier)
return list(self.clusters.values()) + list(self.neptune.clusters.values()) return list(clusters.values()) + list(clusters_neptune.values())
def describe_db_cluster_snapshots( def describe_db_cluster_snapshots(
self, db_cluster_identifier, db_snapshot_identifier, filters=None self, db_cluster_identifier, db_snapshot_identifier, filters=None

View File

@ -583,7 +583,11 @@ class RDSResponse(BaseResponse):
def describe_db_clusters(self): def describe_db_clusters(self):
_id = self._get_param("DBClusterIdentifier") _id = self._get_param("DBClusterIdentifier")
clusters = self.backend.describe_db_clusters(cluster_identifier=_id) filters = self._get_multi_param("Filters.Filter.")
filters = {f["Name"]: f["Values"] for f in filters}
clusters = self.backend.describe_db_clusters(
cluster_identifier=_id, filters=filters
)
template = self.response_template(DESCRIBE_CLUSTERS_TEMPLATE) template = self.response_template(DESCRIBE_CLUSTERS_TEMPLATE)
return template.render(clusters=clusters) return template.render(clusters=clusters)

View File

@ -64,7 +64,9 @@ def test_describe_db_clusters():
client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="neptune") client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="neptune")
clusters = client.describe_db_clusters()["DBClusters"] clusters = client.describe_db_clusters(DBClusterIdentifier="cluster-id")[
"DBClusters"
]
clusters.should.have.length_of(1) clusters.should.have.length_of(1)
clusters[0]["DBClusterIdentifier"].should.equal("cluster-id") clusters[0]["DBClusterIdentifier"].should.equal("cluster-id")
clusters[0].should.have.key("Engine").equals("neptune") clusters[0].should.have.key("Engine").equals("neptune")

View File

@ -811,3 +811,37 @@ def test_create_db_cluster_with_enable_http_endpoint_invalid():
) )
cluster = resp["DBCluster"] cluster = resp["DBCluster"]
cluster.should.have.key("HttpEndpointEnabled").equal(False) cluster.should.have.key("HttpEndpointEnabled").equal(False)
@mock_rds
def test_describe_db_clusters_filter_by_engine():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="id1",
Engine="aurora-mysql",
MasterUsername="root",
MasterUserPassword="hunter21",
)
client.create_db_cluster(
DBClusterIdentifier="id2",
Engine="aurora-postgresql",
MasterUsername="root",
MasterUserPassword="hunter21",
)
resp = client.describe_db_clusters(
Filters=[
{
"Name": "engine",
"Values": ["aurora-postgresql"],
}
]
)
clusters = resp["DBClusters"]
assert len(clusters) == 1
cluster = clusters[0]
assert cluster["DBClusterIdentifier"] == "id2"
assert cluster["Engine"] == "aurora-postgresql"