diff --git a/moto/elbv2/exceptions.py b/moto/elbv2/exceptions.py
index 5ac47961a..22ae9fe8c 100644
--- a/moto/elbv2/exceptions.py
+++ b/moto/elbv2/exceptions.py
@@ -1,46 +1,47 @@
+from typing import Any, Optional
from moto.core.exceptions import RESTError
class ELBClientError(RESTError):
code = 400
- def __init__(self, error_type, message):
+ def __init__(self, error_type: str, message: str):
super().__init__(error_type, message, template="wrapped_single_error")
class DuplicateTagKeysError(ELBClientError):
- def __init__(self, cidr):
+ def __init__(self, cidr: Any):
super().__init__(
"DuplicateTagKeys", f"Tag key was specified more than once: {cidr}"
)
class LoadBalancerNotFoundError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"LoadBalancerNotFound", "The specified load balancer does not exist."
)
class ListenerNotFoundError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__("ListenerNotFound", "The specified listener does not exist.")
class SubnetNotFoundError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__("SubnetNotFound", "The specified subnet does not exist.")
class TargetGroupNotFoundError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"TargetGroupNotFound", "The specified target group does not exist."
)
class TooManyTagsError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"TooManyTagsError",
"The quota for the number of tags that can be assigned to a load balancer has been reached",
@@ -48,7 +49,7 @@ class TooManyTagsError(ELBClientError):
class BadHealthCheckDefinition(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"ValidationError",
"HealthCheck Target must begin with one of HTTP, TCP, HTTPS, SSL",
@@ -56,14 +57,14 @@ class BadHealthCheckDefinition(ELBClientError):
class DuplicateListenerError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"DuplicateListener", "A listener with the specified port already exists."
)
class DuplicateLoadBalancerName(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"DuplicateLoadBalancerName",
"A load balancer with the specified name already exists.",
@@ -71,7 +72,7 @@ class DuplicateLoadBalancerName(ELBClientError):
class DuplicateTargetGroupName(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"DuplicateTargetGroupName",
"A target group with the specified name already exists.",
@@ -79,7 +80,7 @@ class DuplicateTargetGroupName(ELBClientError):
class InvalidTargetError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"InvalidTarget",
"The specified target does not exist or is not in the same VPC as the target group.",
@@ -87,12 +88,12 @@ class InvalidTargetError(ELBClientError):
class EmptyListenersError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__("ValidationError", "Listeners cannot be empty")
class PriorityInUseError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__("PriorityInUse", "The specified priority is in use.")
@@ -106,7 +107,7 @@ class InvalidConditionFieldError(ELBClientError):
"source-ip",
]
- def __init__(self, invalid_name):
+ def __init__(self, invalid_name: str):
valid = ",".join(self.VALID_FIELDS)
super().__init__(
"ValidationError",
@@ -115,12 +116,12 @@ class InvalidConditionFieldError(ELBClientError):
class InvalidConditionValueError(ELBClientError):
- def __init__(self, msg):
+ def __init__(self, msg: str):
super().__init__("ValidationError", msg)
class InvalidActionTypeError(ELBClientError):
- def __init__(self, invalid_name, index):
+ def __init__(self, invalid_name: Any, index: int):
super().__init__(
"ValidationError",
f"1 validation error detected: Value '{invalid_name}' at 'actions.{index}.member.type' failed to satisfy constraint: Member must satisfy enum value set: [forward, redirect, fixed-response]",
@@ -128,12 +129,12 @@ class InvalidActionTypeError(ELBClientError):
class ActionTargetGroupNotFoundError(ELBClientError):
- def __init__(self, arn):
+ def __init__(self, arn: str):
super().__init__("TargetGroupNotFound", f"Target group '{arn}' not found")
class ListenerOrBalancerMissingError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"ValidationError",
"You must specify either listener ARNs or a load balancer ARN",
@@ -141,45 +142,45 @@ class ListenerOrBalancerMissingError(ELBClientError):
class InvalidDescribeRulesRequest(ELBClientError):
- def __init__(self, msg):
+ def __init__(self, msg: str):
super().__init__("ValidationError", msg)
class ResourceInUseError(ELBClientError):
- def __init__(self, msg="A specified resource is in use"):
+ def __init__(self, msg: str = "A specified resource is in use"):
super().__init__("ResourceInUse", msg)
class RuleNotFoundError(ELBClientError):
- def __init__(self, msg=None):
+ def __init__(self, msg: Optional[str] = None):
msg = msg or "The specified rule does not exist."
super().__init__("RuleNotFound", msg)
class DuplicatePriorityError(ELBClientError):
- def __init__(self, invalid_value):
+ def __init__(self, invalid_value: str):
super().__init__(
"ValidationError", f"Priority '{invalid_value}' was provided multiple times"
)
class InvalidTargetGroupNameError(ELBClientError):
- def __init__(self, msg):
+ def __init__(self, msg: str):
super().__init__("ValidationError", msg)
class InvalidModifyRuleArgumentsError(ELBClientError):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(
"ValidationError", "Either conditions or actions must be specified"
)
class InvalidStatusCodeActionTypeError(ELBClientError):
- def __init__(self, msg):
+ def __init__(self, msg: str):
super().__init__("ValidationError", msg)
class InvalidLoadBalancerActionException(ELBClientError):
- def __init__(self, msg):
+ def __init__(self, msg: str):
super().__init__("InvalidLoadBalancerAction", msg)
diff --git a/moto/elbv2/models.py b/moto/elbv2/models.py
index e7902eead..f8ca7b776 100644
--- a/moto/elbv2/models.py
+++ b/moto/elbv2/models.py
@@ -8,6 +8,7 @@ from moto.core.exceptions import RESTError
from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.ec2.models import ec2_backends
+from moto.ec2.models.subnets import Subnet
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
from .utils import make_arn_for_target_group
@@ -48,7 +49,13 @@ ALLOWED_ACTIONS = [
class FakeHealthStatus(BaseModel):
def __init__(
- self, instance_id, port, health_port, status, reason=None, description=None
+ self,
+ instance_id: str,
+ port: str,
+ health_port: Optional[str],
+ status: str,
+ reason: Optional[str] = None,
+ description: Optional[str] = None,
):
self.instance_id = instance_id
self.port = port
@@ -63,22 +70,22 @@ class FakeTargetGroup(CloudFormationModel):
def __init__(
self,
- name,
- arn,
- vpc_id,
- protocol,
- port,
- protocol_version=None,
- healthcheck_protocol=None,
- healthcheck_port=None,
- healthcheck_path=None,
- healthcheck_interval_seconds=None,
- healthcheck_timeout_seconds=None,
- healthcheck_enabled=None,
- healthy_threshold_count=None,
- unhealthy_threshold_count=None,
- matcher=None,
- target_type=None,
+ name: str,
+ arn: str,
+ vpc_id: str,
+ protocol: str,
+ port: str,
+ protocol_version: Optional[str] = None,
+ healthcheck_protocol: Optional[str] = None,
+ healthcheck_port: Optional[str] = None,
+ healthcheck_path: Optional[str] = None,
+ healthcheck_interval_seconds: Optional[str] = None,
+ healthcheck_timeout_seconds: Optional[int] = None,
+ healthcheck_enabled: Optional[str] = None,
+ healthy_threshold_count: Optional[str] = None,
+ unhealthy_threshold_count: Optional[str] = None,
+ matcher: Optional[Dict[str, Any]] = None,
+ target_type: Optional[str] = None,
):
# TODO: default values differs when you add Network Load balancer
self.name = name
@@ -103,9 +110,9 @@ class FakeTargetGroup(CloudFormationModel):
self.healthcheck_enabled = healthcheck_enabled
self.healthy_threshold_count = healthy_threshold_count or 5
self.unhealthy_threshold_count = unhealthy_threshold_count or 2
- self.load_balancer_arns = []
+ self.load_balancer_arns: List[str] = []
if self.healthcheck_protocol != "TCP":
- self.matcher = matcher or {"HttpCode": "200"}
+ self.matcher: Dict[str, Any] = matcher or {"HttpCode": "200"}
self.healthcheck_path = self.healthcheck_path or "/"
self.healthcheck_port = self.healthcheck_port or str(self.port)
self.target_type = target_type
@@ -118,31 +125,31 @@ class FakeTargetGroup(CloudFormationModel):
"waf.fail_open.enabled": "false",
}
- self.targets = OrderedDict()
+ self.targets: Dict[str, Dict[str, Any]] = OrderedDict()
@property
- def physical_resource_id(self):
+ def physical_resource_id(self) -> str:
return self.arn
- def register(self, targets):
+ def register(self, targets: List[Dict[str, Any]]) -> None:
for target in targets:
self.targets[target["id"]] = {
"id": target["id"],
"port": target.get("port", self.port),
}
- def deregister(self, targets):
+ def deregister(self, targets: List[Dict[str, Any]]) -> None:
for target in targets:
t = self.targets.pop(target["id"], None)
if not t:
raise InvalidTargetError()
- def deregister_terminated_instances(self, instance_ids):
+ def deregister_terminated_instances(self, instance_ids: List[str]) -> None:
for target_id in list(self.targets.keys()):
if target_id in instance_ids:
del self.targets[target_id]
- def health_for(self, target, ec2_backend):
+ def health_for(self, target: Dict[str, Any], ec2_backend: Any) -> FakeHealthStatus:
t = self.targets.get(target["id"])
if t is None:
raise InvalidTargetError()
@@ -160,18 +167,23 @@ class FakeTargetGroup(CloudFormationModel):
return FakeHealthStatus(t["id"], t["port"], self.healthcheck_port, "healthy")
@staticmethod
- def cloudformation_name_type():
+ def cloudformation_name_type() -> str:
return "Name"
@staticmethod
- def cloudformation_type():
+ def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-targetgroup.html
return "AWS::ElasticLoadBalancingV2::TargetGroup"
@classmethod
- def create_from_cloudformation_json(
- cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
- ):
+ def create_from_cloudformation_json( # type: ignore[misc]
+ cls,
+ resource_name: str,
+ cloudformation_json: Any,
+ account_id: str,
+ region_name: str,
+ **kwargs: Any,
+ ) -> "FakeTargetGroup":
properties = cloudformation_json["Properties"]
elbv2_backend = elbv2_backends[account_id][region_name]
@@ -210,14 +222,14 @@ class FakeTargetGroup(CloudFormationModel):
class FakeListener(CloudFormationModel):
def __init__(
self,
- load_balancer_arn,
- arn,
- protocol,
- port,
- ssl_policy,
- certificate,
- default_actions,
- alpn_policy,
+ load_balancer_arn: str,
+ arn: str,
+ protocol: str,
+ port: str,
+ ssl_policy: str,
+ certificate: Optional[str],
+ default_actions: List["FakeAction"],
+ alpn_policy: Optional[List[str]],
):
self.load_balancer_arn = load_balancer_arn
self.arn = arn
@@ -228,8 +240,8 @@ class FakeListener(CloudFormationModel):
self.certificates = [certificate] if certificate is not None else []
self.default_actions = default_actions
self.alpn_policy = alpn_policy or []
- self._non_default_rules = OrderedDict()
- self._default_rule = OrderedDict()
+ self._non_default_rules: Dict[str, FakeListenerRule] = OrderedDict()
+ self._default_rule: Dict[int, FakeRule] = OrderedDict()
self._default_rule[0] = FakeRule(
listener_arn=self.arn,
conditions=[],
@@ -239,35 +251,40 @@ class FakeListener(CloudFormationModel):
)
@property
- def physical_resource_id(self):
+ def physical_resource_id(self) -> str:
return self.arn
@property
- def rules(self):
+ def rules(self) -> Dict[Any, "FakeRule"]: # type: ignore[misc]
return OrderedDict(
- list(self._non_default_rules.items()) + list(self._default_rule.items())
+ list(self._non_default_rules.items()) + list(self._default_rule.items()) # type: ignore
)
- def remove_rule(self, arn):
+ def remove_rule(self, arn: str) -> None:
self._non_default_rules.pop(arn)
- def register(self, arn, rule):
+ def register(self, arn: str, rule: "FakeListenerRule") -> None:
self._non_default_rules[arn] = rule
sorted(self._non_default_rules.values(), key=lambda x: x.priority)
@staticmethod
- def cloudformation_name_type():
- return None
+ def cloudformation_name_type() -> str:
+ return ""
@staticmethod
- def cloudformation_type():
+ def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-listener.html
return "AWS::ElasticLoadBalancingV2::Listener"
@classmethod
- def create_from_cloudformation_json(
- cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
- ):
+ def create_from_cloudformation_json( # type: ignore[misc]
+ cls,
+ resource_name: str,
+ cloudformation_json: Any,
+ account_id: str,
+ region_name: str,
+ **kwargs: Any,
+ ) -> "FakeListener":
properties = cloudformation_json["Properties"]
elbv2_backend = elbv2_backends[account_id][region_name]
@@ -285,14 +302,14 @@ class FakeListener(CloudFormationModel):
return listener
@classmethod
- def update_from_cloudformation_json(
+ def update_from_cloudformation_json( # type: ignore[misc]
cls,
- original_resource,
- new_resource_name,
- cloudformation_json,
- account_id,
- region_name,
- ):
+ original_resource: Any,
+ new_resource_name: str,
+ cloudformation_json: Any,
+ account_id: str,
+ region_name: str,
+ ) -> "FakeListener":
properties = cloudformation_json["Properties"]
elbv2_backend = elbv2_backends[account_id][region_name]
@@ -315,7 +332,14 @@ class FakeListener(CloudFormationModel):
class FakeListenerRule(CloudFormationModel):
- def __init__(self, listener_arn, arn, conditions, priority, actions):
+ def __init__(
+ self,
+ listener_arn: str,
+ arn: str,
+ conditions: List[Dict[str, Any]],
+ priority: int,
+ actions: List["FakeAction"],
+ ):
self.listener_arn = listener_arn
self.arn = arn
self.conditions = conditions
@@ -323,18 +347,23 @@ class FakeListenerRule(CloudFormationModel):
self.priority = priority
@property
- def physical_resource_id(self):
+ def physical_resource_id(self) -> str:
return self.arn
@staticmethod
- def cloudformation_type():
+ def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-listenerrule.html
return "AWS::ElasticLoadBalancingV2::ListenerRule"
@classmethod
- def create_from_cloudformation_json(
- cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
- ):
+ def create_from_cloudformation_json( # type: ignore[misc]
+ cls,
+ resource_name: str,
+ cloudformation_json: Any,
+ account_id: str,
+ region_name: str,
+ **kwargs: Any,
+ ) -> "FakeListenerRule":
properties = cloudformation_json["Properties"]
elbv2_backend = elbv2_backends[account_id][region_name]
listener_arn = properties.get("ListenerArn")
@@ -348,14 +377,14 @@ class FakeListenerRule(CloudFormationModel):
return listener_rule
@classmethod
- def update_from_cloudformation_json(
+ def update_from_cloudformation_json( # type: ignore[misc]
cls,
- original_resource,
- new_resource_name,
- cloudformation_json,
- account_id,
- region_name,
- ):
+ original_resource: Any,
+ new_resource_name: str,
+ cloudformation_json: Any,
+ account_id: str,
+ region_name: str,
+ ) -> "FakeListenerRule":
properties = cloudformation_json["Properties"]
@@ -370,7 +399,14 @@ class FakeListenerRule(CloudFormationModel):
class FakeRule(BaseModel):
- def __init__(self, listener_arn, conditions, priority, actions, is_default):
+ def __init__(
+ self,
+ listener_arn: str,
+ conditions: List[Dict[str, Any]],
+ priority: Any,
+ actions: List["FakeAction"],
+ is_default: bool,
+ ):
self.listener_arn = listener_arn
self.arn = (
listener_arn.replace(":listener/", ":listener-rule/") + f"/{id(self)}"
@@ -382,7 +418,7 @@ class FakeRule(BaseModel):
class FakeAction(BaseModel):
- def __init__(self, data):
+ def __init__(self, data: Dict[str, Any]):
self.data = data
self.type = data.get("Type")
@@ -392,7 +428,7 @@ class FakeAction(BaseModel):
"Enabled": "false"
}
- def to_xml(self):
+ def to_xml(self) -> str:
template = Template(
"""{{ action.type }}
{% if "Order" in action.data %}
@@ -501,11 +537,11 @@ class FakeAction(BaseModel):
class FakeBackend(BaseModel):
- def __init__(self, instance_port):
+ def __init__(self, instance_port: str):
self.instance_port = instance_port
- self.policy_names = []
+ self.policy_names: List[str] = []
- def __repr__(self):
+ def __repr__(self) -> str:
return f"FakeBackend(inp: {self.instance_port}, policies: {self.policy_names})"
@@ -530,15 +566,15 @@ class FakeLoadBalancer(CloudFormationModel):
def __init__(
self,
- name,
- security_groups,
- subnets,
- vpc_id,
- arn,
- dns_name,
- state,
- scheme="internet-facing",
- loadbalancer_type=None,
+ name: str,
+ security_groups: List[str],
+ subnets: List[Subnet],
+ vpc_id: str,
+ arn: str,
+ dns_name: str,
+ state: str,
+ scheme: str = "internet-facing",
+ loadbalancer_type: Optional[str] = None,
):
self.name = name
self.created_time = iso_8601_datetime_with_milliseconds(datetime.datetime.now())
@@ -546,8 +582,8 @@ class FakeLoadBalancer(CloudFormationModel):
self.security_groups = security_groups
self.subnets = subnets or []
self.vpc_id = vpc_id
- self.listeners = OrderedDict()
- self.tags = {}
+ self.listeners: Dict[str, FakeListener] = OrderedDict()
+ self.tags: Dict[str, Any] = {}
self.arn = arn
self.dns_name = dns_name
self.state = state
@@ -563,30 +599,35 @@ class FakeLoadBalancer(CloudFormationModel):
}
@property
- def physical_resource_id(self):
+ def physical_resource_id(self) -> str:
return self.arn
- def activate(self):
+ def activate(self) -> None:
if self.state == "provisioning":
self.state = "active"
- def delete(self, account_id, region):
+ def delete(self, account_id: str, region: str) -> None:
"""Not exposed as part of the ELB API - used for CloudFormation."""
elbv2_backends[account_id][region].delete_load_balancer(self.arn)
@staticmethod
- def cloudformation_name_type():
+ def cloudformation_name_type() -> str:
return "Name"
@staticmethod
- def cloudformation_type():
+ def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-loadbalancer.html
return "AWS::ElasticLoadBalancingV2::LoadBalancer"
@classmethod
- def create_from_cloudformation_json(
- cls, resource_name, cloudformation_json, account_id, region_name, **kwargs
- ):
+ def create_from_cloudformation_json( # type: ignore[misc]
+ cls,
+ resource_name: str,
+ cloudformation_json: Any,
+ account_id: str,
+ region_name: str,
+ **kwargs: Any,
+ ) -> "FakeLoadBalancer":
properties = cloudformation_json["Properties"]
elbv2_backend = elbv2_backends[account_id][region_name]
@@ -601,7 +642,7 @@ class FakeLoadBalancer(CloudFormationModel):
return load_balancer
@classmethod
- def has_cfn_attr(cls, attr):
+ def has_cfn_attr(cls, attr: str) -> bool:
return attr in [
"DNSName",
"LoadBalancerName",
@@ -610,7 +651,7 @@ class FakeLoadBalancer(CloudFormationModel):
"SecurityGroups",
]
- def get_cfn_attribute(self, attribute_name):
+ def get_cfn_attribute(self, attribute_name: str) -> Any:
"""
Implemented attributes:
* DNSName
@@ -643,21 +684,23 @@ class FakeLoadBalancer(CloudFormationModel):
class ELBv2Backend(BaseBackend):
- def __init__(self, region_name, account_id):
+ def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
- self.target_groups = OrderedDict()
- self.load_balancers = OrderedDict()
+ self.target_groups: Dict[str, FakeTargetGroup] = OrderedDict()
+ self.load_balancers: Dict[str, FakeLoadBalancer] = OrderedDict()
self.tagging_service = TaggingService()
@staticmethod
- def default_vpc_endpoint_service(service_region, zones):
+ def default_vpc_endpoint_service(
+ service_region: str, zones: List[str]
+ ) -> List[Dict[str, str]]:
"""Default VPC endpoint service."""
return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "elasticloadbalancing"
)
@property
- def ec2_backend(self):
+ def ec2_backend(self) -> Any: # type: ignore[misc]
"""
EC2 backend
@@ -668,14 +711,14 @@ class ELBv2Backend(BaseBackend):
def create_load_balancer(
self,
- name,
- security_groups,
- subnet_ids,
- subnet_mappings=None,
- scheme="internet-facing",
- loadbalancer_type=None,
- tags=None,
- ):
+ name: str,
+ security_groups: List[str],
+ subnet_ids: List[str],
+ subnet_mappings: Optional[List[Dict[str, str]]] = None,
+ scheme: str = "internet-facing",
+ loadbalancer_type: Optional[str] = None,
+ tags: Optional[List[Dict[str, str]]] = None,
+ ) -> FakeLoadBalancer:
vpc_id = None
subnets = []
state = "provisioning"
@@ -718,7 +761,9 @@ class ELBv2Backend(BaseBackend):
self.tagging_service.tag_resource(arn, tags)
return new_load_balancer
- def convert_and_validate_action_properties(self, properties):
+ def convert_and_validate_action_properties(
+ self, properties: Dict[str, Any]
+ ) -> List[Dict[str, Any]]:
# transform Actions to confirm with the rest of the code and XML templates
default_actions = []
@@ -730,8 +775,15 @@ class ELBv2Backend(BaseBackend):
raise InvalidActionTypeError(action_type, i + 1)
return default_actions
- def create_rule(self, listener_arn, conditions, priority, actions, tags=None):
- actions = [FakeAction(action) for action in actions]
+ def create_rule(
+ self,
+ listener_arn: str,
+ conditions: List[Dict[str, Any]],
+ priority: int,
+ actions: List[Dict[str, Any]],
+ tags: Optional[List[Dict[str, str]]] = None,
+ ) -> FakeListenerRule:
+ fake_actions = [FakeAction(action) for action in actions]
listeners = self.describe_listeners(None, [listener_arn])
if not listeners:
raise ListenerNotFoundError()
@@ -752,7 +804,7 @@ class ELBv2Backend(BaseBackend):
if rule.priority == priority:
raise PriorityInUseError()
- self._validate_actions(actions)
+ self._validate_actions(fake_actions)
arn = listener_arn.replace(":listener/", ":listener-rule/")
arn += f"/{mock_random.get_random_hex(16)}"
@@ -760,12 +812,14 @@ class ELBv2Backend(BaseBackend):
# TODO: check for error 'TooManyRules'
# create rule
- rule = FakeListenerRule(listener.arn, arn, conditions, priority, actions)
- listener.register(arn, rule)
+ listener_rule = FakeListenerRule(
+ listener.arn, arn, conditions, priority, fake_actions
+ )
+ listener.register(arn, listener_rule)
self.tagging_service.tag_resource(arn, tags)
- return rule
+ return listener_rule
- def _validate_conditions(self, conditions):
+ def _validate_conditions(self, conditions: List[Dict[str, Any]]) -> None:
for condition in conditions:
if "Field" in condition:
field = condition["Field"]
@@ -790,7 +844,7 @@ class ELBv2Backend(BaseBackend):
func = getattr(self, method_name)
func(condition)
- def _validate_host_header_condition(self, condition):
+ def _validate_host_header_condition(self, condition: Dict[str, Any]) -> None:
values = None
if "HostHeaderConfig" in condition:
values = condition["HostHeaderConfig"]["Values"]
@@ -808,7 +862,7 @@ class ELBv2Backend(BaseBackend):
"The 'host-header' value is too long; the limit is '128'"
)
- def _validate_http_header_condition(self, condition):
+ def _validate_http_header_condition(self, condition: Dict[str, Any]) -> None:
if "HttpHeaderConfig" in condition:
config = condition["HttpHeaderConfig"]
name = config.get("HttpHeaderName")
@@ -827,7 +881,9 @@ class ELBv2Backend(BaseBackend):
"A 'HttpHeaderConfig' must be specified with 'http-header'"
)
- def _validate_http_request_method_condition(self, condition):
+ def _validate_http_request_method_condition(
+ self, condition: Dict[str, Any]
+ ) -> None:
if "HttpRequestMethodConfig" in condition:
for value in condition["HttpRequestMethodConfig"]["Values"]:
if len(value) > 40:
@@ -843,7 +899,7 @@ class ELBv2Backend(BaseBackend):
"A 'HttpRequestMethodConfig' must be specified with 'http-request-method'"
)
- def _validate_path_pattern_condition(self, condition):
+ def _validate_path_pattern_condition(self, condition: Dict[str, Any]) -> None:
values = None
if "PathPatternConfig" in condition:
values = condition["PathPatternConfig"]["Values"]
@@ -865,7 +921,7 @@ class ELBv2Backend(BaseBackend):
"The 'path-pattern' value is too long; the limit is '128'"
)
- def _validate_source_ip_condition(self, condition):
+ def _validate_source_ip_condition(self, condition: Dict[str, Any]) -> None:
if "SourceIpConfig" in condition:
values = condition["SourceIpConfig"].get("Values", [])
if len(values) == 0:
@@ -877,7 +933,7 @@ class ELBv2Backend(BaseBackend):
"A 'SourceIpConfig' must be specified with 'source-ip'"
)
- def _validate_query_string_condition(self, condition):
+ def _validate_query_string_condition(self, condition: Dict[str, Any]) -> None:
if "QueryStringConfig" in condition:
config = condition["QueryStringConfig"]
values = config["Values"]
@@ -899,7 +955,7 @@ class ELBv2Backend(BaseBackend):
"A 'QueryStringConfig' must be specified with 'query-string'"
)
- def _get_target_group_arns_from(self, action_data):
+ def _get_target_group_arns_from(self, action_data: Dict[str, Any]) -> List[Any]:
if "TargetGroupArn" in action_data:
return [action_data["TargetGroupArn"]]
elif "ForwardConfig" in action_data:
@@ -910,7 +966,7 @@ class ELBv2Backend(BaseBackend):
else:
return []
- def _validate_actions(self, actions):
+ def _validate_actions(self, actions: List[FakeAction]) -> None:
# validate Actions
target_group_arns = [
target_group.arn for target_group in self.target_groups.values()
@@ -937,7 +993,9 @@ class ELBv2Backend(BaseBackend):
else:
raise InvalidActionTypeError(action_type, index)
- def _validate_fixed_response_action(self, action, i, index):
+ def _validate_fixed_response_action(
+ self, action: FakeAction, i: int, index: int
+ ) -> None:
status_code = action.data.get("FixedResponseConfig", {}).get("StatusCode")
if status_code is None:
raise ParamValidationError(
@@ -962,7 +1020,7 @@ Member must satisfy regular expression pattern: {expression}"
"The ContentType must be one of:'text/html', 'application/json', 'application/javascript', 'text/css', 'text/plain'"
)
- def create_target_group(self, name, **kwargs):
+ def create_target_group(self, name: str, **kwargs: Any) -> FakeTargetGroup:
if len(name) > 32:
raise InvalidTargetGroupNameError(
f"Target group name '{name}' cannot be longer than '32' characters"
@@ -1023,14 +1081,18 @@ Member must satisfy regular expression pattern: {expression}"
self.add_tags(resource_arns=[target_group.arn], tags=tags)
return target_group
- def modify_target_group_attributes(self, target_group_arn, attributes):
+ def modify_target_group_attributes(
+ self, target_group_arn: str, attributes: Dict[str, Any]
+ ) -> None:
target_group = self.target_groups.get(target_group_arn)
if not target_group:
raise TargetGroupNotFoundError()
target_group.attributes.update(attributes)
- def convert_and_validate_certificates(self, certificates):
+ def convert_and_validate_certificates(
+ self, certificates: List[Dict[str, Any]]
+ ) -> List[Dict[str, Any]]:
# transform default certificate to conform with the rest of the code and XML templates
for cert in certificates or []:
@@ -1038,7 +1100,9 @@ Member must satisfy regular expression pattern: {expression}"
return certificates
- def convert_and_validate_properties(self, properties):
+ def convert_and_validate_properties(
+ self, properties: Dict[str, Any]
+ ) -> List[Dict[str, Any]]:
# transform default actions to confirm with the rest of the code and XML templates
# Caller: CF create/update for type "AWS::ElasticLoadBalancingV2::Listener"
@@ -1057,16 +1121,16 @@ Member must satisfy regular expression pattern: {expression}"
def create_listener(
self,
- load_balancer_arn,
- protocol,
- port,
- ssl_policy,
- certificate,
- default_actions,
- alpn_policy=None,
- tags=None,
- ):
- default_actions = [FakeAction(action) for action in default_actions]
+ load_balancer_arn: str,
+ protocol: str,
+ port: str,
+ ssl_policy: str,
+ certificate: Optional[str],
+ actions: List[Dict[str, Any]],
+ alpn_policy: Optional[List[str]] = None,
+ tags: Optional[List[Dict[str, str]]] = None,
+ ) -> FakeListener:
+ default_actions = [FakeAction(action) for action in actions]
balancer = self.load_balancers.get(load_balancer_arn)
if balancer is None:
raise LoadBalancerNotFoundError()
@@ -1101,8 +1165,10 @@ Member must satisfy regular expression pattern: {expression}"
return listener
- def describe_load_balancers(self, arns, names):
- balancers = self.load_balancers.values()
+ def describe_load_balancers(
+ self, arns: Optional[List[str]], names: Optional[List[str]]
+ ) -> List[FakeLoadBalancer]:
+ balancers = list(self.load_balancers.values())
arns = arns or []
names = names or []
if not arns and not names:
@@ -1135,7 +1201,9 @@ Member must satisfy regular expression pattern: {expression}"
return matched_balancers
- def describe_rules(self, listener_arn, rule_arns):
+ def describe_rules(
+ self, listener_arn: Optional[str], rule_arns: Optional[List[str]]
+ ) -> List[FakeRule]:
if listener_arn is None and not rule_arns:
raise InvalidDescribeRulesRequest(
"You must specify either listener rule ARNs or a listener ARN"
@@ -1146,17 +1214,17 @@ Member must satisfy regular expression pattern: {expression}"
)
if listener_arn:
listener = self.describe_listeners(None, [listener_arn])[0]
- return listener.rules.values()
+ return list(listener.rules.values())
# search for rule arns
matched_rules = []
for load_balancer_arn in self.load_balancers:
- listeners = self.load_balancers.get(load_balancer_arn).listeners.values()
+ listeners = self.load_balancers.get(load_balancer_arn).listeners.values() # type: ignore
for listener in listeners:
for rule in listener.rules.values():
- if rule.arn in rule_arns:
+ if rule.arn in rule_arns: # type: ignore[operator]
matched_rules.append(rule)
- if len(matched_rules) != len(rule_arns):
+ if len(matched_rules) != len(rule_arns): # type: ignore
raise RuleNotFoundError("One or more rules not found")
return matched_rules
@@ -1194,11 +1262,13 @@ Member must satisfy regular expression pattern: {expression}"
return self.target_groups.values()
- def describe_listeners(self, load_balancer_arn, listener_arns):
+ def describe_listeners(
+ self, load_balancer_arn: Optional[str], listener_arns: List[str]
+ ) -> List[FakeListener]:
if load_balancer_arn:
if load_balancer_arn not in self.load_balancers:
raise LoadBalancerNotFoundError()
- return self.load_balancers.get(load_balancer_arn).listeners.values()
+ return list(self.load_balancers.get(load_balancer_arn).listeners.values()) # type: ignore
matched = []
for load_balancer in self.load_balancers.values():
@@ -1210,12 +1280,12 @@ Member must satisfy regular expression pattern: {expression}"
raise ListenerNotFoundError()
return matched
- def delete_load_balancer(self, arn):
+ def delete_load_balancer(self, arn: str) -> None:
self.load_balancers.pop(arn, None)
- def delete_rule(self, arn):
+ def delete_rule(self, arn: str) -> None:
for load_balancer_arn in self.load_balancers:
- listeners = self.load_balancers.get(load_balancer_arn).listeners.values()
+ listeners = self.load_balancers.get(load_balancer_arn).listeners.values() # type: ignore[union-attr]
for listener in listeners:
for rule in listener.rules.values():
if rule.arn == arn:
@@ -1225,7 +1295,7 @@ Member must satisfy regular expression pattern: {expression}"
# should raise RuleNotFound Error according to the AWS API doc
# however, boto3 does't raise error even if rule is not found
- def delete_target_group(self, target_group_arn):
+ def delete_target_group(self, target_group_arn: str) -> Optional[FakeTargetGroup]: # type: ignore[return]
if target_group_arn not in self.target_groups:
raise TargetGroupNotFoundError()
@@ -1238,17 +1308,22 @@ Member must satisfy regular expression pattern: {expression}"
del self.target_groups[target_group_arn]
return target_group
- def delete_listener(self, listener_arn):
+ def delete_listener(self, listener_arn: str) -> FakeListener:
for load_balancer in self.load_balancers.values():
listener = load_balancer.listeners.pop(listener_arn, None)
if listener:
return listener
raise ListenerNotFoundError()
- def modify_rule(self, rule_arn, conditions, actions):
- actions = [FakeAction(action) for action in actions]
+ def modify_rule(
+ self,
+ rule_arn: str,
+ conditions: List[Dict[str, Any]],
+ actions: List[Dict[str, Any]],
+ ) -> FakeRule:
+ fake_actions = [FakeAction(action) for action in actions]
# if conditions or actions is empty list, do not update the attributes
- if not conditions and not actions:
+ if not conditions and not fake_actions:
raise InvalidModifyRuleArgumentsError()
rules = self.describe_rules(listener_arn=None, rule_arns=[rule_arn])
if not rules:
@@ -1260,7 +1335,7 @@ Member must satisfy regular expression pattern: {expression}"
# TODO: check pattern of value for 'path-pattern'
# validate Actions
- self._validate_actions(actions)
+ self._validate_actions(fake_actions)
# TODO: check for error 'TooManyRegistrationsForTargetId'
# TODO: check for error 'TooManyRules'
@@ -1269,10 +1344,10 @@ Member must satisfy regular expression pattern: {expression}"
if conditions:
rule.conditions = conditions
if actions:
- rule.actions = actions
+ rule.actions = fake_actions
return rule
- def register_targets(self, target_group_arn: str, instances: List[Any]):
+ def register_targets(self, target_group_arn: str, instances: List[Any]) -> None:
target_group = self.target_groups.get(target_group_arn)
if target_group is None:
raise TargetGroupNotFoundError()
@@ -1280,22 +1355,26 @@ Member must satisfy regular expression pattern: {expression}"
def deregister_targets(
self, target_group_arn: str, instances: List[Dict[str, Any]]
- ):
+ ) -> None:
target_group = self.target_groups.get(target_group_arn)
if target_group is None:
raise TargetGroupNotFoundError()
target_group.deregister(instances)
- def describe_target_health(self, target_group_arn, targets):
+ def describe_target_health(
+ self, target_group_arn: str, targets: List[Dict[str, Any]]
+ ) -> List[FakeHealthStatus]:
target_group = self.target_groups.get(target_group_arn)
if target_group is None:
raise TargetGroupNotFoundError()
if not targets:
- targets = target_group.targets.values()
+ targets = list(target_group.targets.values())
return [target_group.health_for(target, self.ec2_backend) for target in targets]
- def set_rule_priorities(self, rule_priorities):
+ def set_rule_priorities(
+ self, rule_priorities: List[Dict[str, Any]]
+ ) -> List[FakeRule]:
# validate
priorities = [rule_priority["priority"] for rule_priority in rule_priorities]
for priority in set(priorities):
@@ -1332,7 +1411,7 @@ Member must satisfy regular expression pattern: {expression}"
modified_rules.append(given_rule)
return modified_rules
- def set_ip_address_type(self, arn, ip_type):
+ def set_ip_address_type(self, arn: str, ip_type: str) -> None:
if ip_type not in ("internal", "dualstack"):
raise RESTError(
"InvalidParameterValue",
@@ -1351,7 +1430,7 @@ Member must satisfy regular expression pattern: {expression}"
balancer.stack = ip_type
- def set_security_groups(self, arn, sec_groups):
+ def set_security_groups(self, arn: str, sec_groups: List[str]) -> None:
balancer = self.load_balancers.get(arn)
if balancer is None:
raise LoadBalancerNotFoundError()
@@ -1366,16 +1445,18 @@ Member must satisfy regular expression pattern: {expression}"
balancer.security_groups = sec_groups
- def set_subnets(self, arn, subnets, subnet_mappings):
+ def set_subnets(
+ self, arn: str, subnets: List[str], subnet_mappings: List[Dict[str, Any]]
+ ) -> Dict[str, str]:
balancer = self.load_balancers.get(arn)
if balancer is None:
raise LoadBalancerNotFoundError()
subnet_objects = []
- sub_zone_list = {}
- for subnet in subnets:
+ sub_zone_list: Dict[str, str] = {}
+ for subnet_id in subnets:
try:
- subnet = self._get_subnet(sub_zone_list, subnet)
+ subnet = self._get_subnet(sub_zone_list, subnet_id)
sub_zone_list[subnet.availability_zone] = subnet.id
subnet_objects.append(subnet)
@@ -1397,10 +1478,10 @@ Member must satisfy regular expression pattern: {expression}"
balancer.subnets = subnet_objects
- return sub_zone_list.items()
+ return sub_zone_list
- def _get_subnet(self, sub_zone_list, subnet):
- subnet = self.ec2_backend.get_subnet(subnet)
+ def _get_subnet(self, sub_zone_list: Dict[str, str], subnet_id: str) -> Subnet:
+ subnet = self.ec2_backend.get_subnet(subnet_id)
if subnet.availability_zone in sub_zone_list:
raise RESTError(
"InvalidConfigurationRequest",
@@ -1408,7 +1489,9 @@ Member must satisfy regular expression pattern: {expression}"
)
return subnet
- def modify_load_balancer_attributes(self, arn, attrs):
+ def modify_load_balancer_attributes(
+ self, arn: str, attrs: Dict[str, Any]
+ ) -> Dict[str, Any]:
balancer = self.load_balancers.get(arn)
if balancer is None:
raise LoadBalancerNotFoundError()
@@ -1420,7 +1503,7 @@ Member must satisfy regular expression pattern: {expression}"
balancer.attrs.update(attrs)
return balancer.attrs
- def describe_load_balancer_attributes(self, arn):
+ def describe_load_balancer_attributes(self, arn: str) -> Dict[str, Any]:
balancer = self.load_balancers.get(arn)
if balancer is None:
raise LoadBalancerNotFoundError()
@@ -1429,17 +1512,17 @@ Member must satisfy regular expression pattern: {expression}"
def modify_target_group(
self,
- arn,
- health_check_proto=None,
- health_check_port=None,
- health_check_path=None,
- health_check_interval=None,
- health_check_timeout=None,
- healthy_threshold_count=None,
- unhealthy_threshold_count=None,
- http_codes=None,
- health_check_enabled=None,
- ):
+ arn: str,
+ health_check_proto: Optional[str] = None,
+ health_check_port: Optional[str] = None,
+ health_check_path: Optional[str] = None,
+ health_check_interval: Optional[str] = None,
+ health_check_timeout: Optional[int] = None,
+ healthy_threshold_count: Optional[str] = None,
+ unhealthy_threshold_count: Optional[str] = None,
+ http_codes: Optional[str] = None,
+ health_check_enabled: Optional[str] = None,
+ ) -> FakeTargetGroup:
target_group = self.target_groups.get(arn)
if target_group is None:
raise TargetGroupNotFoundError()
@@ -1476,14 +1559,14 @@ Member must satisfy regular expression pattern: {expression}"
def modify_listener(
self,
- arn,
- port=None,
- protocol=None,
- ssl_policy=None,
- certificates=None,
- default_actions=None,
- ):
- default_actions = [FakeAction(action) for action in default_actions]
+ arn: str,
+ port: Optional[str] = None,
+ protocol: Optional[str] = None,
+ ssl_policy: Optional[str] = None,
+ certificates: Optional[List[Dict[str, Any]]] = None,
+ actions: Optional[List[Dict[str, Any]]] = None,
+ ) -> FakeListener:
+ default_actions = [FakeAction(action) for action in actions] # type: ignore
listener = self.describe_listeners(load_balancer_arn=None, listener_arns=[arn])[
0
]
@@ -1510,8 +1593,9 @@ Member must satisfy regular expression pattern: {expression}"
f"Certificate {default_cert_arn} not found",
)
listener.certificate = default_cert_arn
- listener.certificates = certificates
- elif len(certificates) == 0 and len(listener.certificates) == 0:
+ # TODO: Calling describe_listener_certificates after this operation returns a wrong result
+ listener.certificates = certificates # type: ignore[assignment]
+ elif len(certificates) == 0 and len(listener.certificates) == 0: # type: ignore[arg-type]
raise RESTError(
"CertificateWereNotPassed",
"You must provide a list containing exactly one certificate if the listener protocol is HTTPS.",
@@ -1533,7 +1617,7 @@ Member must satisfy regular expression pattern: {expression}"
return listener
- def _certificate_exists(self, certificate_arn):
+ def _certificate_exists(self, certificate_arn: str) -> bool:
"""
Verify the provided certificate exists in either ACM or IAM
"""
@@ -1558,7 +1642,7 @@ Member must satisfy regular expression pattern: {expression}"
# Safe to assume it doesn't exist when we get here
return False
- def _any_listener_using(self, target_group_arn):
+ def _any_listener_using(self, target_group_arn: str) -> bool:
for load_balancer in self.load_balancers.values():
for listener in load_balancer.listeners.values():
for rule in listener.rules.values():
@@ -1570,31 +1654,35 @@ Member must satisfy regular expression pattern: {expression}"
return True
return False
- def notify_terminate_instances(self, instance_ids):
+ def notify_terminate_instances(self, instance_ids: List[str]) -> None:
for target_group in self.target_groups.values():
target_group.deregister_terminated_instances(instance_ids)
- def add_listener_certificates(self, arn, certificates):
+ def add_listener_certificates(
+ self, arn: str, certificates: List[Dict[str, Any]]
+ ) -> List[str]:
listener = self.describe_listeners(load_balancer_arn=None, listener_arns=[arn])[
0
]
listener.certificates.extend([c["certificate_arn"] for c in certificates])
return listener.certificates
- def describe_listener_certificates(self, arn):
+ def describe_listener_certificates(self, arn: str) -> List[str]:
listener = self.describe_listeners(load_balancer_arn=None, listener_arns=[arn])[
0
]
return listener.certificates
- def remove_listener_certificates(self, arn, certificates):
+ def remove_listener_certificates(
+ self, arn: str, certificates: List[Dict[str, Any]]
+ ) -> None:
listener = self.describe_listeners(load_balancer_arn=None, listener_arns=[arn])[
0
]
cert_arns = [c["certificate_arn"] for c in certificates]
listener.certificates = [c for c in listener.certificates if c not in cert_arns]
- def add_tags(self, resource_arns, tags):
+ def add_tags(self, resource_arns: List[str], tags: List[Dict[str, str]]) -> None:
tag_dict = self.tagging_service.flatten_tag_list(tags)
for arn in resource_arns:
existing = self.tagging_service.get_tag_dict_for_resource(arn)
@@ -1604,19 +1692,19 @@ Member must satisfy regular expression pattern: {expression}"
self._get_resource_by_arn(arn)
self.tagging_service.tag_resource(arn, tags)
- def remove_tags(self, resource_arns, tag_keys):
+ def remove_tags(self, resource_arns: List[str], tag_keys: List[str]) -> None:
for arn in resource_arns:
self.tagging_service.untag_resource_using_names(arn, tag_keys)
- def describe_tags(self, resource_arns):
+ def describe_tags(self, resource_arns: List[str]) -> Dict[str, Dict[str, str]]:
return {
arn: self.tagging_service.get_tag_dict_for_resource(arn)
for arn in resource_arns
}
- def _get_resource_by_arn(self, arn):
+ def _get_resource_by_arn(self, arn: str) -> Any:
if ":targetgroup" in arn:
- resource = self.target_groups.get(arn)
+ resource: Any = self.target_groups.get(arn)
if not resource:
raise TargetGroupNotFoundError()
elif ":loadbalancer" in arn:
diff --git a/moto/elbv2/responses.py b/moto/elbv2/responses.py
index 8b4206be0..f4ebba781 100644
--- a/moto/elbv2/responses.py
+++ b/moto/elbv2/responses.py
@@ -1,7 +1,7 @@
from moto.core.exceptions import RESTError
from moto.core.responses import BaseResponse
from moto.utilities.aws_headers import amzn_request_id
-from .models import elbv2_backends
+from .models import elbv2_backends, ELBv2Backend
from .exceptions import TargetGroupNotFoundError
from .exceptions import ListenerOrBalancerMissingError
@@ -135,15 +135,15 @@ SSL_POLICIES = [
class ELBV2Response(BaseResponse):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__(service_name="elbv2")
@property
- def elbv2_backend(self):
+ def elbv2_backend(self) -> ELBv2Backend:
return elbv2_backends[self.current_account][self.region]
@amzn_request_id
- def create_load_balancer(self):
+ def create_load_balancer(self) -> str:
params = self._get_params()
load_balancer_name = params.get("Name")
subnet_ids = self._get_multi_param("Subnets.member")
@@ -154,11 +154,11 @@ class ELBV2Response(BaseResponse):
tags = params.get("Tags")
load_balancer = self.elbv2_backend.create_load_balancer(
- name=load_balancer_name,
+ name=load_balancer_name, # type: ignore
security_groups=security_groups,
subnet_ids=subnet_ids,
subnet_mappings=subnet_mappings,
- scheme=scheme,
+ scheme=scheme, # type: ignore
loadbalancer_type=loadbalancer_type,
tags=tags,
)
@@ -166,7 +166,7 @@ class ELBV2Response(BaseResponse):
return template.render(load_balancer=load_balancer)
@amzn_request_id
- def create_rule(self):
+ def create_rule(self) -> str:
params = self._get_params()
rules = self.elbv2_backend.create_rule(
listener_arn=params["ListenerArn"],
@@ -180,7 +180,7 @@ class ELBV2Response(BaseResponse):
return template.render(rules=rules)
@amzn_request_id
- def create_target_group(self):
+ def create_target_group(self) -> str:
params = self._get_params()
name = params.get("Name")
vpc_id = params.get("VpcId")
@@ -200,7 +200,7 @@ class ELBV2Response(BaseResponse):
tags = params.get("Tags")
target_group = self.elbv2_backend.create_target_group(
- name,
+ name, # type: ignore
vpc_id=vpc_id,
protocol=protocol,
protocol_version=protocol_version,
@@ -222,7 +222,7 @@ class ELBV2Response(BaseResponse):
return template.render(target_group=target_group)
@amzn_request_id
- def create_listener(self):
+ def create_listener(self) -> str:
params = self._get_params()
load_balancer_arn = self._get_param("LoadBalancerArn")
protocol = self._get_param("Protocol")
@@ -243,7 +243,7 @@ class ELBV2Response(BaseResponse):
port=port,
ssl_policy=ssl_policy,
certificate=certificate,
- default_actions=default_actions,
+ actions=default_actions,
alpn_policy=alpn_policy,
tags=tags,
)
@@ -252,7 +252,7 @@ class ELBV2Response(BaseResponse):
return template.render(listener=listener)
@amzn_request_id
- def describe_load_balancers(self):
+ def describe_load_balancers(self) -> str:
arns = self._get_multi_param("LoadBalancerArns.member")
names = self._get_multi_param("Names.member")
all_load_balancers = list(
@@ -276,7 +276,7 @@ class ELBV2Response(BaseResponse):
return template.render(load_balancers=load_balancers_resp, marker=next_marker)
@amzn_request_id
- def describe_rules(self):
+ def describe_rules(self) -> str:
listener_arn = self._get_param("ListenerArn")
rule_arns = (
self._get_multi_param("RuleArns.member")
@@ -305,7 +305,7 @@ class ELBV2Response(BaseResponse):
return template.render(rules=rules_resp, marker=next_marker)
@amzn_request_id
- def describe_target_groups(self):
+ def describe_target_groups(self) -> str:
load_balancer_arn = self._get_param("LoadBalancerArn")
target_group_arns = self._get_multi_param("TargetGroupArns.member")
names = self._get_multi_param("Names.member")
@@ -317,7 +317,7 @@ class ELBV2Response(BaseResponse):
return template.render(target_groups=target_groups)
@amzn_request_id
- def describe_target_group_attributes(self):
+ def describe_target_group_attributes(self) -> str:
target_group_arn = self._get_param("TargetGroupArn")
target_group = self.elbv2_backend.target_groups.get(target_group_arn)
if not target_group:
@@ -326,7 +326,7 @@ class ELBV2Response(BaseResponse):
return template.render(attributes=target_group.attributes)
@amzn_request_id
- def describe_listeners(self):
+ def describe_listeners(self) -> str:
load_balancer_arn = self._get_param("LoadBalancerArn")
listener_arns = self._get_multi_param("ListenerArns.member")
if not load_balancer_arn and not listener_arns:
@@ -339,35 +339,35 @@ class ELBV2Response(BaseResponse):
return template.render(listeners=listeners)
@amzn_request_id
- def delete_load_balancer(self):
+ def delete_load_balancer(self) -> str:
arn = self._get_param("LoadBalancerArn")
self.elbv2_backend.delete_load_balancer(arn)
template = self.response_template(DELETE_LOAD_BALANCER_TEMPLATE)
return template.render()
@amzn_request_id
- def delete_rule(self):
+ def delete_rule(self) -> str:
arn = self._get_param("RuleArn")
self.elbv2_backend.delete_rule(arn)
template = self.response_template(DELETE_RULE_TEMPLATE)
return template.render()
@amzn_request_id
- def delete_target_group(self):
+ def delete_target_group(self) -> str:
arn = self._get_param("TargetGroupArn")
self.elbv2_backend.delete_target_group(arn)
template = self.response_template(DELETE_TARGET_GROUP_TEMPLATE)
return template.render()
@amzn_request_id
- def delete_listener(self):
+ def delete_listener(self) -> str:
arn = self._get_param("ListenerArn")
self.elbv2_backend.delete_listener(arn)
template = self.response_template(DELETE_LISTENER_TEMPLATE)
return template.render()
@amzn_request_id
- def modify_rule(self):
+ def modify_rule(self) -> str:
rule_arn = self._get_param("RuleArn")
params = self._get_params()
conditions = params.get("Conditions", [])
@@ -379,17 +379,17 @@ class ELBV2Response(BaseResponse):
return template.render(rules=rules)
@amzn_request_id
- def modify_target_group_attributes(self):
+ def modify_target_group_attributes(self) -> str:
target_group_arn = self._get_param("TargetGroupArn")
- attributes = self._get_list_prefix("Attributes.member")
- attributes = {attr["key"]: attr["value"] for attr in attributes}
+ attrs = self._get_list_prefix("Attributes.member")
+ attributes = {attr["key"]: attr["value"] for attr in attrs}
self.elbv2_backend.modify_target_group_attributes(target_group_arn, attributes)
template = self.response_template(MODIFY_TARGET_GROUP_ATTRIBUTES_TEMPLATE)
return template.render(attributes=attributes)
@amzn_request_id
- def register_targets(self):
+ def register_targets(self) -> str:
target_group_arn = self._get_param("TargetGroupArn")
targets = self._get_list_prefix("Targets.member")
self.elbv2_backend.register_targets(target_group_arn, targets)
@@ -398,7 +398,7 @@ class ELBV2Response(BaseResponse):
return template.render()
@amzn_request_id
- def deregister_targets(self):
+ def deregister_targets(self) -> str:
target_group_arn = self._get_param("TargetGroupArn")
targets = self._get_list_prefix("Targets.member")
self.elbv2_backend.deregister_targets(target_group_arn, targets)
@@ -407,7 +407,7 @@ class ELBV2Response(BaseResponse):
return template.render()
@amzn_request_id
- def describe_target_health(self):
+ def describe_target_health(self) -> str:
target_group_arn = self._get_param("TargetGroupArn")
targets = self._get_list_prefix("Targets.member")
target_health_descriptions = self.elbv2_backend.describe_target_health(
@@ -418,7 +418,7 @@ class ELBV2Response(BaseResponse):
return template.render(target_health_descriptions=target_health_descriptions)
@amzn_request_id
- def set_rule_priorities(self):
+ def set_rule_priorities(self) -> str:
rule_priorities = self._get_list_prefix("RulePriorities.member")
for rule_priority in rule_priorities:
rule_priority["priority"] = int(rule_priority["priority"])
@@ -427,18 +427,17 @@ class ELBV2Response(BaseResponse):
return template.render(rules=rules)
@amzn_request_id
- def add_tags(self):
+ def add_tags(self) -> str:
resource_arns = self._get_multi_param("ResourceArns.member")
tags = self._get_params().get("Tags")
- tags = self._get_params().get("Tags")
- self.elbv2_backend.add_tags(resource_arns, tags)
+ self.elbv2_backend.add_tags(resource_arns, tags) # type: ignore
template = self.response_template(ADD_TAGS_TEMPLATE)
return template.render()
@amzn_request_id
- def remove_tags(self):
+ def remove_tags(self) -> str:
resource_arns = self._get_multi_param("ResourceArns.member")
tag_keys = self._get_multi_param("TagKeys.member")
@@ -448,7 +447,7 @@ class ELBV2Response(BaseResponse):
return template.render()
@amzn_request_id
- def describe_tags(self):
+ def describe_tags(self) -> str:
resource_arns = self._get_multi_param("ResourceArns.member")
resource_tags = self.elbv2_backend.describe_tags(resource_arns)
@@ -456,7 +455,7 @@ class ELBV2Response(BaseResponse):
return template.render(resource_tags=resource_tags)
@amzn_request_id
- def describe_account_limits(self):
+ def describe_account_limits(self) -> str:
# Supports paging but not worth implementing yet
# marker = self._get_param('Marker')
# page_size = self._get_int_param('PageSize')
@@ -476,7 +475,7 @@ class ELBV2Response(BaseResponse):
return template.render(limits=limits)
@amzn_request_id
- def describe_ssl_policies(self):
+ def describe_ssl_policies(self) -> str:
names = self._get_multi_param("Names.member.")
# Supports paging but not worth implementing yet
# marker = self._get_param('Marker')
@@ -484,13 +483,13 @@ class ELBV2Response(BaseResponse):
policies = SSL_POLICIES
if names:
- policies = filter(lambda policy: policy["name"] in names, policies)
+ policies = filter(lambda policy: policy["name"] in names, policies) # type: ignore
template = self.response_template(DESCRIBE_SSL_POLICIES_TEMPLATE)
return template.render(policies=policies)
@amzn_request_id
- def set_ip_address_type(self):
+ def set_ip_address_type(self) -> str:
arn = self._get_param("LoadBalancerArn")
ip_type = self._get_param("IpAddressType")
@@ -500,7 +499,7 @@ class ELBV2Response(BaseResponse):
return template.render(ip_type=ip_type)
@amzn_request_id
- def set_security_groups(self):
+ def set_security_groups(self) -> str:
arn = self._get_param("LoadBalancerArn")
sec_groups = self._get_multi_param("SecurityGroups.member.")
@@ -510,7 +509,7 @@ class ELBV2Response(BaseResponse):
return template.render(sec_groups=sec_groups)
@amzn_request_id
- def set_subnets(self):
+ def set_subnets(self) -> str:
arn = self._get_param("LoadBalancerArn")
subnets = self._get_multi_param("Subnets.member.")
subnet_mappings = self._get_params().get("SubnetMappings", [])
@@ -521,7 +520,7 @@ class ELBV2Response(BaseResponse):
return template.render(subnets=subnet_zone_list)
@amzn_request_id
- def modify_load_balancer_attributes(self):
+ def modify_load_balancer_attributes(self) -> str:
arn = self._get_param("LoadBalancerArn")
attrs = self._get_map_prefix(
"Attributes.member", key_end="Key", value_end="Value"
@@ -533,7 +532,7 @@ class ELBV2Response(BaseResponse):
return template.render(attrs=all_attrs)
@amzn_request_id
- def describe_load_balancer_attributes(self):
+ def describe_load_balancer_attributes(self) -> str:
arn = self._get_param("LoadBalancerArn")
attrs = self.elbv2_backend.describe_load_balancer_attributes(arn)
@@ -541,7 +540,7 @@ class ELBV2Response(BaseResponse):
return template.render(attrs=attrs)
@amzn_request_id
- def modify_target_group(self):
+ def modify_target_group(self) -> str:
arn = self._get_param("TargetGroupArn")
health_check_proto = self._get_param(
@@ -573,7 +572,7 @@ class ELBV2Response(BaseResponse):
return template.render(target_group=target_group)
@amzn_request_id
- def modify_listener(self):
+ def modify_listener(self) -> str:
arn = self._get_param("ListenerArn")
port = self._get_param("Port")
protocol = self._get_param("Protocol")
@@ -595,16 +594,18 @@ class ELBV2Response(BaseResponse):
return template.render(listener=listener)
@amzn_request_id
- def add_listener_certificates(self):
+ def add_listener_certificates(self) -> str:
arn = self._get_param("ListenerArn")
certificates = self._get_list_prefix("Certificates.member")
- certificates = self.elbv2_backend.add_listener_certificates(arn, certificates)
+ certificate_arns = self.elbv2_backend.add_listener_certificates(
+ arn, certificates
+ )
template = self.response_template(ADD_LISTENER_CERTIFICATES_TEMPLATE)
- return template.render(certificates=certificates)
+ return template.render(certificates=certificate_arns)
@amzn_request_id
- def describe_listener_certificates(self):
+ def describe_listener_certificates(self) -> str:
arn = self._get_param("ListenerArn")
certificates = self.elbv2_backend.describe_listener_certificates(arn)
@@ -612,15 +613,13 @@ class ELBV2Response(BaseResponse):
return template.render(certificates=certificates)
@amzn_request_id
- def remove_listener_certificates(self):
+ def remove_listener_certificates(self) -> str:
arn = self._get_param("ListenerArn")
certificates = self._get_list_prefix("Certificates.member")
- certificates = self.elbv2_backend.remove_listener_certificates(
- arn, certificates
- )
+ self.elbv2_backend.remove_listener_certificates(arn, certificates)
template = self.response_template(REMOVE_LISTENER_CERTIFICATES_TEMPLATE)
- return template.render(certificates=certificates)
+ return template.render()
ADD_TAGS_TEMPLATE = """
@@ -1580,7 +1579,7 @@ SET_SECURITY_GROUPS_TEMPLATE = """
- {% for zone_id, subnet_id in subnets %}
+ {% for zone_id, subnet_id in subnets.items() %}
{{ subnet_id }}
{{ zone_id }}
diff --git a/moto/elbv2/utils.py b/moto/elbv2/utils.py
index 5ec7e5be9..32b647f19 100644
--- a/moto/elbv2/utils.py
+++ b/moto/elbv2/utils.py
@@ -1,6 +1,6 @@
-def make_arn_for_load_balancer(account_id, name, region_name):
+def make_arn_for_load_balancer(account_id: str, name: str, region_name: str) -> str:
return f"arn:aws:elasticloadbalancing:{region_name}:{account_id}:loadbalancer/app/{name}/50dc6c495c0c9188"
-def make_arn_for_target_group(account_id, name, region_name):
+def make_arn_for_target_group(account_id: str, name: str, region_name: str) -> str:
return f"arn:aws:elasticloadbalancing:{region_name}:{account_id}:targetgroup/{name}/50dc6c495c0c9188"
diff --git a/moto/utilities/tagging_service.py b/moto/utilities/tagging_service.py
index 3f8859854..6c302de3c 100644
--- a/moto/utilities/tagging_service.py
+++ b/moto/utilities/tagging_service.py
@@ -1,6 +1,6 @@
"""Tag functionality contained in class TaggingService."""
import re
-from typing import Dict, List
+from typing import Dict, List, Optional
class TaggingService:
@@ -43,7 +43,7 @@ class TaggingService:
"""Return True if the ARN has any associated tags, False otherwise."""
return arn in self.tags
- def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None:
+ def tag_resource(self, arn: str, tags: Optional[List[Dict[str, str]]]) -> None:
"""Store associated list of dicts with ARN.
Note: the storage is internal to this class instance.
diff --git a/setup.cfg b/setup.cfg
index 08b437f69..188eab56f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
-files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/elastictranscoder,moto/elb,moto/es,moto/moto_api,moto/neptune
+files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/elastictranscoder,moto/elb,moto/elbv2,moto/es,moto/moto_api,moto/neptune
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract