Add ResourceGroupsTaggingAPI Support for RDS Resources (#3674)
- AWS::RDS::DBInstance - AWS::RDS::DBSnapshot
This commit is contained in:
parent
3cd03efa3d
commit
4a01360d88
@ -352,11 +352,40 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
|
||||
yield {"ResourceARN": "{0}".format(kms_key.arn), "Tags": tags}
|
||||
|
||||
# RDS Instance
|
||||
if (
|
||||
not resource_type_filters
|
||||
or "rds" in resource_type_filters
|
||||
or "rds:db" in resource_type_filters
|
||||
):
|
||||
for database in self.rds_backend.databases.values():
|
||||
tags = database.get_tags()
|
||||
if not tags or not tag_filter(tags):
|
||||
continue
|
||||
yield {
|
||||
"ResourceARN": database.db_instance_arn,
|
||||
"Tags": tags,
|
||||
}
|
||||
|
||||
# RDS Reserved Database Instance
|
||||
# RDS Option Group
|
||||
# RDS Parameter Group
|
||||
# RDS Security Group
|
||||
|
||||
# RDS Snapshot
|
||||
if (
|
||||
not resource_type_filters
|
||||
or "rds" in resource_type_filters
|
||||
or "rds:snapshot" in resource_type_filters
|
||||
):
|
||||
for snapshot in self.rds_backend.snapshots.values():
|
||||
tags = snapshot.get_tags()
|
||||
if not tags or not tag_filter(tags):
|
||||
continue
|
||||
yield {
|
||||
"ResourceARN": snapshot.snapshot_arn,
|
||||
"Tags": tags,
|
||||
}
|
||||
|
||||
# RDS Subnet Group
|
||||
# RDS Event Subscription
|
||||
|
||||
|
@ -5,6 +5,7 @@ import sure # noqa
|
||||
from moto import mock_ec2
|
||||
from moto import mock_elbv2
|
||||
from moto import mock_kms
|
||||
from moto import mock_rds2
|
||||
from moto import mock_resourcegroupstaggingapi
|
||||
from moto import mock_s3
|
||||
from tests import EXAMPLE_AMI_ID, EXAMPLE_AMI_ID2
|
||||
@ -374,3 +375,46 @@ def test_multiple_tag_filters():
|
||||
results.should.have.length_of(1)
|
||||
instance_1_id.should.be.within(results[0]["ResourceARN"])
|
||||
instance_2_id.shouldnt.be.within(results[0]["ResourceARN"])
|
||||
|
||||
|
||||
@mock_rds2
|
||||
@mock_resourcegroupstaggingapi
|
||||
def test_get_resources_rds():
|
||||
client = boto3.client("rds", region_name="us-west-2")
|
||||
resources_tagged = []
|
||||
resources_untagged = []
|
||||
for i in range(3):
|
||||
database = client.create_db_instance(
|
||||
DBInstanceIdentifier="db-instance-{}".format(i),
|
||||
Engine="postgres",
|
||||
DBInstanceClass="db.m1.small",
|
||||
CopyTagsToSnapshot=True if i else False,
|
||||
Tags=[{"Key": "test", "Value": "value-{}".format(i)}] if i else [],
|
||||
).get("DBInstance")
|
||||
snapshot = client.create_db_snapshot(
|
||||
DBInstanceIdentifier=database["DBInstanceIdentifier"],
|
||||
DBSnapshotIdentifier="snapshot-{}".format(i),
|
||||
).get("DBSnapshot")
|
||||
group = resources_tagged if i else resources_untagged
|
||||
group.append(database["DBInstanceArn"])
|
||||
group.append(snapshot["DBSnapshotArn"])
|
||||
|
||||
def assert_response(response, expected_count, resource_type=None):
|
||||
results = response.get("ResourceTagMappingList", [])
|
||||
results.should.have.length_of(expected_count)
|
||||
for item in results:
|
||||
arn = item["ResourceARN"]
|
||||
arn.should.be.within(resources_tagged)
|
||||
arn.should_not.be.within(resources_untagged)
|
||||
if resource_type:
|
||||
sure.this(":{}:".format(resource_type)).should.be.within(arn)
|
||||
|
||||
rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2")
|
||||
resp = rtapi.get_resources(ResourceTypeFilters=["rds"])
|
||||
assert_response(resp, 4)
|
||||
resp = rtapi.get_resources(ResourceTypeFilters=["rds:db"])
|
||||
assert_response(resp, 2, resource_type="db")
|
||||
resp = rtapi.get_resources(ResourceTypeFilters=["rds:snapshot"])
|
||||
assert_response(resp, 2, resource_type="snapshot")
|
||||
resp = rtapi.get_resources(TagFilters=[{"Key": "test", "Values": ["value-1"]}])
|
||||
assert_response(resp, 2)
|
||||
|
Loading…
Reference in New Issue
Block a user