Add tag get_resource support for target groups (#3012)

This commit is contained in:
Ben 2020-05-24 05:25:38 -04:00 committed by GitHub
parent 59c71760ff
commit 8daafaec58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 110 additions and 45 deletions

View File

@ -286,8 +286,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
} }
# TODO add these to the keys and values functions / combine functions # TODO add these to the keys and values functions / combine functions
# ELB # ELB, resource type elasticloadbalancing:loadbalancer
def get_elbv2_tags(arn): def get_elbv2_tags(arn):
result = [] result = []
for key, value in self.elbv2_backend.load_balancers[elb.arn].tags.items(): for key, value in self.elbv2_backend.load_balancers[elb.arn].tags.items():
@ -296,8 +295,8 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
if ( if (
not resource_type_filters not resource_type_filters
or "elasticloadbalancer" in resource_type_filters or "elasticloadbalancing" in resource_type_filters
or "elasticloadbalancer:loadbalancer" in resource_type_filters or "elasticloadbalancing:loadbalancer" in resource_type_filters
): ):
for elb in self.elbv2_backend.load_balancers.values(): for elb in self.elbv2_backend.load_balancers.values():
tags = get_elbv2_tags(elb.arn) tags = get_elbv2_tags(elb.arn)
@ -306,6 +305,27 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
yield {"ResourceARN": "{0}".format(elb.arn), "Tags": tags} yield {"ResourceARN": "{0}".format(elb.arn), "Tags": tags}
# ELB Target Group, resource type elasticloadbalancing:targetgroup
def get_target_group_tags(arn):
result = []
for key, value in self.elbv2_backend.target_groups[
target_group.arn
].tags.items():
result.append({"Key": key, "Value": value})
return result
if (
not resource_type_filters
or "elasticloadbalancing" in resource_type_filters
or "elasticloadbalancing:targetgroup" in resource_type_filters
):
for target_group in self.elbv2_backend.target_groups.values():
tags = get_target_group_tags(target_group.arn)
if not tag_filter(tags): # Skip if no tags, or invalid filter
continue
yield {"ResourceARN": "{0}".format(target_group.arn), "Tags": tags}
# EMR Cluster # EMR Cluster
# Glacier Vault # Glacier Vault

View File

@ -9,44 +9,6 @@ from moto import mock_resourcegroupstaggingapi
from moto import mock_s3 from moto import mock_s3
@mock_s3
@mock_resourcegroupstaggingapi
def test_get_resources_s3():
# Tests pagination
s3_client = boto3.client("s3", region_name="eu-central-1")
# Will end up having key1,key2,key3,key4
response_keys = set()
# Create 4 buckets
for i in range(1, 5):
i_str = str(i)
s3_client.create_bucket(
Bucket="test_bucket" + i_str,
CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},
)
s3_client.put_bucket_tagging(
Bucket="test_bucket" + i_str,
Tagging={"TagSet": [{"Key": "key" + i_str, "Value": "value" + i_str}]},
)
response_keys.add("key" + i_str)
rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1")
resp = rtapi.get_resources(ResourcesPerPage=2)
for resource in resp["ResourceTagMappingList"]:
response_keys.remove(resource["Tags"][0]["Key"])
response_keys.should.have.length_of(2)
resp = rtapi.get_resources(
ResourcesPerPage=2, PaginationToken=resp["PaginationToken"]
)
for resource in resp["ResourceTagMappingList"]:
response_keys.remove(resource["Tags"][0]["Key"])
response_keys.should.have.length_of(0)
@mock_ec2 @mock_ec2
@mock_resourcegroupstaggingapi @mock_resourcegroupstaggingapi
def test_get_resources_ec2(): def test_get_resources_ec2():
@ -233,12 +195,14 @@ def test_get_many_resources():
rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-east-1") rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-east-1")
resp = rtapi.get_resources(ResourceTypeFilters=["elasticloadbalancer:loadbalancer"]) resp = rtapi.get_resources(
ResourceTypeFilters=["elasticloadbalancing:loadbalancer"]
)
resp["ResourceTagMappingList"].should.have.length_of(2) resp["ResourceTagMappingList"].should.have.length_of(2)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain("loadbalancer/") resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain("loadbalancer/")
resp = rtapi.get_resources( resp = rtapi.get_resources(
ResourceTypeFilters=["elasticloadbalancer:loadbalancer"], ResourceTypeFilters=["elasticloadbalancing:loadbalancer"],
TagFilters=[{"Key": "key_name"}], TagFilters=[{"Key": "key_name"}],
) )
@ -247,4 +211,85 @@ def test_get_many_resources():
{"Key": "key_name", "Value": "a_value"} {"Key": "key_name", "Value": "a_value"}
) )
# TODO test pagenation # TODO test pagination
@mock_ec2
@mock_elbv2
@mock_resourcegroupstaggingapi
def test_get_resources_target_group():
ec2 = boto3.resource("ec2", region_name="eu-central-1")
elbv2 = boto3.client("elbv2", region_name="eu-central-1")
vpc = ec2.create_vpc(CidrBlock="172.28.7.0/24", InstanceTenancy="default")
# Create two tagged target groups
for i in range(1, 3):
i_str = str(i)
target_group = elbv2.create_target_group(
Name="test" + i_str,
Protocol="HTTP",
Port=8080,
VpcId=vpc.id,
TargetType="instance",
)["TargetGroups"][0]
elbv2.add_tags(
ResourceArns=[target_group["TargetGroupArn"]],
Tags=[{"Key": "Test", "Value": i_str}],
)
rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1")
# Basic test
resp = rtapi.get_resources(ResourceTypeFilters=["elasticloadbalancing:targetgroup"])
resp["ResourceTagMappingList"].should.have.length_of(2)
# Test tag filtering
resp = rtapi.get_resources(
ResourceTypeFilters=["elasticloadbalancing:targetgroup"],
TagFilters=[{"Key": "Test", "Values": ["1"]}],
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["Tags"].should.contain(
{"Key": "Test", "Value": "1"}
)
@mock_s3
@mock_resourcegroupstaggingapi
def test_get_resources_s3():
# Tests pagination
s3_client = boto3.client("s3", region_name="eu-central-1")
# Will end up having key1,key2,key3,key4
response_keys = set()
# Create 4 buckets
for i in range(1, 5):
i_str = str(i)
s3_client.create_bucket(
Bucket="test_bucket" + i_str,
CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},
)
s3_client.put_bucket_tagging(
Bucket="test_bucket" + i_str,
Tagging={"TagSet": [{"Key": "key" + i_str, "Value": "value" + i_str}]},
)
response_keys.add("key" + i_str)
rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1")
resp = rtapi.get_resources(ResourcesPerPage=2)
for resource in resp["ResourceTagMappingList"]:
response_keys.remove(resource["Tags"][0]["Key"])
response_keys.should.have.length_of(2)
resp = rtapi.get_resources(
ResourcesPerPage=2, PaginationToken=resp["PaginationToken"]
)
for resource in resp["ResourceTagMappingList"]:
response_keys.remove(resource["Tags"][0]["Key"])
response_keys.should.have.length_of(0)