Merge pull request #1150 from gvlproject/refactor_filtering

Refactored handling of unknown filter names
This commit is contained in:
Jack Danger 2017-09-16 12:07:13 -07:00 committed by GitHub
commit 35feb9feb7
2 changed files with 89 additions and 130 deletions

View File

@ -375,3 +375,20 @@ class RulesPerSecurityGroupLimitExceededError(EC2ClientError):
"RulesPerSecurityGroupLimitExceeded",
'The maximum number of rules per security group '
'has been reached.')
class MotoNotImplementedError(NotImplementedError):
def __init__(self, blurb):
super(MotoNotImplementedError, self).__init__(
"{0} has not been implemented in Moto yet."
" Feel free to open an issue at"
" https://github.com/spulec/moto/issues".format(blurb))
class FilterNotImplementedError(MotoNotImplementedError):
def __init__(self, filter_name, method_name):
super(FilterNotImplementedError, self).__init__(
"The filter '{0}' for {1}".format(
filter_name, method_name))

View File

@ -62,6 +62,8 @@ from .exceptions import (
InvalidVpnConnectionIdError,
InvalidCustomerGatewayIdError,
RulesPerSecurityGroupLimitExceededError,
MotoNotImplementedError,
FilterNotImplementedError
)
from .utils import (
EC2_RESOURCE_TO_PREFIX,
@ -144,7 +146,7 @@ class TaggedEC2Resource(BaseModel):
for key, value in tag_map.items():
self.ec2_backend.create_tags([self.id], {key: value})
def get_filter_value(self, filter_name):
def get_filter_value(self, filter_name, method_name=None):
tags = self.get_tags()
if filter_name.startswith('tag:'):
@ -154,12 +156,12 @@ class TaggedEC2Resource(BaseModel):
return tag['value']
return ''
if filter_name == 'tag-key':
elif filter_name == 'tag-key':
return [tag['key'] for tag in tags]
if filter_name == 'tag-value':
elif filter_name == 'tag-value':
return [tag['value'] for tag in tags]
else:
raise FilterNotImplementedError(filter_name, method_name)
class NetworkInterface(TaggedEC2Resource):
@ -261,17 +263,9 @@ class NetworkInterface(TaggedEC2Resource):
return [group.id for group in self._group_set]
elif filter_name == 'availability-zone':
return self.subnet.availability_zone
filter_value = super(
NetworkInterface, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeNetworkInterfaces".format(
filter_name)
)
return filter_value
else:
return super(NetworkInterface, self).get_filter_value(
filter_name, 'DescribeNetworkInterfaces')
class NetworkInterfaceBackend(object):
@ -802,6 +796,8 @@ class KeyPair(object):
return self.name
elif filter_name == 'fingerprint':
return self.fingerprint
else:
raise FilterNotImplementedError(filter_name, 'DescribeKeyPairs')
class KeyPairBackend(object):
@ -1039,14 +1035,9 @@ class Ami(TaggedEC2Resource):
return self.state
elif filter_name == 'name':
return self.name
filter_value = super(Ami, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeImages".format(filter_name))
return filter_value
else:
return super(Ami, self).get_filter_value(
filter_name, 'DescribeImages')
class AmiBackend(object):
@ -1703,43 +1694,31 @@ class Volume(TaggedEC2Resource):
return 'available'
def get_filter_value(self, filter_name):
if filter_name.startswith('attachment') and not self.attachment:
return None
if filter_name == 'attachment.attach-time':
elif filter_name == 'attachment.attach-time':
return self.attachment.attach_time
if filter_name == 'attachment.device':
elif filter_name == 'attachment.device':
return self.attachment.device
if filter_name == 'attachment.instance-id':
elif filter_name == 'attachment.instance-id':
return self.attachment.instance.id
if filter_name == 'attachment.status':
elif filter_name == 'attachment.status':
return self.attachment.status
if filter_name == 'create-time':
elif filter_name == 'create-time':
return self.create_time
if filter_name == 'size':
elif filter_name == 'size':
return self.size
if filter_name == 'snapshot-id':
elif filter_name == 'snapshot-id':
return self.snapshot_id
if filter_name == 'status':
elif filter_name == 'status':
return self.status
if filter_name == 'volume-id':
elif filter_name == 'volume-id':
return self.id
if filter_name == 'encrypted':
elif filter_name == 'encrypted':
return str(self.encrypted).lower()
filter_value = super(Volume, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeVolumes".format(filter_name))
return filter_value
else:
return super(Volume, self).get_filter_value(
filter_name, 'DescribeVolumes')
class Snapshot(TaggedEC2Resource):
@ -1754,35 +1733,23 @@ class Snapshot(TaggedEC2Resource):
self.encrypted = encrypted
def get_filter_value(self, filter_name):
if filter_name == 'description':
return self.description
if filter_name == 'snapshot-id':
elif filter_name == 'snapshot-id':
return self.id
if filter_name == 'start-time':
elif filter_name == 'start-time':
return self.start_time
if filter_name == 'volume-id':
elif filter_name == 'volume-id':
return self.volume.id
if filter_name == 'volume-size':
elif filter_name == 'volume-size':
return self.volume.size
if filter_name == 'encrypted':
elif filter_name == 'encrypted':
return str(self.encrypted).lower()
if filter_name == 'status':
elif filter_name == 'status':
return self.status
filter_value = super(Snapshot, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeSnapshots".format(filter_name))
return filter_value
else:
return super(Snapshot, self).get_filter_value(
filter_name, 'DescribeSnapshots')
class EBSBackend(object):
@ -1944,16 +1911,10 @@ class VPC(TaggedEC2Resource):
elif filter_name in ('dhcp-options-id', 'dhcpOptionsId'):
if not self.dhcp_options:
return None
return self.dhcp_options.id
filter_value = super(VPC, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeVPCs".format(filter_name))
return filter_value
else:
return super(VPC, self).get_filter_value(
filter_name, 'DescribeVpcs')
class VPCBackend(object):
@ -2187,14 +2148,9 @@ class Subnet(TaggedEC2Resource):
return self.availability_zone
elif filter_name in ('defaultForAz', 'default-for-az'):
return self.default_for_az
filter_value = super(Subnet, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeSubnets".format(filter_name))
return filter_value
else:
return super(Subnet, self).get_filter_value(
filter_name, 'DescribeSubnets')
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -2332,14 +2288,9 @@ class RouteTable(TaggedEC2Resource):
return self.associations.keys()
elif filter_name == "association.subnet-id":
return self.associations.values()
filter_value = super(RouteTable, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeRouteTables".format(filter_name))
return filter_value
else:
return super(RouteTable, self).get_filter_value(
filter_name, 'DescribeRouteTables')
class RouteTableBackend(object):
@ -2686,16 +2637,11 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
def get_filter_value(self, filter_name):
if filter_name == 'state':
return self.state
if filter_name == 'spot-instance-request-id':
elif filter_name == 'spot-instance-request-id':
return self.id
filter_value = super(SpotInstanceRequest,
self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeSpotInstanceRequests".format(filter_name))
return filter_value
else:
return super(SpotInstanceRequest, self).get_filter_value(
filter_name, 'DescribeSpotInstanceRequests')
def launch_instance(self):
reservation = self.ec2_backend.add_instances(
@ -2966,15 +2912,13 @@ class ElasticAddress(object):
return self.instance.id
elif filter_name == 'network-interface-id' and self.eni:
return self.eni.id
elif filter_name == 'network-interface-owner-id':
msg = "The filter '{0}' for DescribeAddresses has not been" \
" implemented in Moto yet. Feel free to open an issue at" \
" https://github.com/spulec/moto/issues".format(filter_name)
raise NotImplementedError(msg)
elif filter_name == 'private-ip-address' and self.eni:
return self.eni.private_ip_address
elif filter_name == 'public-ip':
return self.public_ip
else:
# TODO: implement network-interface-owner-id
raise FilterNotImplementedError(filter_name, 'DescribeAddresses')
class ElasticAddressBackend(object):
@ -3134,15 +3078,9 @@ class DHCPOptionsSet(TaggedEC2Resource):
elif filter_name == 'value':
values = [item for item in list(self._options.values()) if item]
return itertools.chain(*values)
filter_value = super(
DHCPOptionsSet, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeDhcpOptions".format(filter_name))
return filter_value
else:
return super(DHCPOptionsSet, self).get_filter_value(
filter_name, 'DescribeDhcpOptions')
@property
def options(self):
@ -3229,6 +3167,10 @@ class VPNConnection(TaggedEC2Resource):
self.options = None
self.static_routes = None
def get_filter_value(self, filter_name):
return super(VPNConnection, self).get_filter_value(
filter_name, 'DescribeVpnConnections')
class VPNConnectionBackend(object):
def __init__(self):
@ -3408,14 +3350,9 @@ class NetworkAcl(TaggedEC2Resource):
return self.id
elif filter_name == "association.subnet-id":
return [assoc.subnet_id for assoc in self.associations.values()]
filter_value = super(NetworkAcl, self).get_filter_value(filter_name)
if filter_value is None:
self.ec2_backend.raise_not_implemented_error(
"The filter '{0}' for DescribeNetworkAcls".format(filter_name))
return filter_value
else:
return super(NetworkAcl, self).get_filter_value(
filter_name, 'DescribeNetworkAcls')
class NetworkAclEntry(TaggedEC2Resource):
@ -3444,6 +3381,10 @@ class VpnGateway(TaggedEC2Resource):
self.attachments = {}
super(VpnGateway, self).__init__()
def get_filter_value(self, filter_name):
return super(VpnGateway, self).get_filter_value(
filter_name, 'DescribeVpnGateways')
class VpnGatewayAttachment(object):
def __init__(self, vpc_id, state):
@ -3505,6 +3446,10 @@ class CustomerGateway(TaggedEC2Resource):
self.attachments = {}
super(CustomerGateway, self).__init__()
def get_filter_value(self, filter_name):
return super(CustomerGateway, self).get_filter_value(
filter_name, 'DescribeCustomerGateways')
class CustomerGatewayBackend(object):
def __init__(self):
@ -3648,10 +3593,7 @@ class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
raise EC2ClientError(code, message)
def raise_not_implemented_error(self, blurb):
msg = "{0} has not been implemented in Moto yet." \
" Feel free to open an issue at" \
" https://github.com/spulec/moto/issues".format(blurb)
raise NotImplementedError(msg)
raise MotoNotImplementedError(blurb)
def do_resources_exist(self, resource_ids):
for resource_id in resource_ids: