ResourceGroupsTaggingAPI: tag_resource() for RDS resources (#6727)

This commit is contained in:
Bert Blommers 2023-08-26 07:15:57 +00:00 committed by GitHub
parent 59ebe7d6a5
commit 111c349682
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 152 additions and 127 deletions

View File

@ -220,6 +220,10 @@ class Cluster:
or self.replication_source_identifier is not None or self.replication_source_identifier is not None
) )
@property
def arn(self) -> str:
return self.db_cluster_arn
@property @property
def db_cluster_arn(self) -> str: def db_cluster_arn(self) -> str:
return f"arn:aws:rds:{self.region_name}:{self.account_id}:cluster:{self.db_cluster_identifier}" return f"arn:aws:rds:{self.region_name}:{self.account_id}:cluster:{self.db_cluster_identifier}"
@ -461,6 +465,10 @@ class ClusterSnapshot(BaseModel):
datetime.datetime.utcnow() datetime.datetime.utcnow()
) )
@property
def arn(self) -> str:
return self.snapshot_arn
@property @property
def snapshot_arn(self) -> str: def snapshot_arn(self) -> str:
return f"arn:aws:rds:{self.cluster.region_name}:{self.cluster.account_id}:cluster-snapshot:{self.snapshot_id}" return f"arn:aws:rds:{self.cluster.region_name}:{self.cluster.account_id}:cluster-snapshot:{self.snapshot_id}"
@ -645,6 +653,10 @@ class Database(CloudFormationModel):
kwargs.get("enable_cloudwatch_logs_exports") or [] kwargs.get("enable_cloudwatch_logs_exports") or []
) )
@property
def arn(self) -> str:
return self.db_instance_arn
@property @property
def db_instance_arn(self) -> str: def db_instance_arn(self) -> str:
return f"arn:aws:rds:{self.region_name}:{self.account_id}:db:{self.db_instance_identifier}" return f"arn:aws:rds:{self.region_name}:{self.account_id}:db:{self.db_instance_identifier}"
@ -1091,6 +1103,10 @@ class DatabaseSnapshot(BaseModel):
datetime.datetime.utcnow() datetime.datetime.utcnow()
) )
@property
def arn(self) -> str:
return self.snapshot_arn
@property @property
def snapshot_arn(self) -> str: def snapshot_arn(self) -> str:
return f"arn:aws:rds:{self.database.region_name}:{self.database.account_id}:snapshot:{self.snapshot_id}" return f"arn:aws:rds:{self.database.region_name}:{self.database.account_id}:snapshot:{self.snapshot_id}"

View File

