ELBv2 - improve tagging (#4964)

This commit is contained in:
Bert Blommers 2022-03-23 19:44:12 -01:00 committed by GitHub
parent 5e55daebb8
commit 472b1dab0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 189 additions and 132 deletions

View File

@ -11,6 +11,7 @@ from moto.core.utils import (
BackendDict, BackendDict,
) )
from moto.ec2.models import ec2_backends from moto.ec2.models import ec2_backends
from moto.utilities.tagging_service import TaggingService
from .utils import make_arn_for_target_group from .utils import make_arn_for_target_group
from .utils import make_arn_for_load_balancer from .utils import make_arn_for_load_balancer
from .exceptions import ( from .exceptions import (
@ -81,7 +82,6 @@ class FakeTargetGroup(CloudFormationModel):
matcher=None, matcher=None,
target_type=None, target_type=None,
): ):
# TODO: default values differs when you add Network Load balancer # TODO: default values differs when you add Network Load balancer
self.name = name self.name = name
self.arn = arn self.arn = arn
@ -106,7 +106,6 @@ class FakeTargetGroup(CloudFormationModel):
self.healthy_threshold_count = healthy_threshold_count or 5 self.healthy_threshold_count = healthy_threshold_count or 5
self.unhealthy_threshold_count = unhealthy_threshold_count or 2 self.unhealthy_threshold_count = unhealthy_threshold_count or 2
self.load_balancer_arns = [] self.load_balancer_arns = []
self.tags = {}
if self.healthcheck_protocol != "TCP": if self.healthcheck_protocol != "TCP":
self.matcher = matcher or {"HttpCode": "200"} self.matcher = matcher or {"HttpCode": "200"}
self.healthcheck_path = self.healthcheck_path or "/" self.healthcheck_path = self.healthcheck_path or "/"
@ -142,14 +141,6 @@ class FakeTargetGroup(CloudFormationModel):
if target_id in instance_ids: if target_id in instance_ids:
del self.targets[target_id] del self.targets[target_id]
def add_tag(self, key, value):
if len(self.tags) >= 10 and key not in self.tags:
raise TooManyTagsError()
self.tags[key] = value
def remove_tag(self, key):
self.tags.pop(key, None)
def health_for(self, target, ec2_backend): def health_for(self, target, ec2_backend):
t = self.targets.get(target["id"]) t = self.targets.get(target["id"])
if t is None: if t is None:
@ -245,7 +236,6 @@ class FakeListener(CloudFormationModel):
actions=default_actions, actions=default_actions,
is_default=True, is_default=True,
) )
self.tags = {}
@property @property
def physical_resource_id(self): def physical_resource_id(self):
@ -264,14 +254,6 @@ class FakeListener(CloudFormationModel):
self._non_default_rules[arn] = rule self._non_default_rules[arn] = rule
sorted(self._non_default_rules.values(), key=lambda x: x.priority) sorted(self._non_default_rules.values(), key=lambda x: x.priority)
def add_tag(self, key, value):
if len(self.tags) >= 10 and key not in self.tags:
raise TooManyTagsError()
self.tags[key] = value
def remove_tag(self, key):
self.tags.pop(key, None)
@staticmethod @staticmethod
def cloudformation_name_type(): def cloudformation_name_type():
return None return None
@ -333,7 +315,6 @@ class FakeListenerRule(CloudFormationModel):
self.conditions = conditions self.conditions = conditions
self.actions = actions self.actions = actions
self.priority = priority self.priority = priority
self.tags = {}
@property @property
def physical_resource_id(self): def physical_resource_id(self):
@ -558,18 +539,6 @@ class FakeLoadBalancer(CloudFormationModel):
def physical_resource_id(self): def physical_resource_id(self):
return self.arn return self.arn
def add_tag(self, key, value):
if len(self.tags) >= 10 and key not in self.tags:
raise TooManyTagsError()
self.tags[key] = value
def list_tags(self):
return self.tags
def remove_tag(self, key):
if key in self.tags:
del self.tags[key]
def activate(self): def activate(self):
if self.state == "provisioning": if self.state == "provisioning":
self.state = "active" self.state = "active"
@ -651,6 +620,7 @@ class ELBv2Backend(BaseBackend):
self.region_name = region_name self.region_name = region_name
self.target_groups = OrderedDict() self.target_groups = OrderedDict()
self.load_balancers = OrderedDict() self.load_balancers = OrderedDict()
self.tagging_service = TaggingService()
@staticmethod @staticmethod
def default_vpc_endpoint_service(service_region, zones): def default_vpc_endpoint_service(service_region, zones):
@ -682,6 +652,7 @@ class ELBv2Backend(BaseBackend):
subnet_mappings=None, subnet_mappings=None,
scheme="internet-facing", scheme="internet-facing",
loadbalancer_type=None, loadbalancer_type=None,
tags=None,
): ):
vpc_id = None vpc_id = None
subnets = [] subnets = []
@ -722,6 +693,7 @@ class ELBv2Backend(BaseBackend):
loadbalancer_type=loadbalancer_type, loadbalancer_type=loadbalancer_type,
) )
self.load_balancers[arn] = new_load_balancer self.load_balancers[arn] = new_load_balancer
self.tagging_service.tag_resource(arn, tags)
return new_load_balancer return new_load_balancer
def convert_and_validate_action_properties(self, properties): def convert_and_validate_action_properties(self, properties):
@ -736,7 +708,7 @@ class ELBv2Backend(BaseBackend):
raise InvalidActionTypeError(action_type, i + 1) raise InvalidActionTypeError(action_type, i + 1)
return default_actions return default_actions
def create_rule(self, listener_arn, conditions, priority, actions): def create_rule(self, listener_arn, conditions, priority, actions, tags=None):
actions = [FakeAction(action) for action in actions] actions = [FakeAction(action) for action in actions]
listeners = self.describe_listeners(None, [listener_arn]) listeners = self.describe_listeners(None, [listener_arn])
if not listeners: if not listeners:
@ -768,6 +740,7 @@ class ELBv2Backend(BaseBackend):
# create rule # create rule
rule = FakeListenerRule(listener.arn, arn, conditions, priority, actions) rule = FakeListenerRule(listener.arn, arn, conditions, priority, actions)
listener.register(arn, rule) listener.register(arn, rule)
self.tagging_service.tag_resource(arn, tags)
return rule return rule
def _validate_conditions(self, conditions): def _validate_conditions(self, conditions):
@ -1076,6 +1049,7 @@ Member must satisfy regular expression pattern: {}".format(
certificate, certificate,
default_actions, default_actions,
alpn_policy=None, alpn_policy=None,
tags=None,
): ):
default_actions = [FakeAction(action) for action in default_actions] default_actions = [FakeAction(action) for action in default_actions]
balancer = self.load_balancers.get(load_balancer_arn) balancer = self.load_balancers.get(load_balancer_arn)
@ -1108,6 +1082,8 @@ Member must satisfy regular expression pattern: {}".format(
target_group = self.target_groups[arn] target_group = self.target_groups[arn]
target_group.load_balancer_arns.append(load_balancer_arn) target_group.load_balancer_arns.append(load_balancer_arn)
self.tagging_service.tag_resource(listener.arn, tags)
return listener return listener
def describe_load_balancers(self, arns, names): def describe_load_balancers(self, arns, names):
@ -1599,5 +1575,58 @@ Member must satisfy regular expression pattern: {}".format(
cert_arns = [c["certificate_arn"] for c in certificates] cert_arns = [c["certificate_arn"] for c in certificates]
listener.certificates = [c for c in listener.certificates if c not in cert_arns] listener.certificates = [c for c in listener.certificates if c not in cert_arns]
def add_tags(self, resource_arns, tags):
tag_dict = self.tagging_service.flatten_tag_list(tags)
for arn in resource_arns:
existing = self.tagging_service.get_tag_dict_for_resource(arn)
for key in tag_dict:
if len(existing) >= 10 and key not in existing:
raise TooManyTagsError()
self._get_resource_by_arn(arn)
self.tagging_service.tag_resource(arn, tags)
def remove_tags(self, resource_arns, tag_keys):
for arn in resource_arns:
self.tagging_service.untag_resource_using_names(arn, tag_keys)
def describe_tags(self, resource_arns):
return {
arn: self.tagging_service.get_tag_dict_for_resource(arn)
for arn in resource_arns
}
def _get_resource_by_arn(self, arn):
if ":targetgroup" in arn:
resource = self.target_groups.get(arn)
if not resource:
raise TargetGroupNotFoundError()
elif ":loadbalancer" in arn:
resource = self.load_balancers.get(arn)
if not resource:
raise LoadBalancerNotFoundError()
elif ":listener-rule" in arn:
lb_arn = arn.replace(":listener-rule", ":loadbalancer").rsplit("/", 2)[0]
balancer = self.load_balancers.get(lb_arn)
if not balancer:
raise LoadBalancerNotFoundError()
listener_arn = arn.replace(":listener-rule", ":listener").rsplit("/", 1)[0]
listener = balancer.listeners.get(listener_arn)
if not listener:
raise ListenerNotFoundError()
resource = listener.rules.get(arn)
if not resource:
raise RuleNotFoundError()
elif ":listener" in arn:
lb_arn, _, _ = arn.replace(":listener", ":loadbalancer").rpartition("/")
balancer = self.load_balancers.get(lb_arn)
if not balancer:
raise LoadBalancerNotFoundError()
resource = balancer.listeners.get(arn)
if not resource:
raise ListenerNotFoundError()
else:
raise LoadBalancerNotFoundError()
return resource
elbv2_backends = BackendDict(ELBv2Backend, "ec2") elbv2_backends = BackendDict(ELBv2Backend, "ec2")

