moto/moto/route53/models.py

438 lines
16 KiB
Python
Raw Normal View History

from __future__ import unicode_literals
2015-01-17 19:50:19 +00:00
import itertools
from collections import defaultdict
import string
import random
2015-01-18 00:06:43 +00:00
import uuid
2015-01-17 19:50:19 +00:00
from jinja2 import Template
from moto.core import BaseBackend, CloudFormationModel
# Alphabet used for generated hosted-zone IDs: A-Z followed by 0-9.
ROUTE53_ID_CHOICE = string.ascii_uppercase + string.digits


def create_route53_zone_id():
    """Return a random 15-character hosted-zone ID.

    Every character is drawn independently from ``ROUTE53_ID_CHOICE``.
    """
    # Real zone IDs look like this: Z1RWWTK7Y8UDDQ
    picked = []
    for _ in range(15):
        picked.append(random.choice(ROUTE53_ID_CHOICE))
    return "".join(picked)
2013-11-14 19:14:14 +00:00
class HealthCheck(CloudFormationModel):
    """In-memory representation of a Route 53 health check.

    Instances are built from a plain dict of snake_case arguments and can
    render themselves as the ``<HealthCheck>`` XML element.
    """

    def __init__(self, health_check_id, caller_reference, health_check_args):
        """Populate the health check from ``health_check_args``.

        :param health_check_id: identifier stored as ``self.id``.
        :param caller_reference: caller-supplied reference string.
        :param health_check_args: dict read with ``.get``; missing or falsy
            entries fall back to the defaults shown below.
        """
        self.id = health_check_id
        self.ip_address = health_check_args.get("ip_address")
        self.port = health_check_args.get("port") or 80
        self.type_ = health_check_args.get("type")
        self.resource_path = health_check_args.get("resource_path")
        self.fqdn = health_check_args.get("fqdn")
        self.search_string = health_check_args.get("search_string")
        self.request_interval = health_check_args.get("request_interval") or 30
        self.failure_threshold = health_check_args.get("failure_threshold") or 3
        self.health_threshold = health_check_args.get("health_threshold")
        self.measure_latency = health_check_args.get("measure_latency") or False
        self.inverted = health_check_args.get("inverted") or False
        self.disabled = health_check_args.get("disabled") or False
        self.enable_sni = health_check_args.get("enable_sni") or False
        # An empty list of children is normalised to None.
        self.children = health_check_args.get("children") or None
        self.caller_reference = caller_reference

    @property
    def physical_resource_id(self):
        """CloudFormation physical ID: the health check id."""
        return self.id

    @staticmethod
    def cloudformation_name_type():
        """This resource type has no name property."""
        return None

    @staticmethod
    def cloudformation_type():
        """Return the CloudFormation resource type string."""
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-healthcheck.html
        return "AWS::Route53::HealthCheck"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name
    ):
        """Create a health check from a CloudFormation resource definition.

        Reads ``Properties.HealthCheckConfig``; ``Type`` is mandatory (a
        missing key raises ``KeyError``), every other field is optional.
        The resource name is used as the caller reference. ``region_name``
        is not used.
        """
        properties = cloudformation_json["Properties"]["HealthCheckConfig"]
        health_check_args = {
            "ip_address": properties.get("IPAddress"),
            "port": properties.get("Port"),
            "type": properties["Type"],
            "resource_path": properties.get("ResourcePath"),
            "fqdn": properties.get("FullyQualifiedDomainName"),
            "search_string": properties.get("SearchString"),
            "request_interval": properties.get("RequestInterval"),
            "failure_threshold": properties.get("FailureThreshold"),
        }
        health_check = route53_backend.create_health_check(
            caller_reference=resource_name, health_check_args=health_check_args
        )
        return health_check

    def to_xml(self):
        """Render this health check as a ``<HealthCheck>`` XML fragment.

        Endpoint fields (IP address, port, interval, threshold, latency) are
        emitted only for non-``CALCULATED`` checks; ``HealthThreshold`` only
        for ``CALCULATED`` ones.
        """
        # IPAddress is emitted only when one is set, so a check without an IP
        # address no longer renders the literal text "None".
        # Free-text values are escaped (|e) so characters such as "&" or "<"
        # do not produce malformed XML.
        template = Template(
            """<HealthCheck>
            <Id>{{ health_check.id }}</Id>
            <CallerReference>{{ health_check.caller_reference|e }}</CallerReference>
            <HealthCheckConfig>
                {% if health_check.type_ != "CALCULATED" %}
                    {% if health_check.ip_address %}
                    <IPAddress>{{ health_check.ip_address }}</IPAddress>
                    {% endif %}
                    <Port>{{ health_check.port }}</Port>
                {% endif %}
                <Type>{{ health_check.type_ }}</Type>
                {% if health_check.resource_path %}
                    <ResourcePath>{{ health_check.resource_path|e }}</ResourcePath>
                {% endif %}
                {% if health_check.fqdn %}
                    <FullyQualifiedDomainName>{{ health_check.fqdn }}</FullyQualifiedDomainName>
                {% endif %}
                {% if health_check.type_ != "CALCULATED" %}
                    <RequestInterval>{{ health_check.request_interval }}</RequestInterval>
                    <FailureThreshold>{{ health_check.failure_threshold }}</FailureThreshold>
                    <MeasureLatency>{{ health_check.measure_latency }}</MeasureLatency>
                {% endif %}
                {% if health_check.type_ == "CALCULATED" %}
                    <HealthThreshold>{{ health_check.health_threshold }}</HealthThreshold>
                {% endif %}
                <Inverted>{{ health_check.inverted }}</Inverted>
                <Disabled>{{ health_check.disabled }}</Disabled>
                <EnableSNI>{{ health_check.enable_sni }}</EnableSNI>
                {% if health_check.search_string %}
                    <SearchString>{{ health_check.search_string|e }}</SearchString>
                {% endif %}
                {% if health_check.children %}
                    <ChildHealthChecks>
                    {% for child in health_check.children %}
                        <member>{{ child }}</member>
                    {% endfor %}
                    </ChildHealthChecks>
                {% endif %}
            </HealthCheckConfig>
            <HealthCheckVersion>1</HealthCheckVersion>
        </HealthCheck>"""
        )
        return template.render(health_check=self)