@ -2,6 +2,7 @@ from typing import Any, Dict, List, Iterator, Optional, Tuple
from moto.core import BaseBackend, BackendDict from moto.core import BaseBackend, BackendDict
from moto.core.exceptions import RESTError from moto.core.exceptions import RESTError
from moto.moto_api._internal import mock_random from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
from moto.s3.models import s3_backends, S3Backend from moto.s3.models import s3_backends, S3Backend
from moto.ec2 import ec2_backends from moto.ec2 import ec2_backends
@ -370,33 +371,25 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
yield {"ResourceARN": f"{kms_key.arn}", "Tags": tags} yield {"ResourceARN": f"{kms_key.arn}", "Tags": tags}
# RDS Cluster # RDS resources
if ( resource_map: Dict[str, Dict[str, Any]] = {
not resource_type_filters "rds:cluster": self.rds_backend.clusters,
or "rds" in resource_type_filters "rds:db": self.rds_backend.databases,
or "rds:cluster" in resource_type_filters "rds:snapshot": self.rds_backend.database_snapshots,
): "rds:cluster-snapshot": self.rds_backend.cluster_snapshots,
for rds_cluster in self.rds_backend.clusters.values():
tags = rds_cluster.get_tags()
if not tags or not tag_filter(tags):
continue
yield {
"ResourceARN": rds_cluster.db_cluster_arn,
"Tags": tags,
} }
for resource_type, resource_source in resource_map.items():
# RDS Instance
if ( if (
not resource_type_filters not resource_type_filters
or "rds" in resource_type_filters or "rds" in resource_type_filters
or "rds:db" in resource_type_filters or resource_type in resource_type_filters
): ):
for database in self.rds_backend.databases.values(): for resource in resource_source.values():
tags = database.get_tags() tags = resource.get_tags()
if not tags or not tag_filter(tags): if not tags or not tag_filter(tags):
continue continue
yield { yield {
"ResourceARN": database.db_instance_arn, "ResourceARN": resource.arn,
"Tags": tags, "Tags": tags,
} }
@ -404,37 +397,6 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# RDS Option Group # RDS Option Group
# RDS Parameter Group # RDS Parameter Group
# RDS Security Group # RDS Security Group
# RDS Snapshot
if (
not resource_type_filters
or "rds" in resource_type_filters
or "rds:snapshot" in resource_type_filters
):
for snapshot in self.rds_backend.database_snapshots.values():
tags = snapshot.get_tags()
if not tags or not tag_filter(tags):
continue
yield {
"ResourceARN": snapshot.snapshot_arn,
"Tags": tags,
}
# RDS Cluster Snapshot
if (
not resource_type_filters
or "rds" in resource_type_filters
or "rds:cluster-snapshot" in resource_type_filters
):
for snapshot in self.rds_backend.cluster_snapshots.values():
tags = snapshot.get_tags()
if not tags or not tag_filter(tags):
continue
yield {
"ResourceARN": snapshot.snapshot_arn,
"Tags": tags,
}
# RDS Subnet Group # RDS Subnet Group
# RDS Event Subscription # RDS Event Subscription
@ -767,14 +729,27 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
return new_token, result return new_token, result
# These methods will be called from responses.py. def tag_resources(
# They should call a tag function inside of the moto module self, resource_arns: List[str], tags: Dict[str, str]
# that governs the resource, that way if the target module ) -> Dict[str, Dict[str, Any]]:
# changes how tags are delt with theres less to change """
Only RDS resources are currently supported
"""
missing_resources = []
missing_error: Dict[str, Any] = {
"StatusCode": 404,
"ErrorCode": "InternalServiceException",
"ErrorMessage": "Service not yet supported",
}
for arn in resource_arns:
if arn.startswith("arn:aws:rds:"):
self.rds_backend.add_tags_to_resource(
arn, TaggingService.convert_dict_to_tags_input(tags)
)
else:
missing_resources.append(arn)
return {arn: missing_error for arn in missing_resources}
# def tag_resources(self, resource_arn_list, tags):
# return failed_resources_map
#
# def untag_resources(self, resource_arn_list, tag_keys): # def untag_resources(self, resource_arn_list, tag_keys):
# return failed_resources_map # return failed_resources_map

View File

@ -58,21 +58,15 @@ class ResourceGroupsTaggingAPIResponse(BaseResponse):
return json.dumps(response) return json.dumps(response)
# These methods are all thats left to be implemented def tag_resources(self) -> str:
# the response is already set up, all thats needed is resource_arns = self._get_param("ResourceARNList")
# the respective model function to be implemented. tags = self._get_param("Tags")
# failed_resources = self.backend.tag_resources(
# def tag_resources(self): resource_arns=resource_arns, tags=tags
# resource_arn_list = self._get_list_prefix("ResourceARNList.member") )
# tags = self._get_param("Tags")
# failed_resources_map = self.backend.tag_resources( return json.dumps({"FailedResourcesMap": failed_resources})
# resource_arn_list=resource_arn_list,
# tags=tags,
# )
#
# # failed_resources_map should be {'resource': {'ErrorCode': str, 'ErrorMessage': str, 'StatusCode': int}}
# return json.dumps({'FailedResourcesMap': failed_resources_map})
#
# def untag_resources(self): # def untag_resources(self):
# resource_arn_list = self._get_list_prefix("ResourceARNList.member") # resource_arn_list = self._get_list_prefix("ResourceARNList.member")
# tag_keys = self._get_list_prefix("TagKeys.member") # tag_keys = self._get_list_prefix("TagKeys.member")

View File

@ -0,0 +1,71 @@
import boto3
import unittest
from moto import mock_rds
from moto import mock_resourcegroupstaggingapi
@mock_rds
@mock_resourcegroupstaggingapi
class TestRdsTagging(unittest.TestCase):
def setUp(self) -> None:
self.rds = boto3.client("rds", region_name="us-west-2")
self.rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2")
self.resources_tagged = []
self.resources_untagged = []
for i in range(3):
database = self.rds.create_db_instance(
DBInstanceIdentifier=f"db-instance-{i}",
Engine="postgres",
DBInstanceClass="db.m1.small",
CopyTagsToSnapshot=bool(i),
Tags=[{"Key": "test", "Value": f"value-{i}"}] if i else [],
).get("DBInstance")
snapshot = self.rds.create_db_snapshot(
DBInstanceIdentifier=database["DBInstanceIdentifier"],
DBSnapshotIdentifier=f"snapshot-{i}",
).get("DBSnapshot")
group = self.resources_tagged if i else self.resources_untagged
group.append(database["DBInstanceArn"])
group.append(snapshot["DBSnapshotArn"])
def test_get_resources_rds(self):
def assert_response(response, expected_count, resource_type=None):
results = response.get("ResourceTagMappingList", [])
assert len(results) == expected_count
for item in results:
arn = item["ResourceARN"]
assert arn in self.resources_tagged
assert arn not in self.resources_untagged
if resource_type:
assert f":{resource_type}:" in arn
resp = self.rtapi.get_resources(ResourceTypeFilters=["rds"])
assert_response(resp, 4)
resp = self.rtapi.get_resources(ResourceTypeFilters=["rds:db"])
assert_response(resp, 2, resource_type="db")
resp = self.rtapi.get_resources(ResourceTypeFilters=["rds:snapshot"])
assert_response(resp, 2, resource_type="snapshot")
resp = self.rtapi.get_resources(
TagFilters=[{"Key": "test", "Values": ["value-1"]}]
)
assert_response(resp, 2)
def test_tag_resources_rds(self):
# WHEN
# we tag resources
self.rtapi.tag_resources(
ResourceARNList=self.resources_tagged,
Tags={"key1": "value1", "key2": "value2"},
)
# THEN
# we can retrieve the tags using the RDS API
def get_tags(arn):
return self.rds.list_tags_for_resource(ResourceName=arn)["TagList"]
for arn in self.resources_tagged:
assert {"Key": "key1", "Value": "value1"} in get_tags(arn)
assert {"Key": "key2", "Value": "value2"} in get_tags(arn)
for arn in self.resources_untagged:
assert get_tags(arn) == []

View File

@ -6,7 +6,6 @@ from botocore.client import ClientError
from moto import mock_ec2 from moto import mock_ec2
from moto import mock_elbv2 from moto import mock_elbv2
from moto import mock_kms from moto import mock_kms
from moto import mock_rds
from moto import mock_resourcegroupstaggingapi from moto import mock_resourcegroupstaggingapi
from moto import mock_s3 from moto import mock_s3
from moto import mock_lambda from moto import mock_lambda
@ -223,7 +222,6 @@ def test_get_resources_ecs():
assert task_two not in resp["ResourceTagMappingList"][0]["ResourceARN"] assert task_two not in resp["ResourceTagMappingList"][0]["ResourceARN"]
@mock_rds
@mock_ec2 @mock_ec2
@mock_resourcegroupstaggingapi @mock_resourcegroupstaggingapi
def test_get_resources_ec2(): def test_get_resources_ec2():
@ -588,49 +586,6 @@ def test_multiple_tag_filters():
assert instance_2_id not in results[0]["ResourceARN"] assert instance_2_id not in results[0]["ResourceARN"]
@mock_rds
@mock_resourcegroupstaggingapi
def test_get_resources_rds():
client = boto3.client("rds", region_name="us-west-2")
resources_tagged = []
resources_untagged = []
for i in range(3):
database = client.create_db_instance(
DBInstanceIdentifier=f"db-instance-{i}",
Engine="postgres",
DBInstanceClass="db.m1.small",
CopyTagsToSnapshot=bool(i),
Tags=[{"Key": "test", "Value": f"value-{i}"}] if i else [],
).get("DBInstance")
snapshot = client.create_db_snapshot(
DBInstanceIdentifier=database["DBInstanceIdentifier"],
DBSnapshotIdentifier=f"snapshot-{i}",
).get("DBSnapshot")
group = resources_tagged if i else resources_untagged
group.append(database["DBInstanceArn"])
group.append(snapshot["DBSnapshotArn"])
def assert_response(response, expected_count, resource_type=None):
results = response.get("ResourceTagMappingList", [])
assert len(results) == expected_count
for item in results:
arn = item["ResourceARN"]
assert arn in resources_tagged
assert arn not in resources_untagged
if resource_type:
assert f":{resource_type}:" in arn
rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2")
resp = rtapi.get_resources(ResourceTypeFilters=["rds"])
assert_response(resp, 4)
resp = rtapi.get_resources(ResourceTypeFilters=["rds:db"])
assert_response(resp, 2, resource_type="db")
resp = rtapi.get_resources(ResourceTypeFilters=["rds:snapshot"])
assert_response(resp, 2, resource_type="snapshot")
resp = rtapi.get_resources(TagFilters=[{"Key": "test", "Values": ["value-1"]}])
assert_response(resp, 2)
@mock_lambda @mock_lambda
@mock_resourcegroupstaggingapi @mock_resourcegroupstaggingapi
@mock_iam @mock_iam
@ -717,3 +672,17 @@ def test_get_resources_lambda():
resp = rtapi.get_resources(TagFilters=[{"Key": "Shape", "Values": ["rectangle"]}]) resp = rtapi.get_resources(TagFilters=[{"Key": "Shape", "Values": ["rectangle"]}])
assert_response(resp, [rectangle_arn]) assert_response(resp, [rectangle_arn])
@mock_resourcegroupstaggingapi
def test_tag_resources_for_unknown_service():
rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2")
missing_resources = rtapi.tag_resources(
ResourceARNList=["arn:aws:service_x"], Tags={"key1": "k", "key2": "v"}
)["FailedResourcesMap"]
assert "arn:aws:service_x" in missing_resources
assert (
missing_resources["arn:aws:service_x"]["ErrorCode"]
== "InternalServiceException"
)