Enforcing ELB security groups must be real

This commit is contained in:
Jack Danger 2017-07-19 15:58:49 -07:00
parent b512316c82
commit 6ed8d12317
5 changed files with 79 additions and 13 deletions

View File

@ -3553,8 +3553,8 @@ class EC2Backend(BaseBackend, InstanceBackend, TagBackend, AmiBackend,
DHCPOptionsSetBackend, NetworkAclBackend, VpnGatewayBackend,
CustomerGatewayBackend, NatGatewayBackend):
def __init__(self, region_name):
super(EC2Backend, self).__init__()
self.region_name = region_name
super(EC2Backend, self).__init__()
# Default VPC exists by default, which is the current behavior
# of EC2-VPC. See for detail:

View File

@ -64,3 +64,11 @@ class EmptyListenersError(ELBClientError):
super(EmptyListenersError, self).__init__(
"ValidationError",
"Listeners cannot be empty")
class InvalidSecurityGroupError(ELBClientError):
def __init__(self):
super(InvalidSecurityGroupError, self).__init__(
"ValidationError",
"One or more of the specified security groups do not exist.")

View File

@ -20,6 +20,7 @@ from .exceptions import (
DuplicateLoadBalancerName,
DuplicateListenerError,
EmptyListenersError,
InvalidSecurityGroupError,
LoadBalancerNotFoundError,
TooManyTagsError,
)
@ -63,7 +64,7 @@ class FakeBackend(BaseModel):
class FakeLoadBalancer(BaseModel):
def __init__(self, name, zones, ports, scheme='internet-facing', vpc_id=None, subnets=None):
def __init__(self, name, zones, ports, scheme='internet-facing', vpc_id=None, subnets=None, security_groups=None):
self.name = name
self.health_check = None
self.instance_ids = []
@ -77,6 +78,7 @@ class FakeLoadBalancer(BaseModel):
self.policies.other_policies = []
self.policies.app_cookie_stickiness_policies = []
self.policies.lb_cookie_stickiness_policies = []
self.security_groups = security_groups or []
self.subnets = subnets or []
self.vpc_id = vpc_id or 'vpc-56e10e3d'
self.tags = {}
@ -233,7 +235,7 @@ class ELBBackend(BaseBackend):
self.__dict__ = {}
self.__init__(region_name)
def create_load_balancer(self, name, zones, ports, scheme='internet-facing', subnets=None):
def create_load_balancer(self, name, zones, ports, scheme='internet-facing', subnets=None, security_groups=None):
vpc_id = None
ec2_backend = ec2_backends[self.region_name]
if subnets:
@ -243,8 +245,19 @@ class ELBBackend(BaseBackend):
raise DuplicateLoadBalancerName(name)
if not ports:
raise EmptyListenersError()
if not security_groups:
security_groups = []
for security_group in security_groups:
if ec2_backend.get_security_group_from_id(security_group) is None:
raise InvalidSecurityGroupError()
new_load_balancer = FakeLoadBalancer(
name=name, zones=zones, ports=ports, scheme=scheme, subnets=subnets, vpc_id=vpc_id)
name=name,
zones=zones,
ports=ports,
scheme=scheme,
subnets=subnets,
security_groups=security_groups,
vpc_id=vpc_id)
self.load_balancers[name] = new_load_balancer
return new_load_balancer
@ -302,6 +315,14 @@ class ELBBackend(BaseBackend):
def get_load_balancer(self, load_balancer_name):
return self.load_balancers.get(load_balancer_name)
def apply_security_groups_to_load_balancer(self, load_balancer_name, security_group_ids):
load_balancer = self.load_balancers.get(load_balancer_name)
ec2_backend = ec2_backends[self.region_name]
for security_group_id in security_group_ids:
if ec2_backend.get_security_group_from_id(security_group_id) is None:
raise InvalidSecurityGroupError()
load_balancer.security_groups = security_group_ids
def configure_health_check(self, load_balancer_name, timeout,
healthy_threshold, unhealthy_threshold, interval,
target):

View File

