add elasticloadbalancer:loadbalancer to resourcegroupstaggingapi.get_resources

This commit is contained in:
Chris Keogh 2018-01-25 14:49:59 +13:00
parent 3e27990613
commit 53184208fd
2 changed files with 88 additions and 10 deletions

View File

@ -119,15 +119,17 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
def tag_filter(tag_list):
result = []
if tag_filters:
for tag in tag_list:
temp_result = []
for f in filters:
f_result = f(tag['Key'], tag['Value'])
temp_result.append(f_result)
result.append(all(temp_result))
for tag in tag_list:
temp_result = []
for f in filters:
f_result = f(tag['Key'], tag['Value'])
temp_result.append(f_result)
result.append(all(temp_result))
return any(result)
return any(result)
else:
return True
# Do S3, resource type s3
if not resource_type_filters or 's3' in resource_type_filters:
@ -210,6 +212,23 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# TODO add these to the keys and values functions / combine functions
# ELB
def get_elbv2_tags(arn):
result = []
for key, value in self.elbv2_backend.load_balancers[elb.arn].tags.items():
result.append({'Key': key, 'Value': value})
return result
if not resource_type_filters or 'elasticloadbalancer' in resource_type_filters or 'elasticloadbalancer:loadbalancer' in resource_type_filters:
for elb in self.elbv2_backend.load_balancers.values():
tags = get_elbv2_tags(elb.arn)
# if 'elasticloadbalancer:loadbalancer' in resource_type_filters:
# from IPython import embed
# embed()
if not tag_filter(tags): # Skip if no tags, or invalid filter
continue
yield {'ResourceARN': '{0}'.format(elb.arn), 'Tags': tags}
# EMR Cluster
# Glacier Vault

View File

@ -2,7 +2,7 @@ from __future__ import unicode_literals
import boto3
import sure # noqa
from moto import mock_resourcegroupstaggingapi, mock_s3, mock_ec2
from moto import mock_resourcegroupstaggingapi, mock_s3, mock_ec2, mock_elbv2
@mock_s3
@ -223,4 +223,63 @@ def test_get_tag_values_ec2():
resp['TagValues'].should.contain('MY_VALUE1')
resp['TagValues'].should.contain('MY_VALUE4')
# TODO test pagenation
@mock_ec2
@mock_elbv2
@mock_resourcegroupstaggingapi
def test_get_resources_elbv2():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
conn.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
Tags=[
{
'Key': 'key_name',
'Value': 'a_value'
},
{
'Key': 'key_2',
'Value': 'val2'
}
]
)
conn.create_load_balancer(
Name='my-other-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
)
rtapi = boto3.client('resourcegroupstaggingapi', region_name='us-east-1')
resp = rtapi.get_resources(ResourceTypeFilters=['elasticloadbalancer:loadbalancer'])
resp['ResourceTagMappingList'].should.have.length_of(2)
resp['ResourceTagMappingList'][0]['ResourceARN'].should.contain('loadbalancer/')
resp = rtapi.get_resources(
ResourceTypeFilters=['elasticloadbalancer:loadbalancer'],
TagFilters=[{
'Key': 'key_name'
}]
)
resp['ResourceTagMappingList'].should.have.length_of(1)
resp['ResourceTagMappingList'][0]['Tags'].should.contain({'Key': 'key_name', 'Value': 'a_value'})
# TODO test pagenation