Route53Resolver: Add resolver rule-related APIs (#4603)

This commit is contained in:
kbalk 2021-11-22 12:57:07 -05:00 committed by GitHub
parent ea67a15dcd
commit f4ec2fc462
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 886 additions and 28 deletions

View File

@ -3675,7 +3675,7 @@
## route53resolver ## route53resolver
<details> <details>
<summary>14% implemented</summary> <summary>21% implemented</summary>
- [ ] associate_firewall_rule_group - [ ] associate_firewall_rule_group
- [ ] associate_resolver_endpoint_ip_address - [ ] associate_resolver_endpoint_ip_address
@ -3686,13 +3686,13 @@
- [ ] create_firewall_rule_group - [ ] create_firewall_rule_group
- [X] create_resolver_endpoint - [X] create_resolver_endpoint
- [ ] create_resolver_query_log_config - [ ] create_resolver_query_log_config
- [ ] create_resolver_rule - [X] create_resolver_rule
- [ ] delete_firewall_domain_list - [ ] delete_firewall_domain_list
- [ ] delete_firewall_rule - [ ] delete_firewall_rule
- [ ] delete_firewall_rule_group - [ ] delete_firewall_rule_group
- [X] delete_resolver_endpoint - [X] delete_resolver_endpoint
- [ ] delete_resolver_query_log_config - [ ] delete_resolver_query_log_config
- [ ] delete_resolver_rule - [X] delete_resolver_rule
- [ ] disassociate_firewall_rule_group - [ ] disassociate_firewall_rule_group
- [ ] disassociate_resolver_endpoint_ip_address - [ ] disassociate_resolver_endpoint_ip_address
- [ ] disassociate_resolver_query_log_config - [ ] disassociate_resolver_query_log_config
@ -3708,7 +3708,7 @@
- [ ] get_resolver_query_log_config - [ ] get_resolver_query_log_config
- [ ] get_resolver_query_log_config_association - [ ] get_resolver_query_log_config_association
- [ ] get_resolver_query_log_config_policy - [ ] get_resolver_query_log_config_policy
- [ ] get_resolver_rule - [X] get_resolver_rule
- [ ] get_resolver_rule_association - [ ] get_resolver_rule_association
- [ ] get_resolver_rule_policy - [ ] get_resolver_rule_policy
- [ ] import_firewall_domains - [ ] import_firewall_domains
@ -3725,7 +3725,7 @@
- [ ] list_resolver_query_log_config_associations - [ ] list_resolver_query_log_config_associations
- [ ] list_resolver_query_log_configs - [ ] list_resolver_query_log_configs
- [ ] list_resolver_rule_associations - [ ] list_resolver_rule_associations
- [ ] list_resolver_rules - [X] list_resolver_rules
- [X] list_tags_for_resource - [X] list_tags_for_resource
- [ ] put_firewall_rule_group_policy - [ ] put_firewall_rule_group_policy
- [ ] put_resolver_query_log_config_policy - [ ] put_resolver_query_log_config_policy
@ -4865,4 +4865,4 @@
- workmailmessageflow - workmailmessageflow
- workspaces - workspaces
- xray - xray
</details> </details>

View File

@ -39,10 +39,10 @@ route53resolver
NOTE: IPv6 IPs are currently not being filtered when NOTE: IPv6 IPs are currently not being filtered when
calculating the create_resolver_endpoint() IpAddresses. calculating the create_resolver_endpoint() IpAddresses.
- [ ] create_resolver_query_log_config - [ ] create_resolver_query_log_config
- [ ] create_resolver_rule - [X] create_resolver_rule
- [ ] delete_firewall_domain_list - [ ] delete_firewall_domain_list
- [ ] delete_firewall_rule - [ ] delete_firewall_rule
- [ ] delete_firewall_rule_group - [ ] delete_firewall_rule_group
@ -50,7 +50,7 @@ route53resolver
Delete a resolver endpoint. Delete a resolver endpoint.
- [ ] delete_resolver_query_log_config - [ ] delete_resolver_query_log_config
- [ ] delete_resolver_rule - [X] delete_resolver_rule
- [ ] disassociate_firewall_rule_group - [ ] disassociate_firewall_rule_group
- [ ] disassociate_resolver_endpoint_ip_address - [ ] disassociate_resolver_endpoint_ip_address
- [ ] disassociate_resolver_query_log_config - [ ] disassociate_resolver_query_log_config
@ -68,7 +68,7 @@ route53resolver
- [ ] get_resolver_query_log_config - [ ] get_resolver_query_log_config
- [ ] get_resolver_query_log_config_association - [ ] get_resolver_query_log_config_association
- [ ] get_resolver_query_log_config_policy - [ ] get_resolver_query_log_config_policy
- [ ] get_resolver_rule - [X] get_resolver_rule
- [ ] get_resolver_rule_association - [ ] get_resolver_rule_association
- [ ] get_resolver_rule_policy - [ ] get_resolver_rule_policy
- [ ] import_firewall_domains - [ ] import_firewall_domains
@ -89,7 +89,7 @@ route53resolver
- [ ] list_resolver_query_log_config_associations - [ ] list_resolver_query_log_config_associations
- [ ] list_resolver_query_log_configs - [ ] list_resolver_query_log_configs
- [ ] list_resolver_rule_associations - [ ] list_resolver_rule_associations
- [ ] list_resolver_rules - [X] list_resolver_rules
- [X] list_tags_for_resource - [X] list_tags_for_resource
List all tags for the given resource. List all tags for the given resource.

View File

@ -1,7 +1,7 @@
"""Route53ResolverBackend class with methods for supported APIs.""" """Route53ResolverBackend class with methods for supported APIs."""
from collections import defaultdict from collections import defaultdict
from datetime import datetime, timezone from datetime import datetime, timezone
from ipaddress import ip_address, ip_network from ipaddress import ip_address, ip_network, IPv4Address
import re import re
from boto3 import Session from boto3 import Session
@ -29,6 +29,83 @@ from moto.utilities.tagging_service import TaggingService
CAMEL_TO_SNAKE_PATTERN = re.compile(r"(?<!^)(?=[A-Z])") CAMEL_TO_SNAKE_PATTERN = re.compile(r"(?<!^)(?=[A-Z])")
class ResolverRule(BaseModel): # pylint: disable=too-many-instance-attributes
"""Representation of a fake Route53 Resolver Rule."""
MAX_TAGS_PER_RESOLVER_RULE = 200
MAX_RULES_PER_REGION = 1000
# There are two styles of filter names and either will be transformed
# into lowercase snake.
FILTER_NAMES = [
"creator_request_id",
"domain_name",
"name",
"resolver_endpoint_id",
"status",
"rule_type", # actual filter is "Type"
]
def __init__(
self,
region,
rule_id,
creator_request_id,
rule_type,
domain_name,
target_ips=None,
resolver_endpoint_id=None,
name=None,
): # pylint: disable=too-many-arguments
self.region = region
self.creator_request_id = creator_request_id
self.name = name
self.rule_id = rule_id
self.rule_type = rule_type
self.domain_name = domain_name + "."
self.target_ips = target_ips
self.resolver_endpoint_id = resolver_endpoint_id
# Constructed members.
self.id = rule_id # pylint: disable=invalid-name
self.status = "COMPLETE"
# The status message should contain a trace Id which is the value
# of X-Amzn-Trace-Id. We don't have that info, so a random number
# of similar format and length will be used.
self.status_message = (
f"[Trace id: 1-{get_random_hex(8)}-{get_random_hex(24)}] "
f"Successfully created Resolver Rule"
)
self.share_status = "SHARED_WITH_ME"
self.creation_time = datetime.now(timezone.utc).isoformat()
self.modification_time = datetime.now(timezone.utc).isoformat()
@property
def arn(self):
"""Return ARN for this resolver rule."""
return f"arn:aws:route53resolver:{self.region}:{ACCOUNT_ID}:resolver-rule/{self.id}"
def description(self):
"""Return a dictionary of relevant info for this resolver rule."""
return {
"Id": self.id,
"CreatorRequestId": self.creator_request_id,
"Arn": self.arn,
"DomainName": self.domain_name,
"Status": self.status,
"StatusMessage": self.status_message,
"RuleType": self.rule_type,
"Name": self.name,
"TargetIps": self.target_ips,
"ResolverEndpointId": self.resolver_endpoint_id,
"OwnerId": ACCOUNT_ID,
"ShareStatus": self.share_status,
"CreationTime": self.creation_time,
"ModificationTime": self.modification_time,
}
class ResolverEndpoint(BaseModel): # pylint: disable=too-many-instance-attributes class ResolverEndpoint(BaseModel): # pylint: disable=too-many-instance-attributes
"""Representation of a fake Route53 Resolver Endpoint.""" """Representation of a fake Route53 Resolver Endpoint."""
@ -79,7 +156,7 @@ class ResolverEndpoint(BaseModel): # pylint: disable=too-many-instance-attribut
# of similar format and length will be used. # of similar format and length will be used.
self.status_message = ( self.status_message = (
f"[Trace id: 1-{get_random_hex(8)}-{get_random_hex(24)}] " f"[Trace id: 1-{get_random_hex(8)}-{get_random_hex(24)}] "
f"Creating the Resolver Endpoint" f"Successfully created Resolver Endpoint"
) )
self.creation_time = datetime.now(timezone.utc).isoformat() self.creation_time = datetime.now(timezone.utc).isoformat()
self.modification_time = datetime.now(timezone.utc).isoformat() self.modification_time = datetime.now(timezone.utc).isoformat()
@ -174,6 +251,7 @@ class Route53ResolverBackend(BaseBackend):
def __init__(self, region_name=None): def __init__(self, region_name=None):
self.region_name = region_name self.region_name = region_name
self.resolver_endpoints = {} # Key is self-generated ID (endpoint_id) self.resolver_endpoints = {} # Key is self-generated ID (endpoint_id)
self.resolver_rules = {} # Key is self-generated ID (rule_id)
self.tagger = TaggingService() self.tagger = TaggingService()
def reset(self): def reset(self):
@ -310,6 +388,100 @@ class Route53ResolverBackend(BaseBackend):
self.tagger.tag_resource(resolver_endpoint.arn, tags or []) self.tagger.tag_resource(resolver_endpoint.arn, tags or [])
return resolver_endpoint return resolver_endpoint
def create_resolver_rule(
self,
region,
creator_request_id,
name,
rule_type,
domain_name,
target_ips,
resolver_endpoint_id,
tags,
): # pylint: disable=too-many-arguments
"""Return description for a newly created resolver rule."""
validate_args(
[
("creatorRequestId", creator_request_id),
("ruleType", rule_type),
("domainName", domain_name),
("name", name),
*[("targetIps.port", x) for x in target_ips],
("resolverEndpointId", resolver_endpoint_id),
]
)
errmsg = self.tagger.validate_tags(
tags or [], limit=ResolverRule.MAX_TAGS_PER_RESOLVER_RULE,
)
if errmsg:
raise TagValidationException(errmsg)
rules = [x for x in self.resolver_rules.values() if x.region == region]
if len(rules) > ResolverRule.MAX_RULES_PER_REGION:
# Did not verify that this is the actual error message.
raise LimitExceededException(
f"Account '{ACCOUNT_ID}' has exceeded 'max-rules'"
)
# Per the AWS documentation and as seen with the AWS console, target
# ips are only relevant when the value of Rule is FORWARD. However,
# boto3 ignores this condition and so shall we.
for ip_addr in [x["Ip"] for x in target_ips]:
try:
# boto3 fails with an InternalServiceException if IPv6
# addresses are used, which isn't helpful.
if not isinstance(ip_address(ip_addr), IPv4Address):
raise InvalidParameterException(
f"Only IPv4 addresses may be used: '{ip_addr}'"
)
except ValueError as exc:
raise InvalidParameterException(
f"Invalid IP address: '{ip_addr}'"
) from exc
# The boto3 documentation indicates that ResolverEndpoint is
# optional, as does the AWS documention. But if resolver_endpoint_id
# is set to None or an empty string, it results in boto3 raising
# a ParamValidationError either regarding the type or len of string.
if resolver_endpoint_id:
if resolver_endpoint_id not in [
x.id for x in self.resolver_endpoints.values()
]:
raise ResourceNotFoundException(
f"Resolver endpoint with ID '{resolver_endpoint_id}' does not exist."
)
if rule_type == "SYSTEM":
raise InvalidRequestException(
"Cannot specify resolver endpoint ID and target IP "
"for SYSTEM type resolver rule"
)
if creator_request_id in [
x.creator_request_id for x in self.resolver_rules.values()
]:
raise ResourceExistsException(
f"Resolver rule with creator request ID "
f"'{creator_request_id}' already exists"
)
rule_id = f"rslvr-rr-{get_random_hex(17)}"
resolver_rule = ResolverRule(
region,
rule_id,
creator_request_id,
rule_type,
domain_name,
target_ips,
resolver_endpoint_id,
name,
)
self.resolver_rules[rule_id] = resolver_rule
self.tagger.tag_resource(resolver_rule.arn, tags or [])
return resolver_rule
def _validate_resolver_endpoint_id(self, resolver_endpoint_id): def _validate_resolver_endpoint_id(self, resolver_endpoint_id):
"""Raise an exception if the id is invalid or unknown.""" """Raise an exception if the id is invalid or unknown."""
validate_args([("resolverEndpointId", resolver_endpoint_id)]) validate_args([("resolverEndpointId", resolver_endpoint_id)])
@ -325,15 +497,39 @@ class Route53ResolverBackend(BaseBackend):
resolver_endpoint = self.resolver_endpoints.pop(resolver_endpoint_id) resolver_endpoint = self.resolver_endpoints.pop(resolver_endpoint_id)
resolver_endpoint.status = "DELETING" resolver_endpoint.status = "DELETING"
resolver_endpoint.status_message = resolver_endpoint.status_message.replace( resolver_endpoint.status_message = resolver_endpoint.status_message.replace(
"Creating", "Deleting" "Successfully created", "Deleting"
) )
return resolver_endpoint return resolver_endpoint
def _validate_resolver_rule_id(self, resolver_rule_id):
"""Raise an exception if the id is invalid or unknown."""
validate_args([("resolverRuleId", resolver_rule_id)])
if resolver_rule_id not in self.resolver_rules:
raise ResourceNotFoundException(
f"Resolver rule with ID '{resolver_rule_id}' does not exist"
)
def delete_resolver_rule(self, resolver_rule_id):
"""Delete a resolver rule."""
self._validate_resolver_rule_id(resolver_rule_id)
self.tagger.delete_all_tags_for_resource(resolver_rule_id)
resolver_rule = self.resolver_rules.pop(resolver_rule_id)
resolver_rule.status = "DELETING"
resolver_rule.status_message = resolver_rule.status_message.replace(
"Successfully created", "Deleting"
)
return resolver_rule
def get_resolver_endpoint(self, resolver_endpoint_id): def get_resolver_endpoint(self, resolver_endpoint_id):
"""Return info for specified resolver endpoint.""" """Return info for specified resolver endpoint."""
self._validate_resolver_endpoint_id(resolver_endpoint_id) self._validate_resolver_endpoint_id(resolver_endpoint_id)
return self.resolver_endpoints[resolver_endpoint_id] return self.resolver_endpoints[resolver_endpoint_id]
def get_resolver_rule(self, resolver_rule_id):
"""Return info for specified resolver rule."""
self._validate_resolver_rule_id(resolver_rule_id)
return self.resolver_rules[resolver_rule_id]
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def list_resolver_endpoint_ip_addresses( def list_resolver_endpoint_ip_addresses(
self, resolver_endpoint_id, next_token=None, max_results=None, self, resolver_endpoint_id, next_token=None, max_results=None,
@ -358,6 +554,8 @@ class Route53ResolverBackend(BaseBackend):
filter_name = "host_vpc_id" filter_name = "host_vpc_id"
elif filter_name == "HostVpcId": elif filter_name == "HostVpcId":
filter_name = "WRONG" filter_name = "WRONG"
elif filter_name in ["Type", "TYPE"]:
filter_name = "rule_type"
elif not filter_name.isupper(): elif not filter_name.isupper():
filter_name = CAMEL_TO_SNAKE_PATTERN.sub("_", filter_name) filter_name = CAMEL_TO_SNAKE_PATTERN.sub("_", filter_name)
rr_filter["Field"] = filter_name.lower() rr_filter["Field"] = filter_name.lower()
@ -410,22 +608,42 @@ class Route53ResolverBackend(BaseBackend):
return endpoints return endpoints
@paginate(pagination_model=PAGINATION_MODEL) @paginate(pagination_model=PAGINATION_MODEL)
def list_tags_for_resource( def list_resolver_rules(
self, resource_arn, next_token=None, max_results=None, self, filters, next_token=None, max_results=None,
): # pylint: disable=unused-argument ): # pylint: disable=unused-argument
"""List all tags for the given resource.""" """List all resolver rules, using filters if specified."""
self._matched_arn(resource_arn) if not filters:
return self.tagger.list_tags_for_resource(resource_arn).get("Tags") filters = []
self._add_field_name_to_filter(filters)
self._validate_filters(filters, ResolverRule.FILTER_NAMES)
rules = []
for rule in sorted(self.resolver_rules.values(), key=lambda x: x.name):
if self._matches_all_filters(rule, filters):
rules.append(rule)
return rules
def _matched_arn(self, resource_arn): def _matched_arn(self, resource_arn):
"""Given ARN, raise exception if there is no corresponding resource.""" """Given ARN, raise exception if there is no corresponding resource."""
for resolver_endpoint in self.resolver_endpoints.values(): for resolver_endpoint in self.resolver_endpoints.values():
if resolver_endpoint.arn == resource_arn: if resolver_endpoint.arn == resource_arn:
return return
for resolver_rule in self.resolver_rules.values():
if resolver_rule.arn == resource_arn:
return
raise ResourceNotFoundException( raise ResourceNotFoundException(
f"Resolver endpoint with ID '{resource_arn}' does not exist" f"Resolver endpoint with ID '{resource_arn}' does not exist"
) )
@paginate(pagination_model=PAGINATION_MODEL)
def list_tags_for_resource(
self, resource_arn, next_token=None, max_results=None,
): # pylint: disable=unused-argument
"""List all tags for the given resource."""
self._matched_arn(resource_arn)
return self.tagger.list_tags_for_resource(resource_arn).get("Tags")
def tag_resource(self, resource_arn, tags): def tag_resource(self, resource_arn, tags):
"""Add or overwrite one or more tags for specified resource.""" """Add or overwrite one or more tags for specified resource."""
self._matched_arn(resource_arn) self._matched_arn(resource_arn)

View File

@ -35,6 +35,27 @@ class Route53ResolverResponse(BaseResponse):
) )
return json.dumps({"ResolverEndpoint": resolver_endpoint.description()}) return json.dumps({"ResolverEndpoint": resolver_endpoint.description()})
def create_resolver_rule(self):
"""Specify which Resolver enpoint the queries will pass through."""
creator_request_id = self._get_param("CreatorRequestId")
name = self._get_param("Name")
rule_type = self._get_param("RuleType")
domain_name = self._get_param("DomainName")
target_ips = self._get_param("TargetIps", [])
resolver_endpoint_id = self._get_param("ResolverEndpointId")
tags = self._get_param("Tags", [])
resolver_rule = self.route53resolver_backend.create_resolver_rule(
region=self.region,
creator_request_id=creator_request_id,
name=name,
rule_type=rule_type,
domain_name=domain_name,
target_ips=target_ips,
resolver_endpoint_id=resolver_endpoint_id,
tags=tags,
)
return json.dumps({"ResolverRule": resolver_rule.description()})
def delete_resolver_endpoint(self): def delete_resolver_endpoint(self):
"""Delete a Resolver endpoint.""" """Delete a Resolver endpoint."""
resolver_endpoint_id = self._get_param("ResolverEndpointId") resolver_endpoint_id = self._get_param("ResolverEndpointId")
@ -43,6 +64,14 @@ class Route53ResolverResponse(BaseResponse):
) )
return json.dumps({"ResolverEndpoint": resolver_endpoint.description()}) return json.dumps({"ResolverEndpoint": resolver_endpoint.description()})
def delete_resolver_rule(self):
"""Delete a Resolver rule."""
resolver_rule_id = self._get_param("ResolverRuleId")
resolver_rule = self.route53resolver_backend.delete_resolver_rule(
resolver_rule_id=resolver_rule_id,
)
return json.dumps({"ResolverRule": resolver_rule.description()})
def get_resolver_endpoint(self): def get_resolver_endpoint(self):
"""Return info about a specific Resolver endpoint.""" """Return info about a specific Resolver endpoint."""
resolver_endpoint_id = self._get_param("ResolverEndpointId") resolver_endpoint_id = self._get_param("ResolverEndpointId")
@ -51,6 +80,14 @@ class Route53ResolverResponse(BaseResponse):
) )
return json.dumps({"ResolverEndpoint": resolver_endpoint.description()}) return json.dumps({"ResolverEndpoint": resolver_endpoint.description()})
def get_resolver_rule(self):
"""Return info about a specific Resolver rule."""
resolver_rule_id = self._get_param("ResolverRuleId")
resolver_rule = self.route53resolver_backend.get_resolver_rule(
resolver_rule_id=resolver_rule_id,
)
return json.dumps({"ResolverRule": resolver_rule.description()})
def list_resolver_endpoint_ip_addresses(self): def list_resolver_endpoint_ip_addresses(self):
"""Returns list of IP addresses for specified Resolver endpoint.""" """Returns list of IP addresses for specified Resolver endpoint."""
resolver_endpoint_id = self._get_param("ResolverEndpointId") resolver_endpoint_id = self._get_param("ResolverEndpointId")
@ -101,6 +138,27 @@ class Route53ResolverResponse(BaseResponse):
response["NextToken"] = next_token response["NextToken"] = next_token
return json.dumps(response) return json.dumps(response)
def list_resolver_rules(self):
"""Returns list of all Resolver rules, filtered if specified."""
filters = self._get_param("Filters")
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults", 10)
validate_args([("maxResults", max_results)])
try:
(rules, next_token) = self.route53resolver_backend.list_resolver_rules(
filters, next_token=next_token, max_results=max_results
)
except InvalidToken as exc:
raise InvalidNextTokenException() from exc
response = {
"ResolverRules": [x.description() for x in rules],
"MaxResults": max_results,
}
if next_token:
response["NextToken"] = next_token
return json.dumps(response)
def list_tags_for_resource(self): def list_tags_for_resource(self):
"""Lists all tags for the given resource.""" """Lists all tags for the given resource."""
resource_arn = self._get_param("ResourceArn") resource_arn = self._get_param("ResourceArn")

