Route53Resolver: Add resolver endpoint-related APIs (#4559)

This commit is contained in:
kbalk 2021-11-17 15:06:35 -05:00 committed by GitHub
parent f717e494d3
commit 958a129f97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1804 additions and 7 deletions

View File

@ -3668,6 +3668,75 @@
- [ ] update_traffic_policy_instance
</details>
## route53resolver
<summary>14% implemented</summary>
<details>
- [ ] associate_firewall_rule_group
- [ ] associate_resolver_endpoint_ip_address
- [ ] associate_resolver_query_log_config
- [ ] associate_resolver_rule
- [ ] create_firewall_domain_list
- [ ] create_firewall_rule
- [ ] create_firewall_rule_group
- [X] create_resolver_endpoint
- [ ] create_resolver_query_log_config
- [ ] create_resolver_rule
- [ ] delete_firewall_domain_list
- [ ] delete_firewall_rule
- [ ] delete_firewall_rule_group
- [X] delete_resolver_endpoint
- [ ] delete_resolver_query_log_config
- [ ] delete_resolver_rule
- [ ] disassociate_firewall_rule_group
- [ ] disassociate_resolver_endpoint_ip_address
- [ ] disassociate_resolver_query_log_config
- [ ] disassociate_resolver_rule
- [ ] get_firewall_config
- [ ] get_firewall_domain_list
- [ ] get_firewall_rule_group
- [ ] get_firewall_rule_group_association
- [ ] get_firewall_rule_group_policy
- [ ] get_resolver_config
- [ ] get_resolver_dnssec_config
- [X] get_resolver_endpoint
- [ ] get_resolver_query_log_config
- [ ] get_resolver_query_log_config_association
- [ ] get_resolver_query_log_config_policy
- [ ] get_resolver_rule
- [ ] get_resolver_rule_association
- [ ] get_resolver_rule_policy
- [ ] import_firewall_domains
- [ ] list_firewall_configs
- [ ] list_firewall_domain_lists
- [ ] list_firewall_domains
- [ ] list_firewall_rule_group_associations
- [ ] list_firewall_rule_groups
- [ ] list_firewall_rules
- [ ] list_resolver_configs
- [ ] list_resolver_dnssec_configs
- [X] list_resolver_endpoint_ip_addresses
- [X] list_resolver_endpoints
- [ ] list_resolver_query_log_config_associations
- [ ] list_resolver_query_log_configs
- [ ] list_resolver_rule_associations
- [ ] list_resolver_rules
- [X] list_tags_for_resource
- [ ] put_firewall_rule_group_policy
- [ ] put_resolver_query_log_config_policy
- [ ] put_resolver_rule_policy
- [X] tag_resource
- [X] untag_resource
- [ ] update_firewall_config
- [ ] update_firewall_domains
- [ ] update_firewall_rule
- [ ] update_firewall_rule_group_association
- [ ] update_resolver_config
- [ ] update_resolver_dnssec_config
- [X] update_resolver_endpoint
- [ ] update_resolver_rule
</details>
## s3
<details>
<summary>59% implemented</summary>
@ -4727,7 +4796,6 @@
- route53-recovery-control-config
- route53-recovery-readiness
- route53domains
- route53resolver
- s3control
- s3outposts
- sagemaker-a2i-runtime
@ -4772,4 +4840,4 @@
- workmailmessageflow
- workspaces
- xray
</details>
</details>

View File

@ -0,0 +1,92 @@
.. _implementedservice_route53resolver:
.. |start-h3| raw:: html
<h3>
.. |end-h3| raw:: html
</h3>
===============
route53resolver
===============
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@mock_route53resolver
def test_route53resolver_behaviour:
boto3.client("route53resolver")
...
|start-h3| Implemented features for this service |end-h3|
- [ ] associate_firewall_rule_group
- [ ] associate_resolver_endpoint_ip_address
- [ ] associate_resolver_query_log_config
- [ ] associate_resolver_rule
- [ ] create_firewall_domain_list
- [ ] create_firewall_rule
- [ ] create_firewall_rule_group
- [X] create_resolver_endpoint
- [ ] create_resolver_query_log_config
- [ ] create_resolver_rule
- [ ] delete_firewall_domain_list
- [ ] delete_firewall_rule
- [ ] delete_firewall_rule_group
- [X] delete_resolver_endpoint
- [ ] delete_resolver_query_log_config
- [ ] delete_resolver_rule
- [ ] disassociate_firewall_rule_group
- [ ] disassociate_resolver_endpoint_ip_address
- [ ] disassociate_resolver_query_log_config
- [ ] disassociate_resolver_rule
- [ ] get_firewall_config
- [ ] get_firewall_domain_list
- [ ] get_firewall_rule_group
- [ ] get_firewall_rule_group_association
- [ ] get_firewall_rule_group_policy
- [ ] get_resolver_dnssec_config
- [ ] get_resolver_config
- [X] get_resolver_endpoint
- [ ] get_resolver_query_log_config
- [ ] get_resolver_query_log_config_association
- [ ] get_resolver_query_log_config_policy
- [ ] get_resolver_rule
- [ ] get_resolver_rule_association
- [ ] get_resolver_rule_policy
- [ ] import_firewall_domains
- [ ] list_firewall_configs
- [ ] list_firewall_domain_lists
- [ ] list_firewall_domains
- [ ] list_firewall_rule_group_associations
- [ ] list_firewall_rule_groups
- [ ] list_firewall_rules
- [ ] list_resolver_config
- [ ] list_resolver_dnssec_configs
- [X] list_resolver_endpoint_ip_addresses
- [X] list_resolver_endpoints
- [ ] list_resolver_query_log_config_associations
- [ ] list_resolver_query_log_configs
- [ ] list_resolver_rule_associations
- [ ] list_resolver_rules
- [X] list_tags_for_resource
- [ ] put_firewall_rule_group_policy
- [ ] put_resolver_query_log_config_policy
- [ ] put_resolver_rule_policy
- [X] tag_resource
- [X] untag_resource
- [ ] update_firewall_config
- [ ] update_firewall_domains
- [ ] update_firewall_rule
- [ ] update_firewall_rule_group_association
- [ ] update_resolver_config
- [ ] update_resolver_dnssec_config
- [X] update_resolver_endpoint
- [ ] update_resolver_rule

View File

@ -123,6 +123,9 @@ mock_resourcegroupstaggingapi = lazy_load(
)
mock_route53 = lazy_load(".route53", "mock_route53")
mock_route53_deprecated = lazy_load(".route53", "mock_route53_deprecated")
mock_route53resolver = lazy_load(
".route53resolver", "mock_route53resolver", boto3_name="route53resolver"
)
mock_s3 = lazy_load(".s3", "mock_s3")
mock_s3_deprecated = lazy_load(".s3", "mock_s3_deprecated")
mock_sagemaker = lazy_load(".sagemaker", "mock_sagemaker")

View File

@ -1,4 +1,4 @@
# autogenerated by /Users/markus/git_projects_os/moto/scripts/update_backend_index.py
# autogenerated by scripts/update_backend_index.py
import re
backend_url_patterns = [
@ -96,7 +96,11 @@ backend_url_patterns = [
re.compile("https?://resource-groups(-fips)?\\.(.+)\\.amazonaws.com"),
),
("resourcegroupstaggingapi", re.compile("https?://tagging\\.(.+)\\.amazonaws.com")),
("route53", re.compile("https?://route53(.*)\\.amazonaws.com")),
("route53", re.compile("https?://route53(\\..+)?\\.amazonaws.com")),
(
"route53resolver",
re.compile("https?://route53resolver\\.(.+)\\.amazonaws\\.com"),
),
("s3", re.compile("https?://s3(.*)\\.amazonaws.com")),
(
"s3",

View File

@ -1,7 +1,7 @@
"""Route53 base URL and path."""
from .responses import Route53
url_bases = [r"https?://route53(.*)\.amazonaws.com"]
url_bases = [r"https?://route53(\..+)?\.amazonaws.com"]
def tag_response1(*args, **kwargs):

View File

@ -0,0 +1,5 @@
"""route53resolver module initialization; sets value for base decorator."""
from .models import route53resolver_backends
from ..core.models import base_decorator
mock_route53resolver = base_decorator(route53resolver_backends)

View File

@ -0,0 +1,95 @@
"""Exceptions raised by the route53resolver service."""
from moto.core.exceptions import JsonRESTError
class RRValidationException(JsonRESTError):
"""Report one of more parameter validation errors."""
code = 400
def __init__(self, error_tuples):
"""Validation errors are concatenated into one exception message.
error_tuples is a list of tuples. Each tuple contains:
- name of invalid parameter,
- value of invalid parameter,
- string describing the constraints for that parameter.
"""
msg_leader = (
f"{len(error_tuples)} "
f"validation error{'s' if len(error_tuples) > 1 else ''} detected: "
)
msgs = []
for arg_name, arg_value, constraint in error_tuples:
msgs.append(
f"Value '{arg_value}' at '{arg_name}' failed to satisfy "
f"constraint: Member must {constraint}"
)
super().__init__("ValidationException", msg_leader + "; ".join(msgs))
class InvalidNextTokenException(JsonRESTError):
"""Invalid next token parameter used to return a list of entities."""
code = 400
def __init__(self):
super().__init__(
"InvalidNextTokenException",
"Invalid value passed for the NextToken parameter",
)
class InvalidParameterException(JsonRESTError):
"""One or more parameters in request are not valid."""
code = 400
def __init__(self, message):
super().__init__("InvalidParameterException", message)
class InvalidRequestException(JsonRESTError):
"""The request is invalid."""
code = 400
def __init__(self, message):
super().__init__("InvalidRequestException", message)
class LimitExceededException(JsonRESTError):
"""The request caused one or more limits to be exceeded."""
code = 400
def __init__(self, message):
super().__init__("LimitExceededException", message)
class ResourceExistsException(JsonRESTError):
"""The resource already exists."""
code = 400
def __init__(self, message):
super().__init__("ResourceExistsException", message)
class ResourceNotFoundException(JsonRESTError):
"""The specified resource doesn't exist."""
code = 400
def __init__(self, message):
super().__init__("ResourceNotFoundException", message)
class TagValidationException(JsonRESTError):
"""Tag validation failed."""
code = 400
def __init__(self, message):
super().__init__("ValidationException", message)

View File

@ -0,0 +1,397 @@
"""Route53ResolverBackend class with methods for supported APIs."""
from collections import defaultdict
from datetime import datetime, timezone
from ipaddress import ip_address, ip_network
from boto3 import Session
from moto.core import ACCOUNT_ID
from moto.core import BaseBackend, BaseModel
from moto.core.utils import get_random_hex
from moto.ec2 import ec2_backends
from moto.ec2.exceptions import InvalidSubnetIdError
from moto.ec2.exceptions import InvalidSecurityGroupNotFoundError
from moto.route53resolver.exceptions import (
InvalidParameterException,
InvalidRequestException,
LimitExceededException,
ResourceExistsException,
ResourceNotFoundException,
TagValidationException,
)
from moto.route53resolver.utils import PAGINATION_MODEL
from moto.route53resolver.validations import validate_args
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService
class ResolverEndpoint(BaseModel): # pylint: disable=too-many-instance-attributes
"""Representation of a fake Route53 Resolver Endpoint."""
MAX_TAGS_PER_RESOLVER_ENDPOINT = 200
MAX_ENDPOINTS_PER_REGION = 4
def __init__(
self,
region,
endpoint_id,
creator_request_id,
security_group_ids,
direction,
ip_addresses,
name=None,
): # pylint: disable=too-many-arguments
self.region = region
self.creator_request_id = creator_request_id
self.name = name
self.security_group_ids = security_group_ids
self.direction = direction
self.ip_addresses = ip_addresses
# Constructed members.
self.id = endpoint_id # pylint: disable=invalid-name
# NOTE; This currently doesn't reflect IPv6 addresses.
self.subnets = self._build_subnet_info()
self.ip_address_count = len(ip_addresses)
self.host_vpc_id = self._vpc_id_from_subnet()
self.status = "OPERATIONAL"
# The status message should contain a trace Id which is the value
# of X-Amzn-Trace-Id. We don't have that info, so a random number
# of similar format and length will be used.
self.status_message = (
f"[Trace id: 1-{get_random_hex(8)}-{get_random_hex(24)}] "
f"Creating the Resolver Endpoint"
)
self.creation_time = datetime.now(timezone.utc).isoformat()
self.modification_time = datetime.now(timezone.utc).isoformat()
@property
def arn(self):
"""Return ARN for this resolver endpoint."""
return f"arn:aws:route53resolver:{self.region}:{ACCOUNT_ID}:resolver-endpoint/{self.id}"
def _vpc_id_from_subnet(self):
"""Return VPC Id associated with the subnet.
The assumption is that all of the subnets are associated with the
same VPC. We don't check that assumption, but otherwise the existence
of the subnets has already been checked.
"""
first_subnet_id = self.ip_addresses[0]["SubnetId"]
subnet_info = ec2_backends[self.region].get_all_subnets(
subnet_ids=[first_subnet_id]
)[0]
return subnet_info.vpc_id
def _build_subnet_info(self):
"""Create a dict of subnet info, including ip addrs and ENI ids.
self.subnets[subnet_id][ip_addr1] = eni-id1 ...
"""
subnets = defaultdict(dict)
for entry in self.ip_addresses:
subnets[entry["SubnetId"]][entry["Ip"]] = f"rni-{get_random_hex(17)}"
return subnets
def create_eni(self):
"""Create a VPC ENI for each combo of AZ, subnet and IP."""
for subnet, ip_info in self.subnets.items():
for ip_addr, eni_id in ip_info.items():
ec2_backends[self.region].create_network_interface(
description=f"Route 53 Resolver: {self.id}:{eni_id}",
group_ids=self.security_group_ids,
interface_type="interface",
private_ip_address=ip_addr,
private_ip_addresses=[
{"Primary": True, "PrivateIpAddress": ip_addr}
],
subnet=subnet,
)
def description(self):
"""Return a dictionary of relevant info for this resolver endpoint."""
return {
"Id": self.id,
"CreatorRequestId": self.creator_request_id,
"Arn": self.arn,
"Name": self.name,
"SecurityGroupIds": self.security_group_ids,
"Direction": self.direction,
"IpAddressCount": self.ip_address_count,
"HostVPCId": self.host_vpc_id,
"Status": self.status,
"StatusMessage": self.status_message,
"CreationTime": self.creation_time,
"ModificationTime": self.modification_time,
}
def ip_descriptions(self):
"""Return a list of dicts describing resolver endpoint IP addresses."""
description = []
for subnet_id, ip_info in self.subnets.items():
for ip_addr, eni_id in ip_info.items():
description.append(
{
"IpId": eni_id,
"SubnetId": subnet_id,
"Ip": ip_addr,
"Status": "ATTACHED",
"StatusMessage": "This IP address is operational.",
"CreationTime": self.creation_time,
"ModificationTime": self.modification_time,
}
)
return description
def update_name(self, name):
"""Replace existing name with new name."""
self.name = name
self.modification_time = datetime.now(timezone.utc).isoformat()
class Route53ResolverBackend(BaseBackend):
"""Implementation of Route53Resolver APIs."""
def __init__(self, region_name=None):
self.region_name = region_name
self.resolver_endpoints = {} # Key is self-generated ID (endpoint_id)
self.tagger = TaggingService()
def reset(self):
"""Re-initialize all attributes for this instance."""
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
"""List of dicts representing default VPC endpoints for this service."""
return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "route53resolver"
)
@staticmethod
def _verify_subnet_ips(region, ip_addresses):
"""Perform additional checks on the IPAddresses.
NOTE: This does not include IPv6 addresses.
"""
if len(ip_addresses) < 2:
raise InvalidRequestException(
"Resolver endpoint needs to have at least 2 IP addresses"
)
subnets = defaultdict(set)
for subnet_id, ip_addr in [(x["SubnetId"], x["Ip"]) for x in ip_addresses]:
try:
subnet_info = ec2_backends[region].get_all_subnets(
subnet_ids=[subnet_id]
)[0]
except InvalidSubnetIdError as exc:
raise InvalidParameterException(
f"The subnet ID '{subnet_id}' does not exist"
) from exc
# IP in IPv4 CIDR range and not reserved?
if ip_address(ip_addr) in subnet_info.reserved_ips or ip_address(
ip_addr
) not in ip_network(subnet_info.cidr_block):
raise InvalidRequestException(
f"IP address '{ip_addr}' is either not in subnet "
f"'{subnet_id}' CIDR range or is reserved"
)
if ip_addr in subnets[subnet_id]:
raise ResourceExistsException(
f"The IP address '{ip_addr}' in subnet '{subnet_id}' is already in use"
)
subnets[subnet_id].add(ip_addr)
@staticmethod
def _verify_security_group_ids(region, security_group_ids):
"""Perform additional checks on the security groups."""
if len(security_group_ids) > 10:
raise InvalidParameterException("Maximum of 10 security groups are allowed")
for group_id in security_group_ids:
if not group_id.startswith("sg-"):
raise InvalidParameterException(
f"Malformed security group ID: Invalid id: '{group_id}' "
f"(expecting 'sg-...')"
)
try:
ec2_backends[region].describe_security_groups(group_ids=[group_id])
except InvalidSecurityGroupNotFoundError as exc:
raise ResourceNotFoundException(
f"The security group '{group_id}' does not exist"
) from exc
def create_resolver_endpoint(
self,
region,
creator_request_id,
name,
security_group_ids,
direction,
ip_addresses,
tags,
): # pylint: disable=too-many-arguments
"""Return description for a newly created resolver endpoint.
NOTE: IPv6 IPs are currently not being filtered when
calculating the create_resolver_endpoint() IpAddresses.
"""
validate_args(
[
("creatorRequestId", creator_request_id),
("direction", direction),
("ipAddresses", ip_addresses),
("name", name),
("securityGroupIds", security_group_ids),
("ipAddresses.subnetId", ip_addresses),
]
)
errmsg = self.tagger.validate_tags(
tags or [], limit=ResolverEndpoint.MAX_TAGS_PER_RESOLVER_ENDPOINT,
)
if errmsg:
raise TagValidationException(errmsg)
endpoints = [x for x in self.resolver_endpoints.values() if x.region == region]
if len(endpoints) > ResolverEndpoint.MAX_ENDPOINTS_PER_REGION:
raise LimitExceededException(
f"Account '{ACCOUNT_ID}' has exceeded 'max-endpoints'"
)
self._verify_subnet_ips(region, ip_addresses)
self._verify_security_group_ids(region, security_group_ids)
if creator_request_id in [
x.creator_request_id for x in self.resolver_endpoints.values()
]:
raise ResourceExistsException(
f"Resolver endpoint with creator request ID "
f"'{creator_request_id}' already exists"
)
endpoint_id = (
f"rslvr-{'in' if direction == 'INBOUND' else 'out'}-{get_random_hex(17)}"
)
resolver_endpoint = ResolverEndpoint(
region,
endpoint_id,
creator_request_id,
security_group_ids,
direction,
ip_addresses,
name,
)
resolver_endpoint.create_eni()
self.resolver_endpoints[endpoint_id] = resolver_endpoint
self.tagger.tag_resource(resolver_endpoint.arn, tags or [])
return resolver_endpoint
def _validate_resolver_endpoint_id(self, resolver_endpoint_id):
"""Raise an exception if the id is invalid or unknown."""
validate_args([("resolverEndpointId", resolver_endpoint_id)])
if resolver_endpoint_id not in self.resolver_endpoints:
raise ResourceNotFoundException(
f"Resolver endpoint with ID '{resolver_endpoint_id}' does not exist"
)
def delete_resolver_endpoint(self, resolver_endpoint_id):
"""Delete a resolver endpoint."""
self._validate_resolver_endpoint_id(resolver_endpoint_id)
self.tagger.delete_all_tags_for_resource(resolver_endpoint_id)
resolver_endpoint = self.resolver_endpoints.pop(resolver_endpoint_id)
resolver_endpoint.status = "DELETING"
resolver_endpoint.status_message = resolver_endpoint.status_message.replace(
"Creating", "Deleting"
)
return resolver_endpoint
def get_resolver_endpoint(self, resolver_endpoint_id):
"""Return info for specified resolver endpoint."""
self._validate_resolver_endpoint_id(resolver_endpoint_id)
return self.resolver_endpoints[resolver_endpoint_id]
@paginate(pagination_model=PAGINATION_MODEL)
def list_resolver_endpoint_ip_addresses(
self, resolver_endpoint_id, next_token=None, max_results=None,
): # pylint: disable=unused-argument
"""List IP endresses for specified resolver endpoint."""
self._validate_resolver_endpoint_id(resolver_endpoint_id)
endpoint = self.resolver_endpoints[resolver_endpoint_id]
return endpoint.ip_descriptions()
@paginate(pagination_model=PAGINATION_MODEL)
def list_resolver_endpoints(
self, filters=None, next_token=None, max_results=None,
): # pylint: disable=unused-argument
"""List all resolver endpoints, using filters if specified."""
# TODO - check subsequent filters
# TODO - validate name, values for filters
return sorted(self.resolver_endpoints.values(), key=lambda x: x.name)
@paginate(pagination_model=PAGINATION_MODEL)
def list_tags_for_resource(
self, resource_arn, next_token=None, max_results=None,
): # pylint: disable=unused-argument
"""List all tags for the given resource."""
self._matched_arn(resource_arn)
return self.tagger.list_tags_for_resource(resource_arn).get("Tags")
def _matched_arn(self, resource_arn):
"""Given ARN, raise exception if there is no corresponding resource."""
for resolver_endpoint in self.resolver_endpoints.values():
if resolver_endpoint.arn == resource_arn:
return
raise ResourceNotFoundException(
f"Resolver endpoint with ID '{resource_arn}' does not exist"
)
def tag_resource(self, resource_arn, tags):
"""Add or overwrite one or more tags for specified resource."""
self._matched_arn(resource_arn)
errmsg = self.tagger.validate_tags(
tags, limit=ResolverEndpoint.MAX_TAGS_PER_RESOLVER_ENDPOINT,
)
if errmsg:
raise TagValidationException(errmsg)
self.tagger.tag_resource(resource_arn, tags)
def untag_resource(self, resource_arn, tag_keys):
"""Removes tags from a resource."""
self._matched_arn(resource_arn)
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
def update_resolver_endpoint(self, resolver_endpoint_id, name):
"""Update name of Resolver endpoint."""
self._validate_resolver_endpoint_id(resolver_endpoint_id)
validate_args([("name", name)])
resolver_endpoint = self.resolver_endpoints[resolver_endpoint_id]
resolver_endpoint.update_name(name)
return resolver_endpoint
route53resolver_backends = {}
for available_region in Session().get_available_regions("route53resolver"):
route53resolver_backends[available_region] = Route53ResolverBackend(
available_region
)
for available_region in Session().get_available_regions(
"route53resolver", partition_name="aws-us-gov"
):
route53resolver_backends[available_region] = Route53ResolverBackend(
available_region
)
for available_region in Session().get_available_regions(
"route53resolver", partition_name="aws-cn"
):
route53resolver_backends[available_region] = Route53ResolverBackend(
available_region
)

View File

@ -0,0 +1,146 @@
"""Handles incoming route53resolver requests/responses."""
import json
from moto.core.exceptions import InvalidToken
from moto.core.responses import BaseResponse
from moto.route53resolver.exceptions import InvalidNextTokenException
from moto.route53resolver.models import route53resolver_backends
from moto.route53resolver.validations import validate_args
class Route53ResolverResponse(BaseResponse):
"""Handler for Route53Resolver requests and responses."""
@property
def route53resolver_backend(self):
"""Return backend instance specific for this region."""
return route53resolver_backends[self.region]
def create_resolver_endpoint(self):
"""Create an inbound or outbound Resolver endpoint."""
creator_request_id = self._get_param("CreatorRequestId")
name = self._get_param("Name")
security_group_ids = self._get_param("SecurityGroupIds")
direction = self._get_param("Direction")
ip_addresses = self._get_param("IpAddresses")
tags = self._get_param("Tags", [])
resolver_endpoint = self.route53resolver_backend.create_resolver_endpoint(
region=self.region,
creator_request_id=creator_request_id,
name=name,
security_group_ids=security_group_ids,
direction=direction,
ip_addresses=ip_addresses,
tags=tags,
)
return json.dumps({"ResolverEndpoint": resolver_endpoint.description()})
def delete_resolver_endpoint(self):
"""Delete a Resolver endpoint."""
resolver_endpoint_id = self._get_param("ResolverEndpointId")
resolver_endpoint = self.route53resolver_backend.delete_resolver_endpoint(
resolver_endpoint_id=resolver_endpoint_id,
)
return json.dumps({"ResolverEndpoint": resolver_endpoint.description()})
def get_resolver_endpoint(self):
"""Return info about a specific Resolver endpoint."""
resolver_endpoint_id = self._get_param("ResolverEndpointId")
resolver_endpoint = self.route53resolver_backend.get_resolver_endpoint(
resolver_endpoint_id=resolver_endpoint_id,
)
return json.dumps({"ResolverEndpoint": resolver_endpoint.description()})
def list_resolver_endpoint_ip_addresses(self):
"""Returns list of IP addresses for specified Resolver endpoint."""
resolver_endpoint_id = self._get_param("ResolverEndpointId")
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults", 10)
validate_args([("maxResults", max_results)])
try:
(
ip_addresses,
next_token,
) = self.route53resolver_backend.list_resolver_endpoint_ip_addresses(
resolver_endpoint_id=resolver_endpoint_id,
next_token=next_token,
max_results=max_results,
)
except InvalidToken as exc:
raise InvalidNextTokenException() from exc
response = {
"IpAddresses": ip_addresses,
"MaxResults": max_results,
}
if next_token:
response["NextToken"] = next_token
return json.dumps(response)
def list_resolver_endpoints(self):
"""Returns list of all Resolver endpoints, filtered if specified."""
filters = self._get_param("Filters")
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults", 10)
validate_args([("maxResults", max_results)])
try:
(
endpoints,
next_token,
) = self.route53resolver_backend.list_resolver_endpoints(
filters=filters, next_token=next_token, max_results=max_results
)
except InvalidToken as exc:
raise InvalidNextTokenException() from exc
response = {
"ResolverEndpoints": [x.description() for x in endpoints],
"MaxResults": max_results,
}
if next_token:
response["NextToken"] = next_token
return json.dumps(response)
def list_tags_for_resource(self):
"""Lists all tags for the given resource."""
resource_arn = self._get_param("ResourceArn")
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults")
try:
(tags, next_token) = self.route53resolver_backend.list_tags_for_resource(
resource_arn=resource_arn,
next_token=next_token,
max_results=max_results,
)
except InvalidToken as exc:
raise InvalidNextTokenException() from exc
response = {"Tags": tags}
if next_token:
response["NextToken"] = next_token
return json.dumps(response)
def tag_resource(self):
"""Add one or more tags to a specified resource."""
resource_arn = self._get_param("ResourceArn")
tags = self._get_param("Tags")
self.route53resolver_backend.tag_resource(resource_arn=resource_arn, tags=tags)
return ""
def untag_resource(self):
"""Removes one or more tags from the specified resource."""
resource_arn = self._get_param("ResourceArn")
tag_keys = self._get_param("TagKeys")
self.route53resolver_backend.untag_resource(
resource_arn=resource_arn, tag_keys=tag_keys
)
return ""
def update_resolver_endpoint(self):
"""Update name of Resolver endpoint."""
resolver_endpoint_id = self._get_param("ResolverEndpointId")
name = self._get_param("Name")
resolver_endpoint = self.route53resolver_backend.update_resolver_endpoint(
resolver_endpoint_id=resolver_endpoint_id, name=name,
)
return json.dumps({"ResolverEndpoint": resolver_endpoint.description()})

View File

@ -0,0 +1,11 @@
"""route53resolver base URL and path."""
from .responses import Route53ResolverResponse
url_bases = [
r"https?://route53resolver\.(.+)\.amazonaws\.com",
]
url_paths = {
"{0}/$": Route53ResolverResponse.dispatch,
}

View File

@ -0,0 +1,22 @@
"""Pagination control model for Route53Resolver."""
PAGINATION_MODEL = {
"list_resolver_endpoint_ip_addresses": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"page_ending_range_keys": ["IpId"],
},
"list_resolver_endpoints": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"page_ending_range_keys": ["id"],
},
"list_tags_for_resource": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"page_ending_range_keys": ["Key"],
},
}