@ -27,6 +27,7 @@ class ELBResponse(BaseResponse):
ports = self._get_list_prefix("Listeners.member")
scheme = self._get_param('Scheme')
subnets = self._get_multi_param("Subnets.member")
security_groups = self._get_multi_param("SecurityGroups.member")
load_balancer = self.elb_backend.create_load_balancer(
name=load_balancer_name,
@ -34,6 +35,7 @@ class ELBResponse(BaseResponse):
ports=ports,
scheme=scheme,
subnets=subnets,
security_groups=security_groups,
)
self._add_tags(load_balancer)
template = self.response_template(CREATE_LOAD_BALANCER_TEMPLATE)
@ -84,6 +86,13 @@ class ELBResponse(BaseResponse):
template = self.response_template(DELETE_LOAD_BALANCER_TEMPLATE)
return template.render()
def apply_security_groups_to_load_balancer(self):
load_balancer_name = self._get_param('LoadBalancerName')
security_group_ids = self._get_multi_param("SecurityGroups.member")
self.elb_backend.apply_security_groups_to_load_balancer(load_balancer_name, security_group_ids)
template = self.response_template(APPLY_SECURITY_GROUPS_TEMPLATE)
return template.render(security_group_ids=security_group_ids)
def configure_health_check(self):
check = self.elb_backend.configure_health_check(
load_balancer_name=self._get_param('LoadBalancerName'),
@ -99,8 +108,7 @@ class ELBResponse(BaseResponse):
def register_instances_with_load_balancer(self):
load_balancer_name = self._get_param('LoadBalancerName')
instance_ids = [value[0] for key, value in self.querystring.items(
) if "Instances.member" in key]
instance_ids = [param.values()[0] for param in self._get_list_prefix('Instances.member')]
template = self.response_template(REGISTER_INSTANCES_TEMPLATE)
load_balancer = self.elb_backend.register_instances(
load_balancer_name, instance_ids)
@ -119,8 +127,7 @@ class ELBResponse(BaseResponse):
def deregister_instances_from_load_balancer(self):
load_balancer_name = self._get_param('LoadBalancerName')
instance_ids = [value[0] for key, value in self.querystring.items(
) if "Instances.member" in key]
instance_ids = [param.values()[0] for param in self._get_list_prefix('Instances.member')]
template = self.response_template(DEREGISTER_INSTANCES_TEMPLATE)
load_balancer = self.elb_backend.deregister_instances(
load_balancer_name, instance_ids)
@ -252,8 +259,7 @@ class ELBResponse(BaseResponse):
def describe_instance_health(self):
load_balancer_name = self._get_param('LoadBalancerName')
instance_ids = [value[0] for key, value in self.querystring.items(
) if "Instances.member" in key]
instance_ids = [param.values()[0] for param in self._get_list_prefix('Instances.member')]
if len(instance_ids) == 0:
instance_ids = self.elb_backend.get_load_balancer(
load_balancer_name).instance_ids
@ -400,6 +406,9 @@ DESCRIBE_LOAD_BALANCERS_TEMPLATE = """<DescribeLoadBalancersResponse xmlns="http
{% for load_balancer in load_balancers %}
<member>
<SecurityGroups>
{% for security_group_id in load_balancer.security_groups %}
<member>{{ security_group_id }}</member>
{% endfor %}
</SecurityGroups>
<LoadBalancerName>{{ load_balancer.name }}</LoadBalancerName>
<CreatedTime>{{ load_balancer.created_time }}</CreatedTime>
@ -513,6 +522,19 @@ DESCRIBE_LOAD_BALANCERS_TEMPLATE = """<DescribeLoadBalancersResponse xmlns="http
</ResponseMetadata>
</DescribeLoadBalancersResponse>"""
APPLY_SECURITY_GROUPS_TEMPLATE = """<ApplySecurityGroupsToLoadBalancerResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2012-06-01/">
<ApplySecurityGroupsToLoadBalancerResult>
<SecurityGroups>
{% for security_group_id in security_group_ids %}
<member>{{ security_group_id }}</member>
{% endfor %}
</SecurityGroups>
</ApplySecurityGroupsToLoadBalancerResult>
<ResponseMetadata>
<RequestId>f9880f01-7852-629d-a6c3-3ae2-666a409287e6dc0c</RequestId>
</ResponseMetadata>
</ApplySecurityGroupsToLoadBalancerResponse>"""
CONFIGURE_HEALTH_CHECK_TEMPLATE = """<ConfigureHealthCheckResponse xmlns="http://elasticloadbalancing.amazonaws.com/doc/2012-06-01/">
<ConfigureHealthCheckResult>
<HealthCheck>

View File

@ -18,17 +18,22 @@ from moto import mock_elb, mock_ec2, mock_elb_deprecated, mock_ec2_deprecated
@mock_elb_deprecated
@mock_ec2_deprecated
def test_create_load_balancer():
conn = boto.connect_elb()
ec2 = boto.connect_ec2('the_key', 'the_secret')
security_group = ec2.create_security_group('sg-abc987', 'description')
zones = ['us-east-1a', 'us-east-1b']
ports = [(80, 8080, 'http'), (443, 8443, 'tcp')]
conn.create_load_balancer('my-lb', zones, ports, scheme='internal')
conn.create_load_balancer('my-lb', zones, ports, scheme='internal', security_groups=[security_group.id])
balancers = conn.get_all_load_balancers()
balancer = balancers[0]
balancer.name.should.equal("my-lb")
balancer.scheme.should.equal("internal")
list(balancer.security_groups).should.equal([security_group.id])
set(balancer.availability_zones).should.equal(
set(['us-east-1a', 'us-east-1b']))
listener1 = balancer.listeners[0]
@ -139,9 +144,9 @@ def test_describe_paginated_balancers():
@mock_elb
@mock_ec2
def test_add_and_remove_security_groups():
def test_apply_security_groups_to_load_balancer():
client = boto3.client('elb', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-west-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
security_group = ec2.create_security_group(
@ -157,7 +162,17 @@ def test_add_and_remove_security_groups():
response = client.apply_security_groups_to_load_balancer(
LoadBalancerName='my-lb',
SecurityGroups=[security_group.id])
assert response['SecurityGroups'] == [security_group.id]
balancer = client.describe_load_balancers()['LoadBalancerDescriptions'][0]
assert balancer['SecurityGroups'] == [security_group.id]
# Usign a not-real security group raises an error
with assert_raises(ClientError) as error:
response = client.apply_security_groups_to_load_balancer(
LoadBalancerName='my-lb',
SecurityGroups=['not-really-a-security-group'])
assert "One or more of the specified security groups do not exist." in str(error.exception)
@mock_elb_deprecated