View File

@ -13,6 +13,12 @@ PAGINATION_MODEL = {
"limit_default": 100, "limit_default": 100,
"page_ending_range_keys": ["id"], "page_ending_range_keys": ["id"],
}, },
"list_resolver_rules": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"page_ending_range_keys": ["id"],
},
"list_tags_for_resource": { "list_tags_for_resource": {
"input_token": "next_token", "input_token": "next_token",
"limit_key": "max_results", "limit_key": "max_results",

View File

@ -18,12 +18,16 @@ def validate_args(validators):
validation_map = { validation_map = {
"creatorRequestId": validate_creator_request_id, "creatorRequestId": validate_creator_request_id,
"direction": validate_direction, "direction": validate_direction,
"resolverEndpointId": validate_endpoint_id, "domainName": validate_domain_name,
"ipAddresses": validate_ip_addresses, "ipAddresses": validate_ip_addresses,
"ipAddresses.subnetId": validate_subnets,
"maxResults": validate_max_results, "maxResults": validate_max_results,
"name": validate_name, "name": validate_name,
"resolverEndpointId": validate_endpoint_id,
"resolverRuleId": validate_rule_id,
"ruleType": validate_rule_type,
"securityGroupIds": validate_security_group_ids, "securityGroupIds": validate_security_group_ids,
"ipAddresses.subnetId": validate_subnets, "targetIps.port": validate_target_port,
} }
err_msgs = [] err_msgs = []
@ -38,7 +42,7 @@ def validate_args(validators):
def validate_creator_request_id(value): def validate_creator_request_id(value):
"""Raise exception if the creator_request id has invalid length.""" """Raise exception if the creator_request_id has invalid length."""
if value and len(value) > 255: if value and len(value) > 255:
return "have length less than or equal to 255" return "have length less than or equal to 255"
return "" return ""
@ -51,9 +55,16 @@ def validate_direction(value):
return "" return ""
def validate_domain_name(value):
"""Raise exception if the domain_name has invalid length."""
if len(value) > 256:
return "have length less than or equal to 256"
return ""
def validate_endpoint_id(value): def validate_endpoint_id(value):
"""Raise exception if resolver endpoint id has invalid length.""" """Raise exception if resolver endpoint id has invalid length."""
if len(value) > 64: if value and len(value) > 64:
return "have length less than or equal to 64" return "have length less than or equal to 64"
return "" return ""
@ -83,6 +94,20 @@ def validate_name(value):
return "" return ""
def validate_rule_id(value):
"""Raise exception if resolver rule id has invalid length."""
if value and len(value) > 64:
return "have length less than or equal to 64"
return ""
def validate_rule_type(value):
"""Raise exception if rule_type not one of the allowed values."""
if value and value not in ["FORWARD", "SYSTEM", "RECURSIVE"]:
return "satisfy enum value set: [FORWARD, SYSTEM, RECURSIVE]"
return ""
def validate_security_group_ids(value): def validate_security_group_ids(value):
"""Raise exception if IPs fail to match length constraint.""" """Raise exception if IPs fail to match length constraint."""
# Too many security group IDs is an InvalidParameterException. # Too many security group IDs is an InvalidParameterException.
@ -101,3 +126,10 @@ def validate_subnets(value):
if len(subnet_id) > 32: if len(subnet_id) > 32:
return "have length less than or equal to 32" return "have length less than or equal to 32"
return "" return ""
def validate_target_port(value):
"""Raise exception if target port fails to match length constraint."""
if value and value["Port"] > 65535:
return "have value less than or equal to 65535"
return ""

View File

@ -335,7 +335,7 @@ def test_route53resolver_create_resolver_endpoint(): # pylint: disable=too-many
assert endpoint["IpAddressCount"] == 2 assert endpoint["IpAddressCount"] == 2
assert endpoint["HostVPCId"] == vpc_id assert endpoint["HostVPCId"] == vpc_id
assert endpoint["Status"] == "OPERATIONAL" assert endpoint["Status"] == "OPERATIONAL"
assert "Creating the Resolver Endpoint" in endpoint["StatusMessage"] assert "Successfully created Resolver Endpoint" in endpoint["StatusMessage"]
time_format = "%Y-%m-%dT%H:%M:%S.%f+00:00" time_format = "%Y-%m-%dT%H:%M:%S.%f+00:00"
now = datetime.now(timezone.utc).replace(tzinfo=None) now = datetime.now(timezone.utc).replace(tzinfo=None)
@ -351,7 +351,7 @@ def test_route53resolver_create_resolver_endpoint(): # pylint: disable=too-many
@mock_ec2 @mock_ec2
@mock_route53resolver @mock_route53resolver
def test_route53resolver_other_create_resolver_endpoint_errors(): def test_route53resolver_other_create_resolver_endpoint_errors():
"""Test good delete_resolver_endpoint API calls.""" """Test other error scenarios for create_resolver_endpoint API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION) client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION) ec2_client = boto3.client("ec2", region_name=TEST_REGION)
@ -632,7 +632,7 @@ def test_route53resolver_bad_list_resolver_endpoint_ip_addresses():
@mock_ec2 @mock_ec2
@mock_route53resolver @mock_route53resolver
def test_route53resolver_list_resolver_endpoints(): def test_route53resolver_list_resolver_endpoints():
"""Test good list_resolver_endpoint API calls.""" """Test good list_resolver_endpoints API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION) client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION) ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10) random_num = get_random_hex(10)
@ -673,7 +673,7 @@ def test_route53resolver_list_resolver_endpoints():
@mock_ec2 @mock_ec2
@mock_route53resolver @mock_route53resolver
def test_route53resolver_list_resolver_endpoints_filters(): def test_route53resolver_list_resolver_endpoints_filters():
"""Test good list_resolver_endpoint API calls that use filters.""" """Test good list_resolver_endpoints API calls that use filters."""
client = boto3.client("route53resolver", region_name=TEST_REGION) client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION) ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10) random_num = get_random_hex(10)
@ -736,6 +736,10 @@ def test_route53resolver_list_resolver_endpoints_filters():
Filters=[{"Name": "IpAddressCount", "Values": ["4"]}] Filters=[{"Name": "IpAddressCount", "Values": ["4"]}]
) )
assert len(response["ResolverEndpoints"]) == 4 assert len(response["ResolverEndpoints"]) == 4
response = client.list_resolver_endpoints(
Filters=[{"Name": "IpAddressCount", "Values": ["0", "7"]}]
)
assert len(response["ResolverEndpoints"]) == 0
response = client.list_resolver_endpoints( response = client.list_resolver_endpoints(
Filters=[{"Name": "Name", "Values": [f"F1-{random_num}"]}] Filters=[{"Name": "Name", "Values": [f"F1-{random_num}"]}]
@ -770,7 +774,7 @@ def test_route53resolver_list_resolver_endpoints_filters():
@mock_route53resolver @mock_route53resolver
def test_route53resolver_bad_list_resolver_endpoints_filters(): def test_route53resolver_bad_list_resolver_endpoints_filters():
"""Test bad list_resolver_endpoint API calls that use filters.""" """Test bad list_resolver_endpoints API calls that use filters."""
client = boto3.client("route53resolver", region_name=TEST_REGION) client = boto3.client("route53resolver", region_name=TEST_REGION)
# botocore barfs on an empty "Values": # botocore barfs on an empty "Values":
@ -784,6 +788,14 @@ def test_route53resolver_bad_list_resolver_endpoints_filters():
assert err["Code"] == "InvalidParameterException" assert err["Code"] == "InvalidParameterException"
assert "The filter 'foo' is invalid" in err["Message"] assert "The filter 'foo' is invalid" in err["Message"]
with pytest.raises(ClientError) as exc:
client.list_resolver_endpoints(
Filters=[{"Name": "HostVpcId", "Values": ["bar"]}]
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "The filter 'HostVpcId' is invalid" in err["Message"]
@mock_ec2 @mock_ec2
@mock_route53resolver @mock_route53resolver

View File

@ -0,0 +1,532 @@
"""Unit tests for route53resolver rule-related APIs."""
from datetime import datetime, timezone
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_route53resolver
from moto.core import ACCOUNT_ID
from moto.core.utils import get_random_hex
from moto.ec2 import mock_ec2
from .test_route53resolver_endpoint import TEST_REGION, create_test_endpoint
def create_test_rule(client, name=None, tags=None):
"""Create an rule that can be used for testing purposes.
Can't be used for unit tests that need to know/test the arguments.
"""
if not tags:
tags = []
random_num = get_random_hex(10)
resolver_rule = client.create_resolver_rule(
CreatorRequestId=random_num,
Name=name if name else "X" + random_num,
RuleType="FORWARD",
DomainName=f"X{random_num}.com",
TargetIps=[
{"Ip": "10.0.1.200", "Port": 123},
{"Ip": "10.0.0.20", "Port": 456},
],
# ResolverEndpointId=random_num -- will test this separately
Tags=tags,
)
return resolver_rule["ResolverRule"]
@mock_route53resolver
def test_route53resolver_invalid_create_rule_args():
"""Test invalid arguments to the create_resolver_rule API."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Verify ValidationException error messages are accumulated properly:
# - creator requestor ID that exceeds the allowed length of 255.
# - name that exceeds the allowed length of 64.
# - rule_type that's not FORWARD, SYSTEM or RECURSIVE.
# - domain_name that exceeds the allowed length of 256.
long_id = random_num * 25 + "123456"
long_name = random_num * 6 + "abcde"
bad_rule_type = "foo"
long_domain_name = "bar" * 86
with pytest.raises(ClientError) as exc:
client.create_resolver_rule(
CreatorRequestId=long_id,
Name=long_name,
RuleType=bad_rule_type,
DomainName=long_domain_name,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "4 validation errors detected" in err["Message"]
assert (
f"Value '{long_id}' at 'creatorRequestId' failed to satisfy constraint: "
f"Member must have length less than or equal to 255"
) in err["Message"]
assert (
f"Value '{long_name}' at 'name' failed to satisfy constraint: "
f"Member must have length less than or equal to 64"
) in err["Message"]
assert (
f"Value '{bad_rule_type}' at 'ruleType' failed to satisfy constraint: "
f"Member must satisfy enum value set: [FORWARD, SYSTEM, RECURSIVE]"
) in err["Message"]
assert (
f"Value '{long_domain_name}' at 'domainName' failed to satisfy "
f"constraint: Member must have length less than or equal to 256"
) in err["Message"]
# Some single ValidationException errors ...
bad_target_ips = [
{"Ip": "10.1.0.22", "Port": 5},
{"Ip": "10.1.0.23", "Port": 700000},
{"Ip": "10.1.0.24", "Port": 70},
]
with pytest.raises(ClientError) as exc:
client.create_resolver_rule(
CreatorRequestId=random_num,
Name="A" + random_num,
RuleType="FORWARD",
DomainName=f"{random_num}.com",
TargetIps=bad_target_ips,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{bad_target_ips[1]}' at 'targetIps.port' failed to "
f"satisfy constraint: Member must have value less than or equal to "
f"65535"
) in err["Message"]
too_long_resolver_endpoint_id = "foo" * 25
with pytest.raises(ClientError) as exc:
client.create_resolver_rule(
CreatorRequestId=random_num,
Name="A" + random_num,
RuleType="FORWARD",
DomainName=f"{random_num}.com",
ResolverEndpointId=too_long_resolver_endpoint_id,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{too_long_resolver_endpoint_id}' at 'resolverEndpointId' "
f"failed to satisfy constraint: Member must have length less than or "
f"equal to 64"
) in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_create_resolver_rule(): # pylint: disable=too-many-locals
"""Test good create_resolver_rule API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Create a good endpoint that we can use to test.
created_endpoint = create_test_endpoint(client, ec2_client)
endpoint_id = created_endpoint["Id"]
creator_request_id = random_num
name = "X" + random_num
domain_name = f"{random_num}.test"
target_ips = [{"Ip": "1.2.3.4", "Port": 5}]
response = client.create_resolver_rule(
CreatorRequestId=creator_request_id,
Name=name,
RuleType="FORWARD",
DomainName=domain_name,
TargetIps=target_ips,
ResolverEndpointId=endpoint_id,
)
rule = response["ResolverRule"]
id_value = rule["Id"]
assert id_value.startswith("rslvr-rr-")
assert rule["CreatorRequestId"] == creator_request_id
assert (
rule["Arn"]
== f"arn:aws:route53resolver:{TEST_REGION}:{ACCOUNT_ID}:resolver-rule/{id_value}"
)
assert rule["DomainName"] == domain_name + "."
assert rule["Status"] == "COMPLETE"
assert "Successfully created Resolver Rule" in rule["StatusMessage"]
assert rule["RuleType"] == "FORWARD"
assert rule["Name"] == name
assert len(rule["TargetIps"]) == 1
assert rule["TargetIps"][0]["Ip"] == target_ips[0]["Ip"]
assert rule["TargetIps"][0]["Port"] == target_ips[0]["Port"]
assert rule["ResolverEndpointId"] == endpoint_id
assert rule["OwnerId"] == ACCOUNT_ID
assert rule["ShareStatus"] == "SHARED_WITH_ME"
time_format = "%Y-%m-%dT%H:%M:%S.%f+00:00"
now = datetime.now(timezone.utc).replace(tzinfo=None)
creation_time = datetime.strptime(rule["CreationTime"], time_format)
creation_time = creation_time.replace(tzinfo=None)
assert creation_time <= now
modification_time = datetime.strptime(rule["ModificationTime"], time_format)
modification_time = modification_time.replace(tzinfo=None)
assert modification_time <= now
@mock_ec2
@mock_route53resolver
def test_route53resolver_bad_create_resolver_rule():
"""Test error scenarios for create_resolver_rule API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Create a good endpoint and rule that we can use to test.
created_endpoint = create_test_endpoint(client, ec2_client)
endpoint_id = created_endpoint["Id"]
created_rule = create_test_rule(client)
creator_request_id = created_rule["CreatorRequestId"]
# Attempt to create another rule with the same creator request id.
with pytest.raises(ClientError) as exc:
client.create_resolver_rule(
CreatorRequestId=creator_request_id,
Name="B" + random_num,
RuleType="FORWARD",
DomainName=f"{random_num}.test",
TargetIps=[{"Ip": "1.2.3.4", "Port": 5}],
ResolverEndpointId=endpoint_id,
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceExistsException"
assert (
f"Resolver rule with creator request ID '{creator_request_id}' already exists"
) in err["Message"]
# Attempt to create a rule with a IPv6 address.
with pytest.raises(ClientError) as exc:
client.create_resolver_rule(
CreatorRequestId=get_random_hex(10),
Name="B" + random_num,
RuleType="FORWARD",
DomainName=f"{random_num}.test",
TargetIps=[{"Ip": "201:db8:1234::", "Port": 5}],
ResolverEndpointId=endpoint_id,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "Only IPv4 addresses may be used: '201:db8:1234::'" in err["Message"]
# Attempt to create a rule with an invalid IPv4 address.
with pytest.raises(ClientError) as exc:
client.create_resolver_rule(
CreatorRequestId=get_random_hex(10),
Name="B" + random_num,
RuleType="FORWARD",
DomainName=f"{random_num}.test",
TargetIps=[{"Ip": "20.1.2:", "Port": 5}],
ResolverEndpointId=endpoint_id,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "Invalid IP address: '20.1.2:'" in err["Message"]
# Attempt to create a rule with a non-existent resolver endpoint id.
with pytest.raises(ClientError) as exc:
client.create_resolver_rule(
CreatorRequestId=get_random_hex(10),
Name="B" + random_num,
RuleType="FORWARD",
DomainName=f"{random_num}.test",
TargetIps=[{"Ip": "1.2.3.4", "Port": 5}],
ResolverEndpointId="fooey",
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert "Resolver endpoint with ID 'fooey' does not exist" in err["Message"]
# Create a rule with a resolver endpoint id and a rule type of SYSTEM.
with pytest.raises(ClientError) as exc:
client.create_resolver_rule(
CreatorRequestId=get_random_hex(10),
Name="B" + random_num,
RuleType="SYSTEM",
DomainName=f"{random_num}.test",
TargetIps=[{"Ip": "1.2.3.4", "Port": 5}],
ResolverEndpointId=endpoint_id,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidRequestException"
assert (
"Cannot specify resolver endpoint ID and target IP for SYSTEM type "
"resolver rule"
) in err["Message"]
# Too many rules.
for _ in range(1000):
create_test_rule(client)
with pytest.raises(ClientError) as exc:
create_test_rule(client)
err = exc.value.response["Error"]
assert err["Code"] == "LimitExceededException"
assert f"Account '{ACCOUNT_ID}' has exceeded 'max-rules" in err["Message"]
@mock_route53resolver
def test_route53resolver_delete_resolver_rule():
"""Test good delete_resolver_rule API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
created_rule = create_test_rule(client)
# Now delete the resolver rule and verify the response.
response = client.delete_resolver_rule(ResolverRuleId=created_rule["Id"])
rule = response["ResolverRule"]
assert rule["Id"] == created_rule["Id"]
assert rule["CreatorRequestId"] == created_rule["CreatorRequestId"]
assert rule["Arn"] == created_rule["Arn"]
assert rule["DomainName"] == created_rule["DomainName"]
assert rule["Status"] == "DELETING"
assert "Deleting" in rule["StatusMessage"]
assert rule["RuleType"] == created_rule["RuleType"]
assert rule["Name"] == created_rule["Name"]
assert rule["TargetIps"] == created_rule["TargetIps"]
assert rule["OwnerId"] == created_rule["OwnerId"]
assert rule["ShareStatus"] == created_rule["ShareStatus"]
assert rule["CreationTime"] == created_rule["CreationTime"]
@mock_route53resolver
def test_route53resolver_bad_delete_resolver_rule():
"""Test delete_resolver_rule API calls with a bad ID."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Use a resolver rule id that is too long.
long_id = "0123456789" * 6 + "xxxxx"
with pytest.raises(ClientError) as exc:
client.delete_resolver_rule(ResolverRuleId=long_id)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{long_id}' at 'resolverRuleId' failed to satisfy "
f"constraint: Member must have length less than or equal to 64"
) in err["Message"]
# Delete a non-existent resolver rule.
with pytest.raises(ClientError) as exc:
client.delete_resolver_rule(ResolverRuleId=random_num)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Resolver rule with ID '{random_num}' does not exist" in err["Message"]
@mock_route53resolver
def test_route53resolver_get_resolver_rule():
"""Test good get_resolver_rule API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
created_rule = create_test_rule(client)
# Now get the resolver rule and verify the response.
response = client.get_resolver_rule(ResolverRuleId=created_rule["Id"])
rule = response["ResolverRule"]
assert rule["Id"] == created_rule["Id"]
assert rule["CreatorRequestId"] == created_rule["CreatorRequestId"]
assert rule["Arn"] == created_rule["Arn"]
assert rule["DomainName"] == created_rule["DomainName"]
assert rule["Status"] == created_rule["Status"]
assert rule["StatusMessage"] == created_rule["StatusMessage"]
assert rule["RuleType"] == created_rule["RuleType"]
assert rule["Name"] == created_rule["Name"]
assert rule["TargetIps"] == created_rule["TargetIps"]
assert rule["OwnerId"] == created_rule["OwnerId"]
assert rule["ShareStatus"] == created_rule["ShareStatus"]
assert rule["CreationTime"] == created_rule["CreationTime"]
assert rule["ModificationTime"] == created_rule["ModificationTime"]
@mock_route53resolver
def test_route53resolver_bad_get_resolver_rule():
"""Test get_resolver_rule API calls with a bad ID."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Use a resolver rule id that is too long.
long_id = "0123456789" * 6 + "xxxxx"
with pytest.raises(ClientError) as exc:
client.get_resolver_rule(ResolverRuleId=long_id)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{long_id}' at 'resolverRuleId' failed to satisfy "
f"constraint: Member must have length less than or equal to 64"
) in err["Message"]
# Delete a non-existent resolver rule.
with pytest.raises(ClientError) as exc:
client.get_resolver_rule(ResolverRuleId=random_num)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Resolver rule with ID '{random_num}' does not exist" in err["Message"]
@mock_route53resolver
def test_route53resolver_list_resolver_rules():
"""Test good list_resolver_rules API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
random_num = get_random_hex(10)
# List rules when there are none.
response = client.list_resolver_rules()
assert len(response["ResolverRules"]) == 0
assert response["MaxResults"] == 10
assert "NextToken" not in response
# Create 4 rules, verify all 4 are listed when no filters, max_results.
for idx in range(4):
create_test_rule(client, name=f"A{idx}-{random_num}")
response = client.list_resolver_rules()
rules = response["ResolverRules"]
assert len(rules) == 4
assert response["MaxResults"] == 10
for idx in range(4):
assert rules[idx]["Name"].startswith(f"A{idx}")
# Set max_results to return 1 rule, use next_token to get remaining 3.
response = client.list_resolver_rules(MaxResults=1)
rules = response["ResolverRules"]
assert len(rules) == 1
assert response["MaxResults"] == 1
assert "NextToken" in response
assert rules[0]["Name"].startswith("A0")
response = client.list_resolver_rules(NextToken=response["NextToken"])
rules = response["ResolverRules"]
assert len(rules) == 3
assert response["MaxResults"] == 10
assert "NextToken" not in response
for idx, rule in enumerate(rules):
assert rule["Name"].startswith(f"A{idx + 1}")
@mock_ec2
@mock_route53resolver
def test_route53resolver_list_resolver_rules_filters():
"""Test good list_resolver_rules API calls that use filters."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Create some endpoints and rules for testing purposes.
endpoint1 = create_test_endpoint(client, ec2_client)["Id"]
endpoint2 = create_test_endpoint(client, ec2_client)["Id"]
rules = []
for idx in range(1, 5):
response = client.create_resolver_rule(
CreatorRequestId=f"F{idx}-{random_num}",
Name=f"F{idx}-{random_num}",
RuleType="FORWARD" if idx % 2 else "RECURSIVE",
DomainName=f"test{idx}.test",
TargetIps=[{"Ip": f"10.0.1.{idx}", "Port": 50 + idx}],
ResolverEndpointId=endpoint1 if idx % 2 else endpoint2,
)
rules.append(response["ResolverRule"])
# Try all the valid filter names, including some of the old style names.
response = client.list_resolver_rules(
Filters=[{"Name": "CreatorRequestId", "Values": [f"F3-{random_num}"]}]
)
assert len(response["ResolverRules"]) == 1
assert response["ResolverRules"][0]["CreatorRequestId"] == f"F3-{random_num}"
response = client.list_resolver_rules(
Filters=[
{
"Name": "CREATOR_REQUEST_ID",
"Values": [f"F2-{random_num}", f"F4-{random_num}"],
}
]
)
assert len(response["ResolverRules"]) == 2
assert response["ResolverRules"][0]["CreatorRequestId"] == f"F2-{random_num}"
assert response["ResolverRules"][1]["CreatorRequestId"] == f"F4-{random_num}"
response = client.list_resolver_rules(
Filters=[{"Name": "Type", "Values": ["FORWARD"]}]
)
assert len(response["ResolverRules"]) == 2
assert response["ResolverRules"][0]["CreatorRequestId"] == f"F1-{random_num}"
assert response["ResolverRules"][1]["CreatorRequestId"] == f"F3-{random_num}"
response = client.list_resolver_rules(
Filters=[{"Name": "Name", "Values": [f"F1-{random_num}"]}]
)
assert len(response["ResolverRules"]) == 1
assert response["ResolverRules"][0]["Name"] == f"F1-{random_num}"
response = client.list_resolver_rules(
Filters=[
{"Name": "RESOLVER_ENDPOINT_ID", "Values": [endpoint1, endpoint2]},
{"Name": "TYPE", "Values": ["FORWARD"]},
{"Name": "NAME", "Values": [f"F3-{random_num}"]},
]
)
assert len(response["ResolverRules"]) == 1
assert response["ResolverRules"][0]["Name"] == f"F3-{random_num}"
response = client.list_resolver_rules(
Filters=[{"Name": "DomainName", "Values": ["test4.test."]}]
)
assert len(response["ResolverRules"]) == 1
assert response["ResolverRules"][0]["Name"] == f"F4-{random_num}"
response = client.list_resolver_rules(
Filters=[{"Name": "Status", "Values": ["COMPLETE"]}]
)
assert len(response["ResolverRules"]) == 4
response = client.list_resolver_rules(
Filters=[{"Name": "Status", "Values": ["FAILED"]}]
)
assert len(response["ResolverRules"]) == 0
@mock_route53resolver
def test_route53resolver_bad_list_resolver_rules_filters():
"""Test bad list_resolver_rules API calls that use filters."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
# botocore barfs on an empty "Values":
# TypeError: list_resolver_rules() only accepts keyword arguments.
# client.list_resolver_rules([{"Name": "Direction", "Values": []}])
# client.list_resolver_rules([{"Values": []}])
with pytest.raises(ClientError) as exc:
client.list_resolver_rules(Filters=[{"Name": "foo", "Values": ["bar"]}])
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "The filter 'foo' is invalid" in err["Message"]
@mock_route53resolver
def test_route53resolver_bad_list_resolver_rules():
"""Test bad list_resolver_rules API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
# Bad max_results.
random_num = get_random_hex(10)
create_test_rule(client, name=f"A-{random_num}")
with pytest.raises(ClientError) as exc:
client.list_resolver_rules(MaxResults=250)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
"Value '250' at 'maxResults' failed to satisfy constraint: Member "
"must have length less than or equal to 100"
) in err["Message"]