View File

@ -0,0 +1,103 @@
"""Route53ResolverBackend validations that result in ValidationException.
Note that ValidationExceptions are accumulative.
"""
import re
from moto.route53resolver.exceptions import RRValidationException
def validate_args(validators):
"""Raise exception if any of the validations fails.
validators is a list of tuples each containing the following:
(printable field name, field value)
The error messages are accumulated before the exception is raised.
"""
validation_map = {
"creatorRequestId": validate_creator_request_id,
"direction": validate_direction,
"resolverEndpointId": validate_endpoint_id,
"ipAddresses": validate_ip_addresses,
"maxResults": validate_max_results,
"name": validate_name,
"securityGroupIds": validate_security_group_ids,
"ipAddresses.subnetId": validate_subnets,
}
err_msgs = []
# This eventually could be a switch (python 3.10), eliminating the need
# for the above map and individual functions.
for (fieldname, value) in validators:
msg = validation_map[fieldname](value)
if msg:
err_msgs.append((fieldname, value, msg))
if err_msgs:
raise RRValidationException(err_msgs)
def validate_creator_request_id(value):
"""Raise exception if the creator_request id has invalid length."""
if value and len(value) > 255:
return "have length less than or equal to 255"
return ""
def validate_direction(value):
"""Raise exception if direction not one of the allowed values."""
if value and value not in ["INBOUND", "OUTBOUND"]:
return "satisfy enum value set: [INBOUND, OUTBOUND]"
return ""
def validate_endpoint_id(value):
"""Raise exception if resolver endpoint id has invalid length."""
if len(value) > 64:
return "have length less than or equal to 64"
return ""
def validate_ip_addresses(value):
"""Raise exception if IPs fail to match length constraint."""
if len(value) > 10:
return "have length less than or equal to 10"
return ""
def validate_max_results(value):
"""Raise exception if number of endpoints or IPs is too large."""
if value and value > 100:
return "have length less than or equal to 100"
return ""
def validate_name(value):
"""Raise exception if name fails to match constraints."""
if value:
if len(value) > 64:
return "have length less than or equal to 64"
name_pattern = r"^(?!^[0-9]+$)([a-zA-Z0-9-_' ']+)$"
if not re.match(name_pattern, value):
return fr"satisfy regular expression pattern: {name_pattern}"
return ""
def validate_security_group_ids(value):
"""Raise exception if IPs fail to match length constraint."""
# Too many security group IDs is an InvalidParameterException.
for group_id in value:
if len(group_id) > 64:
return (
"have length less than or equal to 64 and Member must have "
"length greater than or equal to 1"
)
return ""
def validate_subnets(value):
"""Raise exception if subnets fail to match length constraint."""
for subnet_id in [x["SubnetId"] for x in value]:
if len(subnet_id) > 32:
return "have length less than or equal to 32"
return ""