View File

@ -2,10 +2,7 @@ from moto.core.exceptions import RESTError
from moto.core.utils import amzn_request_id from moto.core.utils import amzn_request_id
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import elbv2_backends from .models import elbv2_backends
from .exceptions import DuplicateTagKeysError, RuleNotFoundError
from .exceptions import LoadBalancerNotFoundError
from .exceptions import TargetGroupNotFoundError from .exceptions import TargetGroupNotFoundError
from .exceptions import ListenerNotFoundError
from .exceptions import ListenerOrBalancerMissingError from .exceptions import ListenerOrBalancerMissingError
SSL_POLICIES = [ SSL_POLICIES = [
@ -144,12 +141,14 @@ class ELBV2Response(BaseResponse):
@amzn_request_id @amzn_request_id
def create_load_balancer(self): def create_load_balancer(self):
load_balancer_name = self._get_param("Name") params = self._get_params()
load_balancer_name = params.get("Name")
subnet_ids = self._get_multi_param("Subnets.member") subnet_ids = self._get_multi_param("Subnets.member")
subnet_mappings = self._get_params().get("SubnetMappings", []) subnet_mappings = params.get("SubnetMappings", [])
security_groups = self._get_multi_param("SecurityGroups.member") security_groups = self._get_multi_param("SecurityGroups.member")
scheme = self._get_param("Scheme") scheme = params.get("Scheme")
loadbalancer_type = self._get_param("Type") loadbalancer_type = params.get("Type")
tags = params.get("Tags")
load_balancer = self.elbv2_backend.create_load_balancer( load_balancer = self.elbv2_backend.create_load_balancer(
name=load_balancer_name, name=load_balancer_name,
@ -158,8 +157,8 @@ class ELBV2Response(BaseResponse):
subnet_mappings=subnet_mappings, subnet_mappings=subnet_mappings,
scheme=scheme, scheme=scheme,
loadbalancer_type=loadbalancer_type, loadbalancer_type=loadbalancer_type,
tags=tags,
) )
self._add_tags(load_balancer)
template = self.response_template(CREATE_LOAD_BALANCER_TEMPLATE) template = self.response_template(CREATE_LOAD_BALANCER_TEMPLATE)
return template.render(load_balancer=load_balancer) return template.render(load_balancer=load_balancer)
@ -171,6 +170,7 @@ class ELBV2Response(BaseResponse):
conditions=params["Conditions"], conditions=params["Conditions"],
priority=params["Priority"], priority=params["Priority"],
actions=params["Actions"], actions=params["Actions"],
tags=params.get("Tags"),
) )
template = self.response_template(CREATE_RULE_TEMPLATE) template = self.response_template(CREATE_RULE_TEMPLATE)
return template.render(rules=rules) return template.render(rules=rules)
@ -228,6 +228,7 @@ class ELBV2Response(BaseResponse):
certificate = None certificate = None
default_actions = params.get("DefaultActions", []) default_actions = params.get("DefaultActions", [])
alpn_policy = params.get("AlpnPolicy", []) alpn_policy = params.get("AlpnPolicy", [])
tags = params.get("Tags")
listener = self.elbv2_backend.create_listener( listener = self.elbv2_backend.create_listener(
load_balancer_arn=load_balancer_arn, load_balancer_arn=load_balancer_arn,
@ -237,8 +238,8 @@ class ELBV2Response(BaseResponse):
certificate=certificate, certificate=certificate,
default_actions=default_actions, default_actions=default_actions,
alpn_policy=alpn_policy, alpn_policy=alpn_policy,
tags=tags,
) )
self._add_tags(listener)
template = self.response_template(CREATE_LISTENER_TEMPLATE) template = self.response_template(CREATE_LISTENER_TEMPLATE)
return template.render(listener=listener) return template.render(listener=listener)
@ -421,10 +422,10 @@ class ELBV2Response(BaseResponse):
@amzn_request_id @amzn_request_id
def add_tags(self): def add_tags(self):
resource_arns = self._get_multi_param("ResourceArns.member") resource_arns = self._get_multi_param("ResourceArns.member")
tags = self._get_params().get("Tags")
tags = self._get_params().get("Tags")
for arn in resource_arns: self.elbv2_backend.add_tags(resource_arns, tags)
resource = self._get_resource_by_arn(arn)
self._add_tags(resource)
template = self.response_template(ADD_TAGS_TEMPLATE) template = self.response_template(ADD_TAGS_TEMPLATE)
return template.render() return template.render()
@ -434,9 +435,7 @@ class ELBV2Response(BaseResponse):
resource_arns = self._get_multi_param("ResourceArns.member") resource_arns = self._get_multi_param("ResourceArns.member")
tag_keys = self._get_multi_param("TagKeys.member") tag_keys = self._get_multi_param("TagKeys.member")
for arn in resource_arns: self.elbv2_backend.remove_tags(resource_arns, tag_keys)
resource = self._get_resource_by_arn(arn)
[resource.remove_tag(key) for key in tag_keys]
template = self.response_template(REMOVE_TAGS_TEMPLATE) template = self.response_template(REMOVE_TAGS_TEMPLATE)
return template.render() return template.render()
@ -444,46 +443,10 @@ class ELBV2Response(BaseResponse):
@amzn_request_id @amzn_request_id
def describe_tags(self): def describe_tags(self):
resource_arns = self._get_multi_param("ResourceArns.member") resource_arns = self._get_multi_param("ResourceArns.member")
resources = [] resource_tags = self.elbv2_backend.describe_tags(resource_arns)
for arn in resource_arns:
resource = self._get_resource_by_arn(arn)
resources.append(resource)
template = self.response_template(DESCRIBE_TAGS_TEMPLATE) template = self.response_template(DESCRIBE_TAGS_TEMPLATE)
return template.render(resources=resources) return template.render(resource_tags=resource_tags)
def _get_resource_by_arn(self, arn):
if ":targetgroup" in arn:
resource = self.elbv2_backend.target_groups.get(arn)
if not resource:
raise TargetGroupNotFoundError()
elif ":loadbalancer" in arn:
resource = self.elbv2_backend.load_balancers.get(arn)
if not resource:
raise LoadBalancerNotFoundError()
elif ":listener-rule" in arn:
lb_arn = arn.replace(":listener-rule", ":loadbalancer").rsplit("/", 2)[0]
balancer = self.elbv2_backend.load_balancers.get(lb_arn)
if not balancer:
raise LoadBalancerNotFoundError()
listener_arn = arn.replace(":listener-rule", ":listener").rsplit("/", 1)[0]
listener = balancer.listeners.get(listener_arn)
if not listener:
raise ListenerNotFoundError()
resource = listener.rules.get(arn)
if not resource:
raise RuleNotFoundError()
elif ":listener" in arn:
lb_arn, _, _ = arn.replace(":listener", ":loadbalancer").rpartition("/")
balancer = self.elbv2_backend.load_balancers.get(lb_arn)
if not balancer:
raise LoadBalancerNotFoundError()
resource = balancer.listeners.get(arn)
if not resource:
raise ListenerNotFoundError()
else:
raise LoadBalancerNotFoundError()
return resource
@amzn_request_id @amzn_request_id
def describe_account_limits(self): def describe_account_limits(self):
@ -654,30 +617,6 @@ class ELBV2Response(BaseResponse):
template = self.response_template(REMOVE_LISTENER_CERTIFICATES_TEMPLATE) template = self.response_template(REMOVE_LISTENER_CERTIFICATES_TEMPLATE)
return template.render(certificates=certificates) return template.render(certificates=certificates)
def _add_tags(self, resource):
tag_values = []
tag_keys = []
for t_key, t_val in sorted(self.querystring.items()):
if t_key.startswith("Tags.member."):
if t_key.split(".")[3] == "Key":
tag_keys.extend(t_val)
elif t_key.split(".")[3] == "Value":
tag_values.extend(t_val)
counts = {}
for i in tag_keys:
counts[i] = tag_keys.count(i)
counts = sorted(counts.items(), key=lambda i: i[1], reverse=True)
if counts and counts[0][1] > 1:
# We have dupes...
raise DuplicateTagKeysError(counts[0])
for tag_key, tag_value in zip(tag_keys, tag_values):
resource.add_tag(tag_key, tag_value)
ADD_TAGS_TEMPLATE = """<AddTagsResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2015-12-01/"> ADD_TAGS_TEMPLATE = """<AddTagsResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2015-12-01/">
<AddTagsResult/> <AddTagsResult/>
@ -696,11 +635,11 @@ REMOVE_TAGS_TEMPLATE = """<RemoveTagsResponse xmlns="http://elasticloadbalancing
DESCRIBE_TAGS_TEMPLATE = """<DescribeTagsResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2015-12-01/"> DESCRIBE_TAGS_TEMPLATE = """<DescribeTagsResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2015-12-01/">
<DescribeTagsResult> <DescribeTagsResult>
<TagDescriptions> <TagDescriptions>
{% for resource in resources %} {% for resource, tags in resource_tags.items() %}
<member> <member>
<ResourceArn>{{ resource.arn }}</ResourceArn> <ResourceArn>{{ resource.arn }}</ResourceArn>
<Tags> <Tags>
{% for key, value in resource.tags.items() %} {% for key, value in tags.items() %}
<member> <member>
<Value>{{ value }}</Value> <Value>{{ value }}</Value>
<Key>{{ key }}</Key> <Key>{{ key }}</Key>

