Implement ec2 dhcp_options

This commit is contained in:
Ilya Sukhanov 2014-04-15 19:02:26 -04:00
parent e76b4c1250
commit 15b750a277
5 changed files with 428 additions and 10 deletions

View File

@ -1,4 +1,60 @@
from werkzeug.exceptions import BadRequest
from jinja2 import Template
class InvalidIdError(RuntimeError):
def __init__(self, id_value):
super(InvalidIdError, self).__init__()
self.id = id_value
class EC2ClientError(BadRequest):
def __init__(self, code, message):
super(EC2ClientError, self).__init__()
self.description = ERROR_RESPONSE_TEMPLATE.render(
code=code, message=message)
class DependencyViolationError(EC2ClientError):
def __init__(self, message):
super(DependencyViolationError, self).__init__(
"DependencyViolation", message)
class InvalidDHCPOptionsIdError(EC2ClientError):
def __init__(self, dhcp_options_id):
super(InvalidDHCPOptionsIdError, self).__init__(
"InvalidDhcpOptionID.NotFound",
"DhcpOptionID {0} does not exist."
.format(dhcp_options_id))
class InvalidVPCIdError(EC2ClientError):
def __init__(self, vpc_id):
super(InvalidVPCIdError, self).__init__(
"InvalidVpcID.NotFound",
"VpcID {0} does not exist."
.format(vpc_id))
class InvalidParameterValueError(EC2ClientError):
def __init__(self, parameter_value):
super(InvalidParameterValueError, self).__init__(
"InvalidParameterValue",
"Value ({0}) for parameter value is invalid. Invalid DHCP option value.".format(
parameter_value))
ERROR_RESPONSE = u"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Errors>
<Error>
<Code>{{code}}</Code>
<Message>{{message}}</Message>
</Error>
</Errors>
<RequestID>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestID>
</Response>
"""
ERROR_RESPONSE_TEMPLATE = Template(ERROR_RESPONSE)

View File

@ -5,9 +5,14 @@ from collections import defaultdict
from boto.ec2.instance import Instance as BotoInstance, Reservation
from moto.core import BaseBackend
from .exceptions import InvalidIdError
from .exceptions import (
InvalidIdError,
DependencyViolationError,
InvalidDHCPOptionsIdError
)
from .utils import (
random_ami_id,
random_dhcp_option_id,
random_eip_allocation_id,
random_eip_association_id,
random_gateway_id,
@ -645,6 +650,7 @@ class VPC(object):
def __init__(self, vpc_id, cidr_block):
self.id = vpc_id
self.cidr_block = cidr_block
self.dhcp_options = None
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json):
@ -678,7 +684,12 @@ class VPCBackend(object):
return self.vpcs.values()
def delete_vpc(self, vpc_id):
return self.vpcs.pop(vpc_id, None)
vpc = self.vpcs.pop(vpc_id, None)
if vpc and vpc.dhcp_options:
vpc.dhcp_options.vpc = None
self.delete_dhcp_options_set(vpc.dhcp_options.id)
vpc.dhcp_options = None
return vpc
class Subnet(object):
@ -1039,12 +1050,70 @@ class ElasticAddressBackend(object):
return False
class DHCPOptionsSet(object):
def __init__(self, domain_name_servers=None, domain_name=None,
ntp_servers=None, netbios_name_servers=None,
netbios_node_type=None):
self._options = {
"domain-name-servers": domain_name_servers,
"domain-name": domain_name,
"ntp-servers": ntp_servers,
"netbios-name-servers": netbios_name_servers,
"netbios-node-type": netbios_node_type,
}
self.id = random_dhcp_option_id()
self.vpc = None
@property
def options(self):
return self._options
class DHCPOptionsSetBackend(object):
def __init__(self):
self.dhcp_options_sets = {}
super(DHCPOptionsSetBackend, self).__init__()
def associate_dhcp_options(self, dhcp_options, vpc):
dhcp_options.vpc = vpc
vpc.dhcp_options = dhcp_options
def create_dhcp_options(
self, domain_name_servers=None, domain_name=None,
ntp_servers=None, netbios_name_servers=None,
netbios_node_type=None):
options = DHCPOptionsSet(
domain_name_servers, domain_name, ntp_servers,
netbios_name_servers, netbios_node_type
)
self.dhcp_options_sets[options.id] = options
return options
def describe_dhcp_options(self, options_ids=None):
options_sets = []
for option_id in options_ids or []:
if option_id in self.dhcp_options_sets:
options_sets.append(self.dhcp_options_sets[option_id])
else:
raise InvalidDHCPOptionsIdError(option_id)
return options_sets or self.dhcp_options_sets.values()
def delete_dhcp_options_set(self, options_id):
if options_id in self.dhcp_options_sets:
if self.dhcp_options_sets[options_id].vpc:
raise DependencyViolationError("Cannot delete assigned DHCP options.")
dhcp_opt = self.dhcp_options_sets.pop(options_id)
else:
raise InvalidDHCPOptionsIdError(options_id)
return True
class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
RegionsAndZonesBackend, SecurityGroupBackend, EBSBackend,
VPCBackend, SubnetBackend, SubnetRouteTableAssociationBackend,
RouteTableBackend, RouteBackend, InternetGatewayBackend,
VPCGatewayAttachmentBackend, SpotRequestBackend,
ElasticAddressBackend, KeyPairBackend):
ElasticAddressBackend, KeyPairBackend, DHCPOptionsSetBackend):
pass

View File

@ -1,15 +1,156 @@
from jinja2 import Template
from moto.core.responses import BaseResponse
from moto.ec2.utils import (
dhcp_configuration_from_querystring,
sequence_from_querystring)
from moto.ec2.models import ec2_backend
from moto.ec2.exceptions import(
InvalidVPCIdError,
InvalidParameterValueError
)
NETBIOS_NODE_TYPES = [1, 2, 4, 8]
class DHCPOptions(BaseResponse):
def associate_dhcp_options(self):
raise NotImplementedError('DHCPOptions(AmazonVPC).associate_dhcp_options is not yet implemented')
dhcp_opt_id = self.querystring.get("DhcpOptionsId", [None])[0]
vpc_id = self.querystring.get("VpcId", [None])[0]
dhcp_opt = ec2_backend.describe_dhcp_options([dhcp_opt_id])[0]
vpc = ec2_backend.get_vpc(vpc_id)
if not vpc:
raise InvalidVPCIdError(vpc_id)
ec2_backend.associate_dhcp_options(dhcp_opt, vpc)
template = Template(ASSOCIATE_DHCP_OPTIONS_RESPONSE)
return template.render()
def create_dhcp_options(self):
raise NotImplementedError('DHCPOptions(AmazonVPC).create_dhcp_options is not yet implemented')
dhcp_config = dhcp_configuration_from_querystring(self.querystring)
# TODO validate we only got the options we know about
domain_name_servers = dhcp_config.get("domain-name-servers", None)
domain_name = dhcp_config.get("domain-name", None)
ntp_servers = dhcp_config.get("ntp-servers", None)
netbios_name_servers = dhcp_config.get("netbios-name-servers", None)
netbios_node_type = dhcp_config.get("netbios-node-type", None)
for field_value in domain_name_servers, ntp_servers, netbios_name_servers:
if field_value and len(field_value) > 4:
raise InvalidParameterValueError(",".join(field_value))
if netbios_node_type and netbios_node_type[0] not in NETBIOS_NODE_TYPES:
raise InvalidParameterValueError(netbios_node_type)
dhcp_options_set = ec2_backend.create_dhcp_options(
domain_name_servers=domain_name_servers,
domain_name=domain_name,
ntp_servers=ntp_servers,
netbios_name_servers=netbios_name_servers,
netbios_node_type=netbios_node_type
)
template = Template(CREATE_DHCP_OPTIONS_RESPONSE)
return template.render(dhcp_options_set=dhcp_options_set)
def delete_dhcp_options(self):
raise NotImplementedError('DHCPOptions(AmazonVPC).delete_dhcp_options is not yet implemented')
# TODO InvalidDhcpOptionsId.Malformed
delete_status = False
if "DhcpOptionsId" in self.querystring:
dhcp_opt_id = self.querystring["DhcpOptionsId"][0]
delete_status = ec2_backend.delete_dhcp_options_set(dhcp_opt_id)
template = Template(DELETE_DHCP_OPTIONS_RESPONSE)
return template.render(delete_status=delete_status)
def describe_dhcp_options(self):
raise NotImplementedError('DHCPOptions(AmazonVPC).describe_dhcp_options is not yet implemented')
if "Filter.1.Name" in self.querystring:
raise NotImplementedError("Filtering not supported in describe_dhcp_options.")
elif "DhcpOptionsId.1" in self.querystring:
dhcp_opt_ids = sequence_from_querystring("DhcpOptionsId", self.querystring)
dhcp_opt = ec2_backend.describe_dhcp_options(dhcp_opt_ids)
else:
dhcp_opt = ec2_backend.describe_dhcp_options()
template = Template(DESCRIBE_DHCP_OPTIONS_RESPONSE)
return template.render(dhcp_options=dhcp_opt)
CREATE_DHCP_OPTIONS_RESPONSE = u"""
<CreateDhcpOptionsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
<dhcpOptions>
<dhcpOptionsId>{{ dhcp_options_set.id }}</dhcpOptionsId>
<dhcpConfigurationSet>
{% for key, values in dhcp_options_set.options.iteritems() %}
{{ values }}
{% if values %}
<item>
<key>{{key}}</key>
<valueSet>
{% for value in values %}
<item>
<value>{{ value }}</value>
</item>
{% endfor %}
</valueSet>
</item>
{% endif %}
{% endfor %}
</dhcpConfigurationSet>
<tagSet/>
</dhcpOptions>
</CreateDhcpOptionsResponse>
"""
DELETE_DHCP_OPTIONS_RESPONSE = u"""
<DeleteDhcpOptionsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
<return>{{delete_status}}</return>
</DeleteDhcpOptionsResponse>
"""
DESCRIBE_DHCP_OPTIONS_RESPONSE = u"""
<DescribeDhcpOptionsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
<item>
{% for dhcp_options_set in dhcp_options %}
<dhcpOptions>
<dhcpOptionsId>{{ dhcp_options_set.id }}</dhcpOptionsId>
<dhcpConfigurationSet>
{% for key, values in dhcp_options_set.options.iteritems() %}
{{ values }}
{% if values %}
<item>
<key>{{key}}</key>
<valueSet>
{% for value in values %}
<item>
<value>{{ value }}</value>
</item>
{% endfor %}
</valueSet>
</item>
{% endif %}
{% endfor %}
</dhcpConfigurationSet>
<tagSet/>
</dhcpOptions>
{% endfor %}
</item>
</DescribeDhcpOptionsResponse>
"""
ASSOCIATE_DHCP_OPTIONS_RESPONSE = u"""
<AssociateDhcpOptionsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
<return>true</return>
</AssociateDhcpOptionsResponse>
"""

View File

@ -62,6 +62,10 @@ def random_eip_allocation_id():
return random_id(prefix='eipalloc')
def random_dhcp_option_id():
return random_id(prefix='dopt')
def random_ip():
return "127.{0}.{1}.{2}".format(
random.randint(0, 255),
@ -112,6 +116,44 @@ def resource_ids_from_querystring(querystring_dict):
return response_values
def dhcp_configuration_from_querystring(querystring, option=u'DhcpConfiguration'):
"""
turn:
{u'AWSAccessKeyId': [u'the_key'],
u'Action': [u'CreateDhcpOptions'],
u'DhcpConfiguration.1.Key': [u'domain-name'],
u'DhcpConfiguration.1.Value.1': [u'example.com'],
u'DhcpConfiguration.2.Key': [u'domain-name-servers'],
u'DhcpConfiguration.2.Value.1': [u'10.0.0.6'],
u'DhcpConfiguration.2.Value.2': [u'10.0.0.7'],
u'Signature': [u'uUMHYOoLM6r+sT4fhYjdNT6MHw22Wj1mafUpe0P0bY4='],
u'SignatureMethod': [u'HmacSHA256'],
u'SignatureVersion': [u'2'],
u'Timestamp': [u'2014-03-18T21:54:01Z'],
u'Version': [u'2013-10-15']}
into:
{u'domain-name': [u'example.com'], u'domain-name-servers': [u'10.0.0.6', u'10.0.0.7']}
"""
key_needle = re.compile(u'{0}.[0-9]+.Key'.format(option), re.UNICODE)
response_values = {}
for key, value in querystring.iteritems():
if key_needle.match(key):
values = []
key_index = key.split(".")[1]
value_index = 1
while True:
value_key = u'{0}.{1}.Value.{2}'.format(option, key_index, value_index)
if value_key in querystring:
values.extend(querystring[value_key])
else:
break
value_index += 1
response_values[value[0]] = values
return response_values
def filters_from_querystring(querystring_dict):
response_values = {}
for key, value in querystring_dict.iteritems():
@ -131,6 +173,7 @@ def keypair_names_from_querystring(querystring_dict):
keypair_names.append(value[0])
return keypair_names
filter_dict_attribute_mapping = {
'instance-state-name': 'state'
}
@ -161,7 +204,7 @@ def filter_reservations(reservations, filter_dict):
return result
# not really random
# not really random ( http://xkcd.com/221/ )
def random_key_pair():
return {
'fingerprint': ('1f:51:ae:28:bf:89:e9:d8:1f:25:5d:37:2d:'

View File

@ -1,9 +1,118 @@
import boto
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2
SAMPLE_DOMAIN_NAME = u'example.com'
SAMPLE_NAME_SERVERS = [u'10.0.0.6', u'10.0.0.7']
@mock_ec2
def test_dhcp_options():
pass
def test_dhcp_options_associate():
""" associate dhcp option """
conn = boto.connect_vpc('the_key', 'the_secret')
dhcp_options = conn.create_dhcp_options(SAMPLE_DOMAIN_NAME, SAMPLE_NAME_SERVERS)
vpc = conn.create_vpc("10.0.0.0/16")
rval = conn.associate_dhcp_options(dhcp_options.id, vpc.id)
rval.should.be.equal(True)
@mock_ec2
def test_dhcp_options_associate_invalid_dhcp_id():
""" associate dhcp option bad dhcp options id """
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
conn.associate_dhcp_options.when.called_with("foo", vpc.id).should.throw(EC2ResponseError)
@mock_ec2
def test_dhcp_options_associate_invalid_vpc_id():
""" associate dhcp option invalid vpc id """
conn = boto.connect_vpc('the_key', 'the_secret')
dhcp_options = conn.create_dhcp_options(SAMPLE_DOMAIN_NAME, SAMPLE_NAME_SERVERS)
conn.associate_dhcp_options.when.called_with(dhcp_options.id, "foo").should.throw(EC2ResponseError)
@mock_ec2
def test_dhcp_options_delete_with_vpc():
"""Test deletion of dhcp options with vpc"""
conn = boto.connect_vpc('the_key', 'the_secret')
dhcp_options = conn.create_dhcp_options(SAMPLE_DOMAIN_NAME, SAMPLE_NAME_SERVERS)
dhcp_options_id = dhcp_options.id
vpc = conn.create_vpc("10.0.0.0/16")
rval = conn.associate_dhcp_options(dhcp_options_id, vpc.id)
rval.should.be.equal(True)
#conn.delete_dhcp_options(dhcp_options_id)
conn.delete_dhcp_options.when.called_with(dhcp_options_id).should.throw(EC2ResponseError)
vpc.delete()
conn.get_all_dhcp_options.when.called_with([dhcp_options_id]).should.throw(EC2ResponseError)
@mock_ec2
def test_create_dhcp_options():
"""Create most basic dhcp option"""
conn = boto.connect_vpc('the_key', 'the_secret')
dhcp_option = conn.create_dhcp_options(SAMPLE_DOMAIN_NAME, SAMPLE_NAME_SERVERS)
dhcp_option.options[u'domain-name'][0].should.be.equal(SAMPLE_DOMAIN_NAME)
dhcp_option.options[u'domain-name-servers'][0].should.be.equal(SAMPLE_NAME_SERVERS[0])
dhcp_option.options[u'domain-name-servers'][1].should.be.equal(SAMPLE_NAME_SERVERS[1])
@mock_ec2
def test_create_dhcp_options_invalid_options():
"""Create invalid dhcp options"""
conn = boto.connect_vpc('the_key', 'the_secret')
servers = ["f", "f", "f", "f", "f"]
conn.create_dhcp_options.when.called_with(ntp_servers=servers).should.throw(EC2ResponseError)
conn.create_dhcp_options.when.called_with(netbios_node_type="0").should.throw(EC2ResponseError)
@mock_ec2
def test_describe_dhcp_options():
"""Test dhcp options lookup by id"""
conn = boto.connect_vpc('the_key', 'the_secret')
dhcp_option = conn.create_dhcp_options()
dhcp_options = conn.get_all_dhcp_options([dhcp_option.id])
dhcp_options.should.be.length_of(1)
dhcp_options = conn.get_all_dhcp_options()
dhcp_options.should.be.length_of(1)
@mock_ec2
def test_describe_dhcp_options_invalid_id():
"""get error on invalid dhcp_option_id lookup"""
conn = boto.connect_vpc('the_key', 'the_secret')
conn.get_all_dhcp_options.when.called_with(["1"]).should.throw(EC2ResponseError)
@mock_ec2
def test_delete_dhcp_options():
"""delete dhcp option"""
conn = boto.connect_vpc('the_key', 'the_secret')
dhcp_option = conn.create_dhcp_options()
dhcp_options = conn.get_all_dhcp_options([dhcp_option.id])
dhcp_options.should.be.length_of(1)
conn.delete_dhcp_options(dhcp_option.id) # .should.be.equal(True)
conn.get_all_dhcp_options.when.called_with([dhcp_option.id]).should.throw(EC2ResponseError)
@mock_ec2
def test_delete_dhcp_options_invalid_id():
conn = boto.connect_vpc('the_key', 'the_secret')
dhcp_option = conn.create_dhcp_options()
conn.delete_dhcp_options.when.called_with("1").should.throw(EC2ResponseError)