View File

@ -74,7 +74,7 @@ class DomainDispatcherApplication(object):
if "amazonaws.com" in host:
print(
"Unable to find appropriate backend for {}."
"Remember to add the URL to urls.py, and run script/update_backend_index.py to index it.".format(
"Remember to add the URL to urls.py, and run scripts/update_backend_index.py to index it.".format(
host
)
)
@ -91,6 +91,7 @@ class DomainDispatcherApplication(object):
credential_scope = auth.split(",")[0].split()[1]
_, _, region, service, _ = credential_scope.split("/")
service = SIGNING_ALIASES.get(service.lower(), service)
service = service.lower()
except ValueError:
# Signature format does not match, this is exceptional and we can't
# infer a service-region. A reduced set of services still use

View File

@ -101,12 +101,14 @@ class TaggingService:
result[tag[self.key_name]] = None
return result
def validate_tags(self, tags):
def validate_tags(self, tags, limit=0):
"""Returns error message if tags in 'tags' list of dicts are invalid.
The validation does not include a check for duplicate keys.
Duplicate keys are not always an error and the error message isn't
consistent across services, so this should be a separate check.
If limit is provided, then the number of tags will be checked.
"""
errors = []
key_regex = re.compile(r"^(?!aws:)([\w\s\d_.:/=+\-@]*)$")
@ -147,6 +149,12 @@ class TaggingService:
r"^[{a-zA-Z0-9 }_.://=+-@%]*$"
)
if limit and len(tags) > limit:
errors.append(
f"Value '{tags}' at 'tags' failed to satisfy constraint: "
f"Member must have length less than or equal to {limit}"
)
errors_len = len(errors)
return (
(

View File

@ -105,6 +105,7 @@ extras_per_service["dynamodbstreams"] = extras_per_service["awslambda"]
extras_per_service["efs"] = extras_per_service["ec2"]
# DirectoryService needs EC2 to verify VPCs and subnets.
extras_per_service["ds"] = extras_per_service["ec2"]
extras_per_service["route53resolver"] = extras_per_service["ec2"]
extras_require = {
"all": all_extra_deps,
"server": all_server_deps,

View File

View File

@ -0,0 +1,691 @@
"""Unit tests for route53resolver endpoint-related APIs."""
from datetime import datetime, timezone
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_route53resolver
from moto import settings
from moto.core import ACCOUNT_ID
from moto.core.utils import get_random_hex
from moto.ec2 import mock_ec2
TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
def create_security_group(ec2_client):
"""Return a security group ID."""
group_name = "RRUnitTests"
# Does the security group already exist?
groups = ec2_client.describe_security_groups(
Filters=[{"Name": "group-name", "Values": [group_name]}]
)
# If so, we're done. Otherwise, create it.
if groups["SecurityGroups"]:
return groups["SecurityGroups"][0]["GroupId"]
response = ec2_client.create_security_group(
Description="Security group used by unit tests", GroupName=group_name,
)
return response["GroupId"]
def create_vpc(ec2_client):
"""Return the ID for a valid VPC."""
return ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
def create_subnets(ec2_client, vpc_id):
"""Returns the IDs for two valid subnets."""
subnet_ids = []
for cidr_block in ["10.0.1.0/24", "10.0.0.0/24"]:
subnet_ids.append(
ec2_client.create_subnet(
VpcId=vpc_id, CidrBlock=cidr_block, AvailabilityZone=f"{TEST_REGION}a",
)["Subnet"]["SubnetId"]
)
return subnet_ids
def create_test_endpoint(client, ec2_client, name=None, tags=None):
"""Create an endpoint that can be used for testing purposes.
Can't be used for unit tests that need to know/test the arguments.
"""
if not tags:
tags = []
random_num = get_random_hex(10)
subnet_ids = create_subnets(ec2_client, create_vpc(ec2_client))
resolver_endpoint = client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name=name if name else "X" + random_num,
SecurityGroupIds=[create_security_group(ec2_client)],
Direction="INBOUND",
IpAddresses=[
{"SubnetId": subnet_ids[0], "Ip": "10.0.1.200"},
{"SubnetId": subnet_ids[1], "Ip": "10.0.0.20"},
],
Tags=tags,
)
return resolver_endpoint["ResolverEndpoint"]
@mock_route53resolver
def test_route53resolver_invalid_create_endpoint_args():
"""Test invalid arguments to the create_resolver_endpoint API."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Verify ValidationException error messages are accumulated properly:
# - creator requestor ID that exceeds the allowed length of 255.
# - name that exceeds the allowed length of 64.
# - direction that's neither INBOUND or OUTBOUND.
# - more than 10 IP Address sets.
# - too many security group IDs.
long_id = random_num * 25 + "123456"
long_name = random_num * 6 + "abcde"
too_many_security_groups = ["sg-" + get_random_hex(63)]
bad_direction = "foo"
too_many_ip_addresses = [{"SubnetId": f"{x}", "Ip": f"{x}" * 7} for x in range(11)]
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=long_id,
Name=long_name,
SecurityGroupIds=too_many_security_groups,
Direction=bad_direction,
IpAddresses=too_many_ip_addresses,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "5 validation errors detected" in err["Message"]
assert (
f"Value '{long_id}' at 'creatorRequestId' failed to satisfy constraint: "
f"Member must have length less than or equal to 255"
) in err["Message"]
assert (
f"Value '{too_many_security_groups}' at 'securityGroupIds' failed to "
f"satisfy constraint: Member must have length less than or equal to 64"
) in err["Message"]
assert (
f"Value '{long_name}' at 'name' failed to satisfy constraint: "
f"Member must have length less than or equal to 64"
) in err["Message"]
assert (
f"Value '{bad_direction}' at 'direction' failed to satisfy constraint: "
f"Member must satisfy enum value set: [INBOUND, OUTBOUND]"
) in err["Message"]
assert (
f"Value '{too_many_ip_addresses}' at 'ipAddresses' failed to satisfy "
f"constraint: Member must have length less than or equal to 10"
) in err["Message"]
# Some single ValidationException errors ...
bad_chars_in_name = "0@*3"
ok_group_ids = ["sg-" + random_num]
ok_ip_addrs = [{"SubnetId": f"{x}", "Ip": f"{x}" * 7} for x in range(10)]
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name=bad_chars_in_name,
SecurityGroupIds=ok_group_ids,
Direction="INBOUND",
IpAddresses=ok_ip_addrs,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
fr"Value '{bad_chars_in_name}' at 'name' failed to satisfy constraint: "
fr"Member must satisfy regular expression pattern: "
fr"^(?!^[0-9]+$)([a-zA-Z0-9-_' ']+)$"
) in err["Message"]
subnet_too_long = [{"SubnetId": "a" * 33, "Ip": "1.2.3.4"}]
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name="X" + random_num,
SecurityGroupIds=ok_group_ids,
Direction="OUTBOUND",
IpAddresses=subnet_too_long,
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{subnet_too_long}' at 'ipAddresses.subnetId' failed to "
f"satisfy constraint: Member must have length less than or equal to 32"
) in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_bad_create_endpoint_subnets():
"""Test bad subnet scenarios for create_resolver_endpoint API."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Need 2 IP addresses at the minimum.
subnet_ids = create_subnets(ec2_client, create_vpc(ec2_client))
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name="X" + random_num,
SecurityGroupIds=[f"sg-{random_num}"],
Direction="INBOUND",
IpAddresses=[{"SubnetId": subnet_ids[0], "Ip": "1.2.3.4"}],
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidRequestException"
assert "Resolver endpoint needs to have at least 2 IP addresses" in err["Message"]
# Need an IP that's within in the subnet.
bad_ip_addr = "1.2.3.4"
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name="X" + random_num,
SecurityGroupIds=[f"sg-{random_num}"],
Direction="INBOUND",
IpAddresses=[
{"SubnetId": subnet_ids[0], "Ip": bad_ip_addr},
{"SubnetId": subnet_ids[1], "Ip": bad_ip_addr},
],
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidRequestException"
assert (
f"IP address '{bad_ip_addr}' is either not in subnet "
f"'{subnet_ids[0]}' CIDR range or is reserved"
) in err["Message"]
# Bad subnet ID.
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name="X" + random_num,
SecurityGroupIds=[f"sg-{random_num}"],
Direction="INBOUND",
IpAddresses=[
{"SubnetId": "foo", "Ip": "1.2.3.4"},
{"SubnetId": subnet_ids[1], "Ip": "1.2.3.4"},
],
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "The subnet ID 'foo' does not exist" in err["Message"]
# Can't reuse a ip address in a subnet.
subnet_ids = create_subnets(ec2_client, create_vpc(ec2_client))
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId="B" + random_num,
Name="B" + random_num,
SecurityGroupIds=[create_security_group(ec2_client)],
Direction="INBOUND",
IpAddresses=[
{"SubnetId": subnet_ids[0], "Ip": "10.0.1.200"},
{"SubnetId": subnet_ids[0], "Ip": "10.0.1.200"},
],
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceExistsException"
assert (
f"The IP address '10.0.1.200' in subnet '{subnet_ids[0]}' is already in use"
) in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_bad_create_endpoint_security_groups():
"""Test bad security group scenarios for create_resolver_endpoint API."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10)
subnet_ids = create_subnets(ec2_client, create_vpc(ec2_client))
ip_addrs = [
{"SubnetId": subnet_ids[0], "Ip": "10.0.1.200"},
{"SubnetId": subnet_ids[1], "Ip": "10.0.0.20"},
]
# Subnet must begin with "sg-".
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name="X" + random_num,
SecurityGroupIds=["foo"],
Direction="INBOUND",
IpAddresses=ip_addrs,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert (
"Malformed security group ID: Invalid id: 'foo' (expecting 'sg-...')"
) in err["Message"]
# Non-existent security group id.
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name="X" + random_num,
SecurityGroupIds=["sg-abc"],
Direction="INBOUND",
IpAddresses=ip_addrs,
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert "The security group 'sg-abc' does not exist" in err["Message"]
# Too many security group ids.
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=random_num,
Name="X" + random_num,
SecurityGroupIds=["sg-abc"] * 11,
Direction="INBOUND",
IpAddresses=ip_addrs,
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert "Maximum of 10 security groups are allowed" in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_create_resolver_endpoint(): # pylint: disable=too-many-locals
"""Test good create_resolver_endpoint API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10)
vpc_id = create_vpc(ec2_client)
subnet_ids = create_subnets(ec2_client, vpc_id)
ip_addrs = [
{"SubnetId": subnet_ids[0], "Ip": "10.0.1.200"},
{"SubnetId": subnet_ids[1], "Ip": "10.0.0.20"},
]
security_group_id = create_security_group(ec2_client)
creator_request_id = random_num
name = "X" + random_num
response = client.create_resolver_endpoint(
CreatorRequestId=creator_request_id,
Name=name,
SecurityGroupIds=[security_group_id],
Direction="INBOUND",
IpAddresses=ip_addrs,
)
endpoint = response["ResolverEndpoint"]
id_value = endpoint["Id"]
assert id_value.startswith("rslvr-in-")
assert endpoint["CreatorRequestId"] == creator_request_id
assert (
endpoint["Arn"]
== f"arn:aws:route53resolver:{TEST_REGION}:{ACCOUNT_ID}:resolver-endpoint/{id_value}"
)
assert endpoint["Name"] == name
assert endpoint["SecurityGroupIds"] == [security_group_id]
assert endpoint["Direction"] == "INBOUND"
assert endpoint["IpAddressCount"] == 2
assert endpoint["HostVPCId"] == vpc_id
assert endpoint["Status"] == "OPERATIONAL"
assert "Creating the Resolver Endpoint" in endpoint["StatusMessage"]
time_format = "%Y-%m-%dT%H:%M:%S.%f+00:00"
now = datetime.now(timezone.utc).replace(tzinfo=None)
creation_time = datetime.strptime(endpoint["CreationTime"], time_format)
creation_time = creation_time.replace(tzinfo=None)
assert creation_time <= now
modification_time = datetime.strptime(endpoint["ModificationTime"], time_format)
modification_time = modification_time.replace(tzinfo=None)
assert modification_time <= now
@mock_ec2
@mock_route53resolver
def test_route53resolver_other_create_resolver_endpoint_errors():
"""Test good delete_resolver_endpoint API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a good endpoint that we can use to test.
created_endpoint = create_test_endpoint(client, ec2_client)
request_id = created_endpoint["CreatorRequestId"]
# Attempt to create another endpoint with the same creator request id.
vpc_id = create_vpc(ec2_client)
subnet_ids = create_subnets(ec2_client, vpc_id)
with pytest.raises(ClientError) as exc:
client.create_resolver_endpoint(
CreatorRequestId=created_endpoint["CreatorRequestId"],
Name="X" + get_random_hex(10),
SecurityGroupIds=created_endpoint["SecurityGroupIds"],
Direction="INBOUND",
IpAddresses=[
{"SubnetId": subnet_ids[0], "Ip": "10.0.1.200"},
{"SubnetId": subnet_ids[1], "Ip": "10.0.0.20"},
],
)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceExistsException"
assert (
f"Resolver endpoint with creator request ID '{request_id}' already exists"
) in err["Message"]
# Too many endpoints.
random_num = get_random_hex(10)
for idx in range(4):
create_test_endpoint(client, ec2_client, name=f"A{idx}-{random_num}")
with pytest.raises(ClientError) as exc:
create_test_endpoint(client, ec2_client, name=f"A5-{random_num}")
err = exc.value.response["Error"]
assert err["Code"] == "LimitExceededException"
assert f"Account '{ACCOUNT_ID}' has exceeded 'max-endpoints" in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_delete_resolver_endpoint():
"""Test good delete_resolver_endpoint API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
created_endpoint = create_test_endpoint(client, ec2_client)
# Now delete the resolver endpoint and verify the response.
response = client.delete_resolver_endpoint(
ResolverEndpointId=created_endpoint["Id"]
)
endpoint = response["ResolverEndpoint"]
assert endpoint["CreatorRequestId"] == created_endpoint["CreatorRequestId"]
assert endpoint["Id"] == created_endpoint["Id"]
assert endpoint["Arn"] == created_endpoint["Arn"]
assert endpoint["Name"] == created_endpoint["Name"]
assert endpoint["SecurityGroupIds"] == created_endpoint["SecurityGroupIds"]
assert endpoint["Direction"] == created_endpoint["Direction"]
assert endpoint["IpAddressCount"] == created_endpoint["IpAddressCount"]
assert endpoint["HostVPCId"] == created_endpoint["HostVPCId"]
assert endpoint["Status"] == "DELETING"
assert "Deleting" in endpoint["StatusMessage"]
assert endpoint["CreationTime"] == created_endpoint["CreationTime"]
@mock_route53resolver
def test_route53resolver_bad_delete_resolver_endpoint():
"""Test delete_resolver_endpoint API calls with a bad ID."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Use a resolver endpoint id that is too long.
long_id = "0123456789" * 6 + "xxxxx"
with pytest.raises(ClientError) as exc:
client.delete_resolver_endpoint(ResolverEndpointId=long_id)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{long_id}' at 'resolverEndpointId' failed to satisfy "
f"constraint: Member must have length less than or equal to 64"
) in err["Message"]
# Delete a non-existent resolver endpoint.
with pytest.raises(ClientError) as exc:
client.delete_resolver_endpoint(ResolverEndpointId=random_num)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Resolver endpoint with ID '{random_num}' does not exist" in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_get_resolver_endpoint():
"""Test good get_resolver_endpoint API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
created_endpoint = create_test_endpoint(client, ec2_client)
# Now get the resolver endpoint and verify the response.
response = client.get_resolver_endpoint(ResolverEndpointId=created_endpoint["Id"])
endpoint = response["ResolverEndpoint"]
assert endpoint["CreatorRequestId"] == created_endpoint["CreatorRequestId"]
assert endpoint["Id"] == created_endpoint["Id"]
assert endpoint["Arn"] == created_endpoint["Arn"]
assert endpoint["Name"] == created_endpoint["Name"]
assert endpoint["SecurityGroupIds"] == created_endpoint["SecurityGroupIds"]
assert endpoint["Direction"] == created_endpoint["Direction"]
assert endpoint["IpAddressCount"] == created_endpoint["IpAddressCount"]
assert endpoint["HostVPCId"] == created_endpoint["HostVPCId"]
assert endpoint["Status"] == created_endpoint["Status"]
assert endpoint["StatusMessage"] == created_endpoint["StatusMessage"]
assert endpoint["CreationTime"] == created_endpoint["CreationTime"]
assert endpoint["ModificationTime"] == created_endpoint["ModificationTime"]
@mock_route53resolver
def test_route53resolver_bad_get_resolver_endpoint():
"""Test get_resolver_endpoint API calls with a bad ID."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
random_num = get_random_hex(10)
# Use a resolver endpoint id that is too long.
long_id = "0123456789" * 6 + "xxxxx"
with pytest.raises(ClientError) as exc:
client.get_resolver_endpoint(ResolverEndpointId=long_id)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{long_id}' at 'resolverEndpointId' failed to satisfy "
f"constraint: Member must have length less than or equal to 64"
) in err["Message"]
# Delete a non-existent resolver endpoint.
with pytest.raises(ClientError) as exc:
client.get_resolver_endpoint(ResolverEndpointId=random_num)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Resolver endpoint with ID '{random_num}' does not exist" in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_update_resolver_endpoint():
"""Test good update_resolver_endpoint API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
created_endpoint = create_test_endpoint(client, ec2_client)
# Now update the resolver endpoint name and verify the response.
new_name = "NewName" + get_random_hex(6)
response = client.update_resolver_endpoint(
ResolverEndpointId=created_endpoint["Id"], Name=new_name,
)
endpoint = response["ResolverEndpoint"]
assert endpoint["CreatorRequestId"] == created_endpoint["CreatorRequestId"]
assert endpoint["Id"] == created_endpoint["Id"]
assert endpoint["Arn"] == created_endpoint["Arn"]
assert endpoint["Name"] == new_name
assert endpoint["SecurityGroupIds"] == created_endpoint["SecurityGroupIds"]
assert endpoint["Direction"] == created_endpoint["Direction"]
assert endpoint["IpAddressCount"] == created_endpoint["IpAddressCount"]
assert endpoint["HostVPCId"] == created_endpoint["HostVPCId"]
assert endpoint["Status"] == created_endpoint["Status"]
assert endpoint["StatusMessage"] == created_endpoint["StatusMessage"]
assert endpoint["CreationTime"] == created_endpoint["CreationTime"]
assert endpoint["ModificationTime"] != created_endpoint["ModificationTime"]
@mock_route53resolver
def test_route53resolver_bad_update_resolver_endpoint():
"""Test update_resolver_endpoint API calls with a bad ID."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
random_num = get_random_hex(10)
random_name = "Z" + get_random_hex(10)
# Use a resolver endpoint id that is too long.
long_id = "0123456789" * 6 + "xxxxx"
with pytest.raises(ClientError) as exc:
client.update_resolver_endpoint(ResolverEndpointId=long_id, Name=random_name)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
f"Value '{long_id}' at 'resolverEndpointId' failed to satisfy "
f"constraint: Member must have length less than or equal to 64"
) in err["Message"]
# Delete a non-existent resolver endpoint.
with pytest.raises(ClientError) as exc:
client.update_resolver_endpoint(ResolverEndpointId=random_num, Name=random_name)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Resolver endpoint with ID '{random_num}' does not exist" in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_list_resolver_endpoint_ip_addresses():
"""Test good list_resolver_endpoint_ip_addresses API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10)
subnet_ids = create_subnets(ec2_client, create_vpc(ec2_client))
response = client.create_resolver_endpoint(
CreatorRequestId="B" + random_num,
Name="B" + random_num,
SecurityGroupIds=[create_security_group(ec2_client)],
Direction="INBOUND",
IpAddresses=[
{"SubnetId": subnet_ids[0], "Ip": "10.0.1.200"},
{"SubnetId": subnet_ids[1], "Ip": "10.0.0.20"},
{"SubnetId": subnet_ids[0], "Ip": "10.0.1.201"},
],
)
endpoint_id = response["ResolverEndpoint"]["Id"]
response = client.list_resolver_endpoint_ip_addresses(
ResolverEndpointId=endpoint_id
)
assert len(response["IpAddresses"]) == 3
assert response["MaxResults"] == 10
# Set max_results to return 1 address, use next_token to get remaining.
response = client.list_resolver_endpoint_ip_addresses(
ResolverEndpointId=endpoint_id, MaxResults=1
)
ip_addresses = response["IpAddresses"]
assert len(ip_addresses) == 1
assert response["MaxResults"] == 1
assert "NextToken" in response
assert ip_addresses[0]["IpId"].startswith("rni-")
assert ip_addresses[0]["SubnetId"] == subnet_ids[0]
assert ip_addresses[0]["Ip"] == "10.0.1.200"
assert ip_addresses[0]["Status"] == "ATTACHED"
assert ip_addresses[0]["StatusMessage"] == "This IP address is operational."
assert "CreationTime" in ip_addresses[0]
assert "ModificationTime" in ip_addresses[0]
response = client.list_resolver_endpoint_ip_addresses(
ResolverEndpointId=endpoint_id, NextToken=response["NextToken"]
)
assert len(response["IpAddresses"]) == 2
assert response["MaxResults"] == 10
assert "NextToken" not in response
@mock_ec2
@mock_route53resolver
def test_route53resolver_bad_list_resolver_endpoint_ip_addresses():
"""Test bad list_resolver_endpoint_ip_addresses API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Bad endpoint id.
with pytest.raises(ClientError) as exc:
client.list_resolver_endpoint_ip_addresses(ResolverEndpointId="foo")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert "Resolver endpoint with ID 'foo' does not exist" in err["Message"]
# Good endpoint id, but bad max_results.
random_num = get_random_hex(10)
response = create_test_endpoint(client, ec2_client, name=f"A-{random_num}")
with pytest.raises(ClientError) as exc:
client.list_resolver_endpoint_ip_addresses(
ResolverEndpointId=response["Id"], MaxResults=250
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
"Value '250' at 'maxResults' failed to satisfy constraint: Member "
"must have length less than or equal to 100"
) in err["Message"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_list_resolver_endpoints():
"""Test good list_resolver_endpoint API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
random_num = get_random_hex(10)
# List endpoints when there are none.
response = client.list_resolver_endpoints()
assert len(response["ResolverEndpoints"]) == 0
assert response["MaxResults"] == 10
assert "NextToken" not in response
# Create 5 endpoints, verify all 5 are listed when no filters, max_results.
for idx in range(4):
create_test_endpoint(client, ec2_client, name=f"A{idx}-{random_num}")
response = client.list_resolver_endpoints()
endpoints = response["ResolverEndpoints"]
assert len(endpoints) == 4
assert response["MaxResults"] == 10
for idx in range(4):
assert endpoints[idx]["Name"].startswith(f"A{idx}")
# Set max_results to return 1 endpoint, use next_token to get remaining 3.
response = client.list_resolver_endpoints(MaxResults=1)
endpoints = response["ResolverEndpoints"]
assert len(endpoints) == 1
assert response["MaxResults"] == 1
assert "NextToken" in response
assert endpoints[0]["Name"].startswith("A0")
response = client.list_resolver_endpoints(NextToken=response["NextToken"])
endpoints = response["ResolverEndpoints"]
assert len(endpoints) == 3
assert response["MaxResults"] == 10
assert "NextToken" not in response
for idx, endpoint in enumerate(endpoints):
assert endpoint["Name"].startswith(f"A{idx + 1}")
@mock_ec2
@mock_route53resolver
def test_route53resolver_bad_list_resolver_endpoints():
"""Test bad list_resolver_endpoints API calls."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Bad max_results.
random_num = get_random_hex(10)
create_test_endpoint(client, ec2_client, name=f"A-{random_num}")
with pytest.raises(ClientError) as exc:
client.list_resolver_endpoints(MaxResults=250)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "1 validation error detected" in err["Message"]
assert (
"Value '250' at 'maxResults' failed to satisfy constraint: Member "
"must have length less than or equal to 100"
) in err["Message"]

View File

@ -0,0 +1,150 @@
"""Route53Resolver-related unit tests focusing on tag-related functionality."""
import boto3
from botocore.exceptions import ClientError
import pytest
from moto import mock_route53resolver
from moto.ec2 import mock_ec2
from moto.route53resolver.models import ResolverEndpoint
from .test_route53resolver_endpoint import TEST_REGION, create_test_endpoint
@mock_ec2
@mock_route53resolver
def test_route53resolver_tag_resource():
"""Test the addition of tags to a resource."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
resolver_endpoint = create_test_endpoint(client, ec2_client)
# Unknown resolver endpoint id.
bad_arn = "foobar"
with pytest.raises(ClientError) as exc:
client.tag_resource(ResourceArn=bad_arn, Tags=[{"Key": "foo", "Value": "bar"}])
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Resolver endpoint with ID '{bad_arn}' does not exist" in err["Message"]
# Too many tags.
tags = [
{"Key": f"{x}", "Value": f"{x}"}
for x in range(ResolverEndpoint.MAX_TAGS_PER_RESOLVER_ENDPOINT + 1)
]
with pytest.raises(ClientError) as exc:
client.tag_resource(ResourceArn=resolver_endpoint["Arn"], Tags=tags)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
f"'tags' failed to satisfy constraint: Member must have length less "
f"than or equal to {ResolverEndpoint.MAX_TAGS_PER_RESOLVER_ENDPOINT}"
) in err["Message"]
# Bad tags.
with pytest.raises(ClientError) as exc:
client.tag_resource(
ResourceArn=resolver_endpoint["Arn"],
Tags=[{"Key": "foo!", "Value": "bar"}],
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"1 validation error detected: Value 'foo!' at 'tags.1.member.key' "
"failed to satisfy constraint: Member must satisfy regular "
"expression pattern"
) in err["Message"]
# Successful addition of tags.
added_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10)]
client.tag_resource(ResourceArn=resolver_endpoint["Arn"], Tags=added_tags)
result = client.list_tags_for_resource(ResourceArn=resolver_endpoint["Arn"])
assert len(result["Tags"]) == 10
assert result["Tags"] == added_tags
@mock_ec2
@mock_route53resolver
def test_route53resolver_untag_resource():
"""Test the removal of tags to a resource."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a resolver endpoint for testing purposes.
tag_list = [
{"Key": "one", "Value": "1"},
{"Key": "two", "Value": "2"},
{"Key": "three", "Value": "3"},
]
resolver_endpoint = create_test_endpoint(client, ec2_client, tags=tag_list)
# Untag all of the tags. Verify there are no more tags.
client.untag_resource(
ResourceArn=resolver_endpoint["Arn"], TagKeys=[x["Key"] for x in tag_list]
)
result = client.list_tags_for_resource(ResourceArn=resolver_endpoint["Arn"])
assert not result["Tags"]
assert "NextToken" not in result
@mock_ec2
@mock_route53resolver
def test_route53resolver_list_tags_for_resource():
"""Test ability to list all tags for a resource."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a resolver endpoint to work with.
tags = [
{"Key": f"{x}_k", "Value": f"{x}_v"}
for x in range(1, ResolverEndpoint.MAX_TAGS_PER_RESOLVER_ENDPOINT)
]
resolver_endpoint = create_test_endpoint(client, ec2_client, tags=tags)
# Verify limit and next token works.
result = client.list_tags_for_resource(
ResourceArn=resolver_endpoint["Arn"], MaxResults=1
)
assert len(result["Tags"]) == 1
assert result["Tags"] == [{"Key": "1_k", "Value": "1_v"}]
assert result["NextToken"]
result = client.list_tags_for_resource(
ResourceArn=resolver_endpoint["Arn"],
MaxResults=10,
NextToken=result["NextToken"],
)
assert len(result["Tags"]) == 10
assert result["Tags"] == [
{"Key": f"{x}_k", "Value": f"{x}_v"} for x in range(2, 12)
]
assert result["NextToken"]
@mock_ec2
@mock_route53resolver
def test_route53resolver_bad_list_tags_for_resource():
"""Test ability to list all tags for a resource."""
client = boto3.client("route53resolver", region_name=TEST_REGION)
ec2_client = boto3.client("ec2", region_name=TEST_REGION)
# Create a resolver endpoint to work with.
tags = [{"Key": "foo", "Value": "foobar"}]
resolver_endpoint = create_test_endpoint(client, ec2_client, tags=tags)
# Bad resolver endpoint ARN.
bad_arn = "xyz"
with pytest.raises(ClientError) as exc:
client.list_tags_for_resource(ResourceArn=bad_arn)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert f"Resolver endpoint with ID '{bad_arn}' does not exist" in err["Message"]
# Bad next token.
with pytest.raises(ClientError) as exc:
client.list_tags_for_resource(
ResourceArn=resolver_endpoint["Arn"], NextToken="foo"
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidNextTokenException"
assert "Invalid value passed for the NextToken parameter" in err["Message"]