View File

@ -297,38 +297,30 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# TODO add these to the keys and values functions / combine functions # TODO add these to the keys and values functions / combine functions
# ELB, resource type elasticloadbalancing:loadbalancer # ELB, resource type elasticloadbalancing:loadbalancer
def get_elbv2_tags(arn):
result = []
for key, value in self.elbv2_backend.load_balancers[arn].tags.items():
result.append({"Key": key, "Value": value})
return result
if ( if (
not resource_type_filters not resource_type_filters
or "elasticloadbalancing" in resource_type_filters or "elasticloadbalancing" in resource_type_filters
or "elasticloadbalancing:loadbalancer" in resource_type_filters or "elasticloadbalancing:loadbalancer" in resource_type_filters
): ):
for elb in self.elbv2_backend.load_balancers.values(): for elb in self.elbv2_backend.load_balancers.values():
tags = get_elbv2_tags(elb.arn) tags = self.elbv2_backend.tagging_service.list_tags_for_resource(
elb.arn
)["Tags"]
if not tag_filter(tags): # Skip if no tags, or invalid filter if not tag_filter(tags): # Skip if no tags, or invalid filter
continue continue
yield {"ResourceARN": "{0}".format(elb.arn), "Tags": tags} yield {"ResourceARN": "{0}".format(elb.arn), "Tags": tags}
# ELB Target Group, resource type elasticloadbalancing:targetgroup # ELB Target Group, resource type elasticloadbalancing:targetgroup
def get_target_group_tags(arn):
result = []
for key, value in self.elbv2_backend.target_groups[arn].tags.items():
result.append({"Key": key, "Value": value})
return result
if ( if (
not resource_type_filters not resource_type_filters
or "elasticloadbalancing" in resource_type_filters or "elasticloadbalancing" in resource_type_filters
or "elasticloadbalancing:targetgroup" in resource_type_filters or "elasticloadbalancing:targetgroup" in resource_type_filters
): ):
for target_group in self.elbv2_backend.target_groups.values(): for target_group in self.elbv2_backend.target_groups.values():
tags = get_target_group_tags(target_group.arn) tags = self.elbv2_backend.tagging_service.list_tags_for_resource(
target_group.arn
)["Tags"]
if not tag_filter(tags): # Skip if no tags, or invalid filter if not tag_filter(tags): # Skip if no tags, or invalid filter
continue continue

