Implement Elastic IP

This commit is contained in:
Ilya Sukhanov 2013-09-03 21:47:16 -04:00
parent a63601e481
commit f8f8d25426
4 changed files with 415 additions and 9 deletions

View File

@ -15,6 +15,9 @@ from .utils import (
random_subnet_id, random_subnet_id,
random_volume_id, random_volume_id,
random_vpc_id, random_vpc_id,
random_eip_association_id,
random_eip_allocation_id,
random_ip,
) )
@ -575,9 +578,92 @@ class SpotRequestBackend(object):
return requests return requests
class ElasticAddress():
def __init__(self, domain):
self.public_ip = random_ip()
self.allocation_id = random_eip_allocation_id() if domain == "vpc" else None
self.domain = domain
self.instance = None
self.association_id = None
class ElasticAddressBackend(object):
def __init__(self):
self.addresses = []
super(ElasticAddressBackend, self).__init__()
def allocate_address(self, domain):
address = ElasticAddress(domain)
self.addresses.append(address)
return address
def address_by_ip(self, ips):
return [address for address in self.addresses
if address.public_ip in ips]
def address_by_allocation(self, allocation_ids):
return [address for address in self.addresses
if address.allocation_id in allocation_ids]
def address_by_association(self, association_ids):
return [address for address in self.addresses
if address.association_id in association_ids]
def associate_address(self, instance, address=None, allocation_id=None, reassociate=False):
eips = []
if address:
eips = self.address_by_ip([address])
elif allocation_id:
eips = self.address_by_allocation([allocation_id])
eip = eips[0] if len(eips) > 0 else None
if eip and eip.instance is None or reassociate:
eip.instance = instance
if eip.domain == "vpc":
eip.association_id = random_eip_association_id()
return eip
else:
return None
def describe_addresses(self):
return self.addresses
def disassociate_address(self, address=None, association_id=None):
eips = []
if address:
eips = self.address_by_ip([address])
elif association_id:
eips = self.address_by_association([association_id])
if eips:
eip = eips[0]
eip.instance = None
eip.association_id = None
return True
else:
return False
def release_address(self, address=None, allocation_id=None):
eips = []
if address:
eips = self.address_by_ip([address])
elif allocation_id:
eips = self.address_by_allocation([allocation_id])
if eips:
eip = eips[0]
self.disassociate_address(address=eip.public_ip)
eip.allocation_id = None
self.addresses.remove(eip)
return True
else:
return False
class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend, class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
RegionsAndZonesBackend, SecurityGroupBackend, EBSBackend, RegionsAndZonesBackend, SecurityGroupBackend, EBSBackend,
VPCBackend, SubnetBackend, SpotRequestBackend): VPCBackend, SubnetBackend, SpotRequestBackend, ElasticAddressBackend):
pass pass

View File

