Adding dynamodb support to get_resources in resourcegroupstagging api (#7471)

This commit is contained in:
praneeth papishetty 2024-03-17 13:35:21 -07:00 committed by GitHub
parent 62188e7cd1
commit 5a0bc0d6cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 91 additions and 5 deletions

View File

@ -5,6 +5,7 @@ from moto.awslambda.models import LambdaBackend, lambda_backends
from moto.backup.models import BackupBackend, backup_backends
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.exceptions import RESTError
from moto.dynamodb.models import DynamoDBBackend, dynamodb_backends
from moto.ec2 import ec2_backends
from moto.ecs.models import EC2ContainerServiceBackend, ecs_backends
from moto.elb.models import ELBBackend, elb_backends
@ -110,6 +111,10 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
def backup_backend(self) -> BackupBackend:
return backup_backends[self.account_id][self.region_name]
@property
def dynamodb_backend(self) -> DynamoDBBackend:
return dynamodb_backends[self.account_id][self.region_name]
def _get_resources_generator(
self,
tag_filters: Optional[List[Dict[str, Any]]] = None,
@ -198,7 +203,6 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# CloudFormation
if not resource_type_filters or "cloudformation:stack" in resource_type_filters:
try:
from moto.cloudformation import cloudformation_backends
@ -500,7 +504,6 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# SNS
if not resource_type_filters or "sns" in resource_type_filters:
for topic in self.sns_backend.topics.values():
tags = format_tags(topic._tags)
if not tags or not tag_filter(
@ -545,6 +548,21 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
"Tags": tags,
}
if (
not resource_type_filters
or "dynamodb" in resource_type_filters
or "dynamodb:table" in resource_type_filters
):
for table in self.dynamodb_backend.tables.values():
tags = table.tags
if not tags or not tag_filter(tags):
continue
yield {
"ResourceARN": table.table_arn,
"Tags": tags,
}
def _get_tag_keys_generator(self) -> Iterator[str]:
# Look at
# https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
@ -738,7 +756,6 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
def get_tag_keys(
self, pagination_token: Optional[str] = None
) -> Tuple[Optional[str], List[str]]:
if pagination_token:
if pagination_token not in self._pages:
raise RESTError(
@ -786,7 +803,6 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
def get_tag_values(
self, pagination_token: Optional[str], key: str
) -> Tuple[Optional[str], List[str]]:
if pagination_token:
if pagination_token not in self._pages:
raise RESTError(
@ -835,7 +851,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
self, resource_arns: List[str], tags: Dict[str, str]
) -> Dict[str, Dict[str, Any]]:
"""
Only Logs and RDS resources are currently supported
Only DynamoDB, Logs and RDS resources are currently supported
"""
missing_resources = []
missing_error: Dict[str, Any] = {
@ -850,6 +866,10 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
)
if arn.startswith("arn:aws:logs:"):
self.logs_backend.tag_resource(arn, tags)
if arn.startswith("arn:aws:dynamodb"):
self.dynamodb_backend.tag_resource(
arn, TaggingService.convert_dict_to_tags_input(tags)
)
else:
missing_resources.append(arn)
return {arn: missing_error for arn in missing_resources}

View File

@ -0,0 +1,66 @@
import unittest
import boto3
from moto import mock_aws
@mock_aws
class TestDynamoDBTagging(unittest.TestCase):
def setUp(self) -> None:
self.dynamodb = boto3.client("dynamodb", region_name="us-west-2")
self.rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2")
self.resources_tagged = []
self.resources_untagged = []
for i in range(3):
table = self.dynamodb.create_table(
AttributeDefinitions=[{"AttributeName": "col", "AttributeType": "S"}],
TableName=f"table-{i}",
KeySchema=[{"AttributeName": "col", "KeyType": "HASH"}],
BillingMode="PAY_PER_REQUEST",
).get("TableDescription")
self.dynamodb.tag_resource(
ResourceArn=table["TableArn"],
Tags=[{"Key": "test", "Value": f"value-{i}"}] if i else [],
)
group = self.resources_tagged if i else self.resources_untagged
group.append(table["TableArn"])
def test_get_resources_dynamodb(self):
def assert_response(response, expected_count, resource_type=None):
results = response.get("ResourceTagMappingList", [])
assert len(results) == expected_count
for item in results:
arn = item["ResourceARN"]
assert arn in self.resources_tagged
assert arn not in self.resources_untagged
if resource_type:
assert f":{resource_type}:" in arn
resp = self.rtapi.get_resources(ResourceTypeFilters=["dynamodb"])
assert_response(resp, 2)
resp = self.rtapi.get_resources(ResourceTypeFilters=["dynamodb:table"])
assert_response(resp, 2)
resp = self.rtapi.get_resources(
TagFilters=[{"Key": "test", "Values": ["value-1"]}]
)
assert_response(resp, 1)
def test_tag_resources_dynamodb(self):
# WHEN
# we tag resources
self.rtapi.tag_resources(
ResourceARNList=self.resources_tagged,
Tags={"key1": "value1", "key2": "value2"},
)
# THEN
# we can retrieve the tags using the DynamoDB API
def get_tags(arn):
return self.dynamodb.list_tags_of_resource(ResourceArn=arn)["Tags"]
for arn in self.resources_tagged:
assert {"Key": "key1", "Value": "value1"} in get_tags(arn)
assert {"Key": "key2", "Value": "value2"} in get_tags(arn)
for arn in self.resources_untagged:
assert get_tags(arn) == []