class RecordSet(CloudFormationModel):
    """A single resource record set stored inside a hosted zone."""

    def __init__(self, kwargs):
        """Copy the record-set fields out of the ``kwargs`` dict.

        Keys are the API-style names (``Name``, ``Type``, ``TTL`` ...); any
        key that is absent yields ``None``, except ``ResourceRecords`` which
        defaults to an empty list.
        """
        fetch = kwargs.get
        self.name = fetch("Name")
        self.type_ = fetch("Type")
        self.ttl = fetch("TTL")
        self.records = fetch("ResourceRecords", [])
        self.set_identifier = fetch("SetIdentifier")
        self.weight = fetch("Weight")
        self.region = fetch("Region")
        self.health_check = fetch("HealthCheckId")
        self.hosted_zone_name = fetch("HostedZoneName")
        self.hosted_zone_id = fetch("HostedZoneId")
        self.alias_target = fetch("AliasTarget")
        self.failover = fetch("Failover")
        self.geo_location = fetch("GeoLocation")

    @staticmethod
    def cloudformation_name_type():
        """Name of the property that carries the resource name."""
        return "Name"

    @staticmethod
    def cloudformation_type():
        """Return the CloudFormation resource type string."""
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-recordset.html
        return "AWS::Route53::RecordSet"

    @staticmethod
    def _zone_for_properties(properties):
        """Find the hosted zone named by ``HostedZoneName``, else by ``HostedZoneId``."""
        zone_name = properties.get("HostedZoneName")
        if zone_name:
            return route53_backend.get_hosted_zone_by_name(zone_name)
        return route53_backend.get_hosted_zone(properties["HostedZoneId"])

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name
    ):
        """Add a record set, built from ``Properties``, to its hosted zone.

        ``resource_name`` and ``region_name`` are not used.
        """
        properties = cloudformation_json["Properties"]
        zone = cls._zone_for_properties(properties)
        return zone.add_rrset(properties)

    @classmethod
    def update_from_cloudformation_json(
        cls, original_resource, new_resource_name, cloudformation_json, region_name
    ):
        """Replace a record set: delete the original by name, then re-create it."""
        cls.delete_from_cloudformation_json(
            original_resource.name, cloudformation_json, region_name
        )
        return cls.create_from_cloudformation_json(
            new_resource_name, cloudformation_json, region_name
        )

    @classmethod
    def delete_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name
    ):
        """Remove every record set called ``resource_name`` from its zone."""
        # this will break if you changed the zone the record is in,
        # unfortunately
        zone = cls._zone_for_properties(cloudformation_json["Properties"])
        try:
            zone.delete_rrset({"Name": resource_name})
        except KeyError:
            pass

    @property
    def physical_resource_id(self):
        """CloudFormation physical ID: the record name."""
        return self.name

    def to_xml(self):
        """Render this record set as a ``<ResourceRecordSet>`` XML fragment.

        Optional elements are emitted only when the matching attribute is
        truthy; an alias target replaces the ``<ResourceRecords>`` list.
        """
        xml_template = Template(
            """<ResourceRecordSet>
                <Name>{{ record_set.name }}</Name>
                <Type>{{ record_set.type_ }}</Type>
                {% if record_set.set_identifier %}
                    <SetIdentifier>{{ record_set.set_identifier }}</SetIdentifier>
                {% endif %}
                {% if record_set.weight %}
                    <Weight>{{ record_set.weight }}</Weight>
                {% endif %}
                {% if record_set.region %}
                    <Region>{{ record_set.region }}</Region>
                {% endif %}
                {% if record_set.ttl %}
                    <TTL>{{ record_set.ttl }}</TTL>
                {% endif %}
                {% if record_set.failover %}
                    <Failover>{{ record_set.failover }}</Failover>
                {% endif %}
                {% if record_set.geo_location %}
                <GeoLocation>
                {% for geo_key in ['ContinentCode','CountryCode','SubdivisionCode'] %}
                  {% if record_set.geo_location[geo_key] %}<{{ geo_key }}>{{ record_set.geo_location[geo_key] }}</{{ geo_key }}>{% endif %}
                {% endfor %}
                </GeoLocation>
                {% endif %}
                {% if record_set.alias_target %}
                <AliasTarget>
                    <HostedZoneId>{{ record_set.alias_target['HostedZoneId'] }}</HostedZoneId>
                    <DNSName>{{ record_set.alias_target['DNSName'] }}</DNSName>
                    <EvaluateTargetHealth>{{ record_set.alias_target['EvaluateTargetHealth'] }}</EvaluateTargetHealth>
                </AliasTarget>
                {% else %}
                <ResourceRecords>
                    {% for record in record_set.records %}
                    <ResourceRecord>
                        <Value>{{ record|e }}</Value>
                    </ResourceRecord>
                    {% endfor %}
                </ResourceRecords>
                {% endif %}
                {% if record_set.health_check %}
                    <HealthCheckId>{{ record_set.health_check }}</HealthCheckId>
                {% endif %}
            </ResourceRecordSet>"""
        )
        rendered = xml_template.render(record_set=self)
        return rendered

    def delete(self, *args, **kwargs):
        """Not exposed as part of the Route 53 API - used for CloudFormation. args are ignored"""
        # Look the zone up by name first; fall back to the id when that finds nothing.
        zone = route53_backend.get_hosted_zone_by_name(
            self.hosted_zone_name
        ) or route53_backend.get_hosted_zone(self.hosted_zone_id)
        zone.delete_rrset({"Name": self.name, "Type": self.type_})
2015-01-17 19:50:19 +00:00
def reverse_domain_name(domain_name):
    """Return ``domain_name`` with its dot-separated labels in reverse order.

    One trailing dot, if present, is dropped before reversing, so
    ``"www.example.com."`` and ``"www.example.com"`` both give
    ``"com.example.www"``.
    """
    # normalize without trailing dot
    normalized = domain_name[:-1] if domain_name.endswith(".") else domain_name
    labels = normalized.split(".")
    labels.reverse()
    return ".".join(labels)
2015-01-17 19:50:19 +00:00
class FakeZone(CloudFormationModel):
    """In-memory hosted zone holding a flat list of ``RecordSet`` objects."""

    def __init__(self, name, id_, private_zone, comment=None):
        """Create an empty zone.

        :param name: zone name.
        :param id_: zone identifier, stored as ``self.id``.
        :param private_zone: stored as-is on ``self.private_zone``.
        :param comment: optional comment; the ``comment`` attribute is only
            set when a value other than ``None`` is given.
        """
        self.name = name
        self.id = id_
        if comment is not None:
            self.comment = comment
        self.private_zone = private_zone
        self.rrsets = []

    def add_rrset(self, record_set):
        """Build a ``RecordSet`` from the given dict, append it and return it."""
        record_set = RecordSet(record_set)
        self.rrsets.append(record_set)
        return record_set

    def upsert_rrset(self, record_set):
        """Replace the first record set with the same name, type and set
        identifier, or append a new one when none matches.

        :returns: the newly built ``RecordSet``.
        """
        new_rrset = RecordSet(record_set)
        for i, rrset in enumerate(self.rrsets):
            if (
                rrset.name == new_rrset.name
                and rrset.type_ == new_rrset.type_
                and rrset.set_identifier == new_rrset.set_identifier
            ):
                self.rrsets[i] = new_rrset
                break
        else:
            # Loop finished without a match: this is a new record set.
            self.rrsets.append(new_rrset)
        return new_rrset

    def delete_rrset(self, rrset):
        """Drop record sets matching ``rrset["Name"]``.

        When ``rrset`` also carries a non-``None`` ``"Type"``, only record
        sets of that type are removed; otherwise all types with that name go.
        """
        self.rrsets = [
            record_set
            for record_set in self.rrsets
            if record_set.name != rrset["Name"]
            or (rrset.get("Type") is not None and record_set.type_ != rrset["Type"])
        ]

    def delete_rrset_by_id(self, set_identifier):
        """Drop every record set whose set identifier equals ``set_identifier``."""
        self.rrsets = [
            record_set
            for record_set in self.rrsets
            if record_set.set_identifier != set_identifier
        ]

    def get_record_sets(self, start_type, start_name):
        """Return the zone's record sets as a sorted list.

        Records are ordered by reversed domain name, then by type. When
        ``start_name`` is truthy, leading records that sort before
        ``(start_name, start_type)`` are skipped; a falsy ``start_type`` is
        treated as the empty string. ``start_type`` is ignored when
        ``start_name`` is falsy.

        :returns: always a ``list`` (a one-shot iterator was returned
            previously when ``start_name`` was given).
        """
        record_sets = sorted(
            self.rrsets,
            key=lambda rrset: (reverse_domain_name(rrset.name), rrset.type_),
        )
        if not start_name:
            return record_sets

        start_type = start_type or ""
        # Loop-invariant: compute the reversed start name once, not per record.
        start_name_reversed = reverse_domain_name(start_name)

        def sorts_before_start(rrset):
            rrset_name_reversed = reverse_domain_name(rrset.name)
            return rrset_name_reversed < start_name_reversed or (
                rrset_name_reversed == start_name_reversed
                and rrset.type_ < start_type
            )

        # Materialise the result so callers can iterate it more than once.
        return list(itertools.dropwhile(sorts_before_start, record_sets))

    @property
    def physical_resource_id(self):
        """CloudFormation physical ID: the zone id."""
        return self.id

    @staticmethod
    def cloudformation_name_type():
        """Name of the property that carries the resource name."""
        return "Name"

    @staticmethod
    def cloudformation_type():
        """Return the CloudFormation resource type string."""
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-hostedzone.html
        return "AWS::Route53::HostedZone"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name
    ):
        """Create a public hosted zone named ``resource_name``.

        ``cloudformation_json`` and ``region_name`` are not read.
        """
        hosted_zone = route53_backend.create_hosted_zone(
            resource_name, private_zone=False
        )
        return hosted_zone