View File

@ -45,6 +45,8 @@ class TaggingService:
Note: the storage is internal to this class instance. Note: the storage is internal to this class instance.
""" """
if not tags:
return
if arn not in self.tags: if arn not in self.tags:
self.tags[arn] = {} self.tags[arn] = {}
for tag in tags: for tag in tags:

View File

@ -1,7 +1,6 @@
import copy import copy
import os import os
import boto3 import boto3
import botocore
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import pytest import pytest
import sure # noqa # pylint: disable=unused-import import sure # noqa # pylint: disable=unused-import
@ -178,9 +177,12 @@ def test_add_remove_tags():
], ],
) )
conn.add_tags.when.called_with( with pytest.raises(ClientError) as exc:
conn.add_tags(
ResourceArns=[lb.get("LoadBalancerArn")], Tags=[{"Key": "k", "Value": "b"}] ResourceArns=[lb.get("LoadBalancerArn")], Tags=[{"Key": "k", "Value": "b"}]
).should.throw(botocore.exceptions.ClientError) )
err = exc.value.response["Error"]
err["Code"].should.equal("TooManyTagsError")
conn.add_tags( conn.add_tags(
ResourceArns=[lb.get("LoadBalancerArn")], Tags=[{"Key": "j", "Value": "c"}] ResourceArns=[lb.get("LoadBalancerArn")], Tags=[{"Key": "j", "Value": "c"}]
@ -331,7 +333,7 @@ def test_create_rule_forward_config_as_second_arg():
elbv2.create_rule( elbv2.create_rule(
ListenerArn=http_listener_arn, ListenerArn=http_listener_arn,
Conditions=[ Conditions=[
{"Field": "path-pattern", "PathPatternConfig": {"Values": [f"/sth*"]}} {"Field": "path-pattern", "PathPatternConfig": {"Values": ["/sth*"]}}
], ],
Priority=priority, Priority=priority,
Actions=[ Actions=[
@ -699,7 +701,7 @@ def test_modify_rule_conditions():
"StatusCode": "HTTP_301", "StatusCode": "HTTP_301",
}, },
} }
condition = {"Field": "path-pattern", "PathPatternConfig": {"Values": [f"/sth*"]}} condition = {"Field": "path-pattern", "PathPatternConfig": {"Values": ["/sth*"]}}
response = elbv2.create_listener( response = elbv2.create_listener(
LoadBalancerArn=load_balancer_arn, LoadBalancerArn=load_balancer_arn,

View File

@ -0,0 +1,93 @@
import sure # noqa # pylint: disable=unused-import
from moto import mock_elbv2, mock_ec2
from .test_elbv2 import create_load_balancer
create_condition = {"Field": "host-header", "Values": ["example.com"]}
default_action = {
"FixedResponseConfig": {"StatusCode": "200", "ContentType": "text/plain"},
"Type": "fixed-response",
}
@mock_elbv2
@mock_ec2
def test_create_listener_rule_with_tags():
response, _, _, _, _, elbv2 = create_load_balancer()
load_balancer_arn = response.get("LoadBalancers")[0].get("LoadBalancerArn")
listener_arn = elbv2.create_listener(
LoadBalancerArn=load_balancer_arn,
Protocol="HTTP",
Port=80,
DefaultActions=[],
)["Listeners"][0]["ListenerArn"]
rule_arn = elbv2.create_rule(
ListenerArn=listener_arn,
Priority=100,
Conditions=[create_condition],
Actions=[default_action],
Tags=[{"Key": "k1", "Value": "v1"}],
)["Rules"][0]["RuleArn"]
# Ensure the tags persisted
response = elbv2.describe_tags(ResourceArns=[rule_arn])
tags = {d["Key"]: d["Value"] for d in response["TagDescriptions"][0]["Tags"]}
tags.should.equal({"k1": "v1"})
@mock_elbv2
@mock_ec2
def test_listener_rule_add_remove_tags():
response, _, _, _, _, elbv2 = create_load_balancer()
load_balancer_arn = response.get("LoadBalancers")[0].get("LoadBalancerArn")
listener_arn = elbv2.create_listener(
LoadBalancerArn=load_balancer_arn,
Protocol="HTTP",
Port=80,
DefaultActions=[],
Tags=[{"Key": "k1", "Value": "v1"}],
)["Listeners"][0]["ListenerArn"]
rule_arn = elbv2.create_rule(
ListenerArn=listener_arn,
Priority=100,
Conditions=[create_condition],
Actions=[default_action],
)["Rules"][0]["RuleArn"]
elbv2.add_tags(ResourceArns=[rule_arn], Tags=[{"Key": "a", "Value": "b"}])
tags = elbv2.describe_tags(ResourceArns=[rule_arn])["TagDescriptions"][0]["Tags"]
tags.should.equal([{"Key": "a", "Value": "b"}])
elbv2.add_tags(
ResourceArns=[rule_arn],
Tags=[
{"Key": "a", "Value": "b"},
{"Key": "b", "Value": "b"},
{"Key": "c", "Value": "b"},
],
)
tags = elbv2.describe_tags(ResourceArns=[rule_arn])["TagDescriptions"][0]["Tags"]
tags.should.equal(
[
{"Key": "a", "Value": "b"},
{"Key": "b", "Value": "b"},
{"Key": "c", "Value": "b"},
]
)
elbv2.remove_tags(ResourceArns=[rule_arn], TagKeys=["a"])
tags = elbv2.describe_tags(ResourceArns=[rule_arn])["TagDescriptions"][0]["Tags"]
tags.should.equal(
[
{"Key": "b", "Value": "b"},
{"Key": "c", "Value": "b"},
]
)