@ -1,21 +1,132 @@
from jinja2 import Template from jinja2 import Template
from moto.ec2.models import ec2_backend from moto.ec2.models import ec2_backend
from moto.ec2.utils import resource_ids_from_querystring from moto.ec2.utils import sequence_from_querystring
class ElasticIPAddresses(object): class ElasticIPAddresses(object):
def allocate_address(self): def allocate_address(self):
raise NotImplementedError('ElasticIPAddresses.allocate_address is not yet implemented') if "Domain" in self.querystring:
domain = self.querystring.get('Domain')[0]
if domain != "vpc":
return "Invalid domain:{0}.".format(domain), dict(status=400)
else:
domain = "standard"
address = ec2_backend.allocate_address(domain)
template = Template(ALLOCATE_ADDRESS_RESPONSE)
return template.render(address=address)
def associate_address(self): def associate_address(self):
raise NotImplementedError('ElasticIPAddresses.associate_address is not yet implemented') if "InstanceId" in self.querystring:
instance = ec2_backend.get_instance(self.querystring['InstanceId'][0])
elif "NetworkInterfaceId" in self.querystring:
raise NotImplementedError("Lookup by allocation id not implemented")
else:
return "Invalid request, expect InstanceId/NetworkId parameter.", dict(status=400)
reassociate = False
if "AllowReassociation" in self.querystring:
reassociate = self.querystring['AllowReassociation'][0] == "true"
if "PublicIp" in self.querystring:
eip = ec2_backend.associate_address(instance, address=self.querystring['PublicIp'][0], reassociate=reassociate)
elif "AllocationId" in self.querystring:
eip = ec2_backend.associate_address(instance, allocation_id=self.querystring['AllocationId'][0], reassociate=reassociate)
else:
return "Invalid request, expect PublicIp/AllocationId parameter.", dict(status=400)
if eip:
template = Template(ASSOCIATE_ADDRESS_RESPONSE)
return template.render(address=eip)
else:
return "Failed to associate address.", dict(status=400)
def describe_addresses(self): def describe_addresses(self):
raise NotImplementedError('ElasticIPAddresses.describe_addresses is not yet implemented') template = Template(DESCRIBE_ADDRESS_RESPONSE)
if "Filter.1.Name" in self.querystring:
raise NotImplementedError("Filtering not supported in describe_address.")
elif "PublicIp.1" in self.querystring:
public_ips = sequence_from_querystring("PublicIp", self.querystring)
addresses = ec2_backend.address_by_ip(public_ips)
elif "AllocationId.1" in self.querystring:
allocation_ids = sequence_from_querystring("AllocationId", self.querystring)
addresses = ec2_backend.address_by_allocation(allocation_ids)
else:
addresses = ec2_backend.describe_addresses()
return template.render(addresses=addresses)
def disassociate_address(self): def disassociate_address(self):
raise NotImplementedError('ElasticIPAddresses.disassociate_address is not yet implemented') if "PublicIp" in self.querystring:
disassociated = ec2_backend.disassociate_address(address=self.querystring['PublicIp'][0])
elif "AssociationId" in self.querystring:
disassociated = ec2_backend.disassociate_address(association_id=self.querystring['AssociationId'][0])
else:
return "Invalid request, expect PublicIp/AssociationId parameter.", dict(status=400)
if disassociated:
return Template(DISASSOCIATE_ADDRESS_RESPONSE).render()
else:
return "Address conresponding to PublicIp/AssociationIP not found.", dict(status=400)
def release_address(self): def release_address(self):
raise NotImplementedError('ElasticIPAddresses.release_address is not yet implemented') if "PublicIp" in self.querystring:
released = ec2_backend.release_address(address=self.querystring['PublicIp'][0])
elif "AllocationId" in self.querystring:
released = ec2_backend.release_address(allocation_id=self.querystring['AllocationId'][0])
else:
return "Invalid request, expect PublicIp/AllocationId parameter.", dict(status=400)
if released:
return Template(RELEASE_ADDRESS_RESPONSE).render()
else:
return "Address conresponding to PublicIp/AssociationIP not found.", dict(status=400)
ALLOCATE_ADDRESS_RESPONSE = """<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2013-07-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<publicIp>{{ address.public_ip }}</publicIp>
<domain>{{ address.domain }}</domain>
{% if address.allocation_id %}
<allocationId>{{ address.allocation_id }}</allocationId>
{% endif %}
</AllocateAddressResponse>"""
ASSOCIATE_ADDRESS_RESPONSE = """<AssociateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2013-07-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
{% if address.association_id %}
<associationId>{{ address.association_id }}</associationId>
{% endif %}
</AssociateAddressResponse>"""
DESCRIBE_ADDRESS_RESPONSE = """<DescribeAddressesResponse xmlns="http://ec2.amazonaws.com/doc/2013-07-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<addressesSet>
{% for address in addresses %}
<item>
<publicIp>{{ address.public_ip }}</publicIp>
<domain>{{ address.domain }}</domain>
{% if address.instance %}
<instanceId>{{ address.instance.id }}</instanceId>
{% else %}
<instanceId/>
{% endif %}
{% if address.association_id %}
<associationId>{{ address.association_id }}</associationId>
{% endif %}
</item>
{% endfor %}
</addressesSet>
</DescribeAddressesResponse>"""
DISASSOCIATE_ADDRESS_RESPONSE = """<DisassociateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2013-07-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</DisassociateAddressResponse>"""
RELEASE_ADDRESS_RESPONSE = """<ReleaseAddressResponse xmlns="http://ec2.amazonaws.com/doc/2013-07-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</ReleaseAddressResponse>"""

View File

@ -46,6 +46,22 @@ def random_vpc_id():
return random_id(prefix='vpc') return random_id(prefix='vpc')
def random_eip_association_id():
return random_id(prefix='eipassoc')
def random_eip_allocation_id():
return random_id(prefix='eipalloc')
def random_ip():
return "127.{0}.{1}.{2}".format(
random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255)
)
def instance_ids_from_querystring(querystring_dict): def instance_ids_from_querystring(querystring_dict):
instance_ids = [] instance_ids = []
for key, value in querystring_dict.iteritems(): for key, value in querystring_dict.iteritems():
@ -62,6 +78,14 @@ def image_ids_from_querystring(querystring_dict):
return image_ids return image_ids
def sequence_from_querystring(parameter, querystring_dict):
parameter_values = []
for key, value in querystring_dict.iteritems():
if parameter in key:
parameter_values.append(value[0])
return parameter_values
def resource_ids_from_querystring(querystring_dict): def resource_ids_from_querystring(querystring_dict):
prefix = 'ResourceId' prefix = 'ResourceId'
response_values = {} response_values = {}

View File

@ -1,9 +1,194 @@
"""Test mocking of Elatic IP Address"""
import boto import boto
from boto.exception import EC2ResponseError
import sure # noqa import sure # noqa
from moto import mock_ec2 from moto import mock_ec2
import logging
import types
@mock_ec2 @mock_ec2
def test_elastic_ip_addresses(): def test_eip_allocate_classic():
pass """Allocate/release Classic EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
standard = conn.allocate_address()
standard.should.be.a(boto.ec2.address.Address)
standard.public_ip.should.be.a(types.UnicodeType)
standard.instance_id.should.be.none
standard.domain.should.be.equal("standard")
standard.release()
standard.should_not.be.within(conn.get_all_addresses())
@mock_ec2
def test_eip_allocate_vpc():
"""Allocate/release VPC EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
vpc = conn.allocate_address(domain="vpc")
vpc.should.be.a(boto.ec2.address.Address)
vpc.domain.should.be.equal("vpc")
logging.debug("vpc alloc_id:".format(vpc.allocation_id))
vpc.release()
@mock_ec2
def test_eip_allocate_invalid_domain():
"""Allocate EIP invalid domain"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.allocate_address.when.called_with(domain="bogus").should.throw(EC2ResponseError)
@mock_ec2
def test_eip_associate_classic():
"""Associate/Disassociate EIP to classic instance"""
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
eip = conn.allocate_address()
eip.instance_id.should.be.none
conn.associate_address.when.called_with(public_ip=eip.public_ip).should.throw(EC2ResponseError)
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(instance.id)
conn.disassociate_address(public_ip=eip.public_ip)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(u'')
eip.release()
eip.should_not.be.within(conn.get_all_addresses())
eip = None
instance.terminate()
@mock_ec2
def test_eip_associate_vpc():
"""Associate/Disassociate EIP to VPC instance"""
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
eip = conn.allocate_address(domain='vpc')
eip.instance_id.should.be.none
conn.associate_address.when.called_with(allocation_id=eip.allocation_id).should.throw(EC2ResponseError)
conn.associate_address(instance_id=instance.id, allocation_id=eip.allocation_id)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(instance.id)
conn.disassociate_address(association_id=eip.association_id)
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
eip.instance_id.should.be.equal(u'')
eip.association_id.should.be.none
eip.release()
eip = None
instance.terminate()
@mock_ec2
def test_eip_reassociate():
"""reassociate EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
eip = conn.allocate_address()
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip)
conn.associate_address.when.called_with(instance_id=instance.id, public_ip=eip.public_ip, allow_reassociation=False).should.throw(EC2ResponseError)
conn.associate_address.when.called_with(instance_id=instance.id, public_ip=eip.public_ip, allow_reassociation=True).should_not.throw(EC2ResponseError)
eip.release()
eip = None
instance.terminate()
@mock_ec2
def test_eip_associate_invalid_args():
"""Associate EIP, invalid args """
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
eip = conn.allocate_address()
conn.associate_address.when.called_with(instance_id=instance.id).should.throw(EC2ResponseError)
instance.terminate()
@mock_ec2
def test_eip_disassociate_bogus_association():
"""Disassociate bogus EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.disassociate_address.when.called_with(association_id="bogus").should.throw(EC2ResponseError)
@mock_ec2
def test_eip_release_bogus_eip():
"""Release bogus EIP"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.release_address.when.called_with(allocation_id="bogus").should.throw(EC2ResponseError)
@mock_ec2
def test_eip_disassociate_arg_error():
"""Invalid arguments disassociate address"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.disassociate_address.when.called_with().should.throw(EC2ResponseError)
@mock_ec2
def test_eip_release_arg_error():
"""Invalid arguments release address"""
conn = boto.connect_ec2('the_key', 'the_secret')
conn.release_address.when.called_with().should.throw(EC2ResponseError)
@mock_ec2
def test_eip_describe():
"""Listing of allocated Elastic IP Addresses."""
conn = boto.connect_ec2('the_key', 'the_secret')
eips = []
number_of_classic_ips = 2
number_of_vpc_ips = 2
#allocate some IPs
for _ in range(number_of_classic_ips):
eips.append(conn.allocate_address())
for _ in range(number_of_vpc_ips):
eips.append(conn.allocate_address(domain='vpc'))
len(eips).should.be.equal(number_of_classic_ips + number_of_vpc_ips)
# Can we find each one individually?
for eip in eips:
if eip.allocation_id:
lookup_addresses = conn.get_all_addresses(allocation_ids=[eip.allocation_id])
else:
lookup_addresses = conn.get_all_addresses(addresses=[eip.public_ip])
len(lookup_addresses).should.be.equal(1)
lookup_addresses[0].public_ip.should.be.equal(eip.public_ip)
# Can we find first two when we search for them?
lookup_addresses = conn.get_all_addresses(addresses=[eips[0].public_ip, eips[1].public_ip])
len(lookup_addresses).should.be.equal(2)
lookup_addresses[0].public_ip.should.be.equal(eips[0].public_ip)
lookup_addresses[1].public_ip.should.be.equal(eips[1].public_ip)
#Release all IPs
for eip in eips:
eip.release()
len(conn.get_all_addresses()).should.be.equal(0)
@mock_ec2
def test_eip_describe_none():
"""Find nothing when seach for bogus IP"""
conn = boto.connect_ec2('the_key', 'the_secret')
lookup_addresses = conn.get_all_addresses(addresses=["256.256.256.256"])
len(lookup_addresses).should.be.equal(0)