class RecordSetGroup(CloudFormationModel):
    """A CloudFormation group of record sets that belong to one hosted zone."""

    def __init__(self, hosted_zone_id, record_sets):
        """Remember the owning zone id and the raw record-set definitions."""
        self.record_sets = record_sets
        self.hosted_zone_id = hosted_zone_id

    @property
    def physical_resource_id(self):
        """CloudFormation physical ID, built from the hosted zone id."""
        arn_pattern = "arn:aws:route53:::hostedzone/{0}"
        return arn_pattern.format(self.hosted_zone_id)

    @staticmethod
    def cloudformation_name_type():
        """This resource type has no name property."""
        return None

    @staticmethod
    def cloudformation_type():
        """Return the CloudFormation resource type string."""
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-recordsetgroup.html
        return "AWS::Route53::RecordSetGroup"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name
    ):
        """Add every entry of ``Properties.RecordSets`` to the target zone.

        The zone is found via ``HostedZoneName`` when that property is
        truthy, otherwise via ``HostedZoneId``. ``resource_name`` and
        ``region_name`` are not used.
        """
        properties = cloudformation_json["Properties"]

        zone_name = properties.get("HostedZoneName")
        hosted_zone = (
            route53_backend.get_hosted_zone_by_name(zone_name)
            if zone_name
            else route53_backend.get_hosted_zone(properties["HostedZoneId"])
        )

        record_sets = properties["RecordSets"]
        for definition in record_sets:
            hosted_zone.add_rrset(definition)

        return RecordSetGroup(hosted_zone.id, record_sets)
2013-11-14 19:14:14 +00:00
class Route53Backend(BaseBackend):
    """In-memory store for hosted zones, health checks and resource tags."""

    def __init__(self):
        # zone id -> FakeZone
        self.zones = {}
        # health check id -> HealthCheck
        self.health_checks = {}
        # resource id -> {tag key: tag value}
        self.resource_tags = defaultdict(dict)

    def create_hosted_zone(self, name, private_zone, comment=None):
        """Create a zone with a freshly generated id, store it and return it."""
        new_id = create_route53_zone_id()
        new_zone = FakeZone(name, new_id, private_zone=private_zone, comment=comment)
        self.zones[new_id] = new_zone
        return new_zone

    def change_tags_for_resource(self, resource_id, tags):
        """Add or remove tags on ``resource_id``.

        ``tags`` is either ``{"Tag": ...}`` (add/overwrite) or ``{"Key": ...}``
        (remove); each value may be a single item or a list. When both keys
        are present only ``"Tag"`` is processed. Removing a key that is not
        set is a no-op rather than a ``KeyError``.
        """
        if "Tag" in tags:
            additions = tags["Tag"]
            if not isinstance(additions, list):
                additions = [additions]
            for tag in additions:
                self.resource_tags[resource_id][tag["Key"]] = tag["Value"]
        elif "Key" in tags:
            removals = tags["Key"]
            if not isinstance(removals, list):
                removals = [removals]
            # .get avoids creating an empty defaultdict entry for a resource
            # that has no tags at all.
            current = self.resource_tags.get(resource_id, {})
            for key in removals:
                current.pop(key, None)

    def list_tags_for_resource(self, resource_id):
        """Return the tag dict for ``resource_id``, or ``{}`` if it has none."""
        if resource_id in self.resource_tags:
            return self.resource_tags[resource_id]
        return {}

    def get_all_hosted_zones(self):
        """Return all stored zones (a view over the dict values)."""
        return self.zones.values()

    def get_hosted_zone(self, id_):
        """Return the zone for ``id_`` or ``None``; a ``/hostedzone/`` prefix is stripped."""
        return self.zones.get(id_.replace("/hostedzone/", ""))

    def get_hosted_zone_by_name(self, name):
        """Return the first zone whose name equals ``name``, or ``None``."""
        for zone in self.get_all_hosted_zones():
            if zone.name == name:
                return zone
        return None

    def delete_hosted_zone(self, id_):
        """Remove and return the zone for ``id_`` (``None`` if unknown)."""
        return self.zones.pop(id_.replace("/hostedzone/", ""), None)

    def create_health_check(self, caller_reference, health_check_args):
        """Create a health check with a random UUID id, store it and return it."""
        health_check_id = str(uuid.uuid4())
        health_check = HealthCheck(health_check_id, caller_reference, health_check_args)
        self.health_checks[health_check_id] = health_check
        return health_check

    def get_health_checks(self):
        """Return all stored health checks (a view over the dict values)."""
        return self.health_checks.values()

    def delete_health_check(self, health_check_id):
        """Remove and return the health check (``None`` if unknown)."""
        return self.health_checks.pop(health_check_id, None)
2013-11-14 19:14:14 +00:00
2017-02-24 02:37:43 +00:00
2013-11-14 19:14:14 +00:00
# Module-level backend instance; the model classes in this file look up
# hosted zones and create health checks through it.
route53_backend = Route53Backend()