diff --git a/moto/acm/models.py b/moto/acm/models.py index 9dba350c9..03143a950 100644 --- a/moto/acm/models.py +++ b/moto/acm/models.py @@ -144,6 +144,7 @@ class CertBundle(BaseModel): self._chain = None self.type = cert_type # Should really be an enum self.status = cert_status # Should really be an enum + self.in_use_by = [] # AWS always returns your chain + root CA if self.chain is None: @@ -361,7 +362,7 @@ class CertBundle(BaseModel): "Certificate": { "CertificateArn": self.arn, "DomainName": self.common_name, - "InUseBy": [], + "InUseBy": self.in_use_by, "Issuer": self._cert.issuer.get_attributes_for_oid( cryptography.x509.OID_COMMON_NAME )[0].value, @@ -430,6 +431,13 @@ class AWSCertificateManagerBackend(BaseBackend): ) return AWSResourceNotFoundException(msg) + def set_certificate_in_use_by(self, arn, load_balancer_name): + if arn not in self._certificates: + raise self._arn_not_found(arn) + + cert_bundle = self._certificates[arn] + cert_bundle.in_use_by.append(load_balancer_name) + def _get_arn_from_idempotency_token(self, token): """ If token doesnt exist, return None, later it will be diff --git a/moto/elb/exceptions.py b/moto/elb/exceptions.py index d41a66e3f..14405df2c 100644 --- a/moto/elb/exceptions.py +++ b/moto/elb/exceptions.py @@ -13,6 +13,13 @@ class DuplicateTagKeysError(ELBClientError): ) +class CertificateNotFoundException(ELBClientError): + def __init__(self): + super(CertificateNotFoundException, self).__init__( + "CertificateNotFoundException", "Supplied certificate was not found" + ) + + class LoadBalancerNotFoundError(ELBClientError): def __init__(self, cidr): super(LoadBalancerNotFoundError, self).__init__( diff --git a/moto/elb/models.py b/moto/elb/models.py index 196ae5a4f..4b479dbe2 100644 --- a/moto/elb/models.py +++ b/moto/elb/models.py @@ -23,6 +23,7 @@ from .exceptions import ( InvalidSecurityGroupError, LoadBalancerNotFoundError, TooManyTagsError, + CertificateNotFoundException, ) @@ -79,6 +80,7 @@ class FakeLoadBalancer(CloudFormationModel): vpc_id=None, subnets=None, security_groups=None, + elb_backend=None, ): self.name = name self.health_check = None @@ -111,6 +113,11 @@ class FakeLoadBalancer(CloudFormationModel): "ssl_certificate_id", port.get("SSLCertificateId") ), ) + if listener.ssl_certificate_id: + elb_backend._register_certificate( + listener.ssl_certificate_id, self.dns_name + ) + self.listeners.append(listener) # it is unclear per the AWS documentation as to when or how backend @@ -310,6 +317,7 @@ class ELBBackend(BaseBackend): subnets=subnets, security_groups=security_groups, vpc_id=vpc_id, + elb_backend=self, ) self.load_balancers[name] = new_load_balancer return new_load_balancer @@ -332,6 +340,10 @@ class ELBBackend(BaseBackend): raise DuplicateListenerError(name, lb_port) break else: + if ssl_certificate_id: + self._register_certificate( + ssl_certificate_id, balancer.dns_name + ) balancer.listeners.append( FakeListener( lb_port, instance_port, protocol, ssl_certificate_id @@ -406,7 +418,10 @@ class ELBBackend(BaseBackend): for idx, listener in enumerate(balancer.listeners): if lb_port == listener.load_balancer_port: balancer.listeners[idx].ssl_certificate_id = ssl_certificate_id - + if ssl_certificate_id: + self._register_certificate( + ssl_certificate_id, balancer.dns_name + ) return balancer def register_instances( @@ -509,6 +524,15 @@ class ELBBackend(BaseBackend): load_balancer.listeners[listener_idx] = listener return load_balancer + def _register_certificate(self, ssl_certificate_id, dns_name): + from moto.acm.models import acm_backends, AWSResourceNotFoundException + + acm_backend = acm_backends[self.region_name] + try: + acm_backend.set_certificate_in_use_by(ssl_certificate_id, dns_name) + except AWSResourceNotFoundException: + raise CertificateNotFoundException() + elb_backends = {} for region in ec2_backends.keys(): diff --git a/tests/test_acm/test_acm.py b/tests/test_acm/test_acm.py index 3d5905ce1..5c5b566f6 100644 --- a/tests/test_acm/test_acm.py +++ b/tests/test_acm/test_acm.py @@ -11,11 +11,11 @@ from botocore.exceptions import ClientError from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization, hashes from freezegun import freeze_time -from moto import mock_acm, settings +from moto import mock_acm, mock_elb, settings from moto.core import ACCOUNT_ID - from unittest import SkipTest, mock + RESOURCE_FOLDER = os.path.join(os.path.dirname(__file__), "resources") _GET_RESOURCE = lambda x: open(os.path.join(RESOURCE_FOLDER, x), "rb").read() CA_CRT = _GET_RESOURCE("ca.pem") @@ -640,3 +640,38 @@ def test_request_certificate_with_mutiple_times(): ) arn = resp["CertificateArn"] arn.should_not.equal(original_arn) + + +@mock_acm +@mock_elb +def test_elb_acm_in_use_by(): + acm_client = boto3.client("acm", region_name="us-west-2") + elb_client = boto3.client("elb", region_name="us-west-2") + + acm_request_response = acm_client.request_certificate( + DomainName="fake.domain.com", + DomainValidationOptions=[ + {"DomainName": "fake.domain.com", "ValidationDomain": "domain.com"} + ], + ) + + certificate_arn = acm_request_response["CertificateArn"] + + create_load_balancer_request = elb_client.create_load_balancer( + LoadBalancerName="test", + Listeners=[ + { + "Protocol": "https", + "LoadBalancerPort": 443, + "InstanceProtocol": "http", + "InstancePort": 80, + "SSLCertificateId": certificate_arn, + } + ], + ) + + response = acm_client.describe_certificate(CertificateArn=certificate_arn) + + response["Certificate"]["InUseBy"].should.equal( + [create_load_balancer_request["DNSName"]] + ) diff --git a/tests/test_elb/test_elb.py b/tests/test_elb/test_elb.py index ddae80194..3497face9 100644 --- a/tests/test_elb/test_elb.py +++ b/tests/test_elb/test_elb.py @@ -14,7 +14,7 @@ from boto.exception import BotoServerError import pytest import sure # noqa -from moto import mock_elb, mock_ec2, mock_elb_deprecated, mock_ec2_deprecated +from moto import mock_acm, mock_elb, mock_ec2, mock_elb_deprecated, mock_ec2_deprecated from moto.core import ACCOUNT_ID from tests import EXAMPLE_AMI_ID from uuid import uuid4 @@ -201,42 +201,21 @@ def test_create_elb_in_multiple_region_boto3(): west_names.shouldnt.contain(name_east) -# Has boto3 equivalent -@mock_elb_deprecated -def test_create_load_balancer_with_certificate(): - conn = boto.connect_elb() - - zones = ["us-east-1a"] - ports = [ - ( - 443, - 8443, - "https", - "arn:aws:iam:{}:server-certificate/test-cert".format(ACCOUNT_ID), - ) - ] - conn.create_load_balancer("my-lb", zones, ports) - - balancers = conn.get_all_load_balancers() - balancer = balancers[0] - balancer.name.should.equal("my-lb") - balancer.scheme.should.equal("internet-facing") - set(balancer.availability_zones).should.equal(set(["us-east-1a"])) - listener = balancer.listeners[0] - listener.load_balancer_port.should.equal(443) - listener.instance_port.should.equal(8443) - listener.protocol.should.equal("HTTPS") - listener.ssl_certificate_id.should.equal( - "arn:aws:iam:{}:server-certificate/test-cert".format(ACCOUNT_ID) - ) - - +@mock_acm @mock_elb def test_create_load_balancer_with_certificate_boto3(): + acm_client = boto3.client("acm", region_name="us-east-2") + acm_request_response = acm_client.request_certificate( + DomainName="fake.domain.com", + DomainValidationOptions=[ + {"DomainName": "fake.domain.com", "ValidationDomain": "domain.com"}, + ], + ) + certificate_arn = acm_request_response["CertificateArn"] + client = boto3.client("elb", region_name="us-east-2") name = str(uuid4())[0:6] - cert_id = "arn:aws:iam:{}:server-certificate/test-cert".format(ACCOUNT_ID) client.create_load_balancer( LoadBalancerName=name, @@ -245,7 +224,7 @@ def test_create_load_balancer_with_certificate_boto3(): "Protocol": "https", "LoadBalancerPort": 8443, "InstancePort": 443, - "SSLCertificateId": cert_id, + "SSLCertificateId": certificate_arn, } ], AvailabilityZones=["us-east-1a"], @@ -257,7 +236,30 @@ def test_create_load_balancer_with_certificate_boto3(): listener = describe["ListenerDescriptions"][0]["Listener"] listener.should.have.key("Protocol").equal("HTTPS") - listener.should.have.key("SSLCertificateId").equals(cert_id) + listener.should.have.key("SSLCertificateId").equals(certificate_arn) + + +@mock_elb +def test_create_load_balancer_with_invalid_certificate(): + client = boto3.client("elb", region_name="us-east-2") + + name = str(uuid4())[0:6] + + with pytest.raises(ClientError) as exc: + client.create_load_balancer( + LoadBalancerName=name, + Listeners=[ + { + "Protocol": "https", + "LoadBalancerPort": 8443, + "InstancePort": 443, + "SSLCertificateId": "invalid_arn", + } + ], + AvailabilityZones=["us-east-1a"], + ) + err = exc.value.response["Error"] + err["Code"].should.equal("CertificateNotFoundException") @mock_elb @@ -433,26 +435,87 @@ def test_create_and_delete_listener_boto3_support(): list(balancer["ListenerDescriptions"]).should.have.length_of(1) -# Has boto3 equivalent -@mock_elb_deprecated -def test_set_sslcertificate(): - conn = boto.connect_elb() +@mock_acm +@mock_elb +def test_create_lb_listener_with_ssl_certificate(): + acm_client = boto3.client("acm", region_name="eu-west-1") + acm_request_response = acm_client.request_certificate( + DomainName="fake.domain.com", + DomainValidationOptions=[ + {"DomainName": "fake.domain.com", "ValidationDomain": "domain.com"}, + ], + ) + certificate_arn = acm_request_response["CertificateArn"] - zones = ["us-east-1a", "us-east-1b"] - ports = [(443, 8443, "tcp")] - conn.create_load_balancer("my-lb", zones, ports) - conn.set_lb_listener_SSL_certificate("my-lb", "443", "arn:certificate") - balancers = conn.get_all_load_balancers() - balancer = balancers[0] - listener1 = balancer.listeners[0] - listener1.load_balancer_port.should.equal(443) - listener1.instance_port.should.equal(8443) - listener1.protocol.should.equal("TCP") - listener1.ssl_certificate_id.should.equal("arn:certificate") + client = boto3.client("elb", region_name="eu-west-1") + + client.create_load_balancer( + LoadBalancerName="my-lb", + Listeners=[{"Protocol": "http", "LoadBalancerPort": 80, "InstancePort": 8080}], + AvailabilityZones=["us-east-1a", "us-east-1b"], + ) + + client.create_load_balancer_listeners( + LoadBalancerName="my-lb", + Listeners=[ + { + "Protocol": "tcp", + "LoadBalancerPort": 443, + "InstancePort": 8443, + "SSLCertificateId": certificate_arn, + } + ], + ) + balancer = client.describe_load_balancers()["LoadBalancerDescriptions"][0] + listeners = balancer["ListenerDescriptions"] + listeners.should.have.length_of(2) + + listeners[0]["Listener"]["Protocol"].should.equal("HTTP") + listeners[0]["Listener"]["SSLCertificateId"].should.equal("None") + + listeners[1]["Listener"]["Protocol"].should.equal("TCP") + listeners[1]["Listener"]["SSLCertificateId"].should.equal(certificate_arn) +@mock_acm +@mock_elb +def test_create_lb_listener_with_invalid_ssl_certificate(): + client = boto3.client("elb", region_name="eu-west-1") + + client.create_load_balancer( + LoadBalancerName="my-lb", + Listeners=[{"Protocol": "http", "LoadBalancerPort": 80, "InstancePort": 8080}], + AvailabilityZones=["us-east-1a", "us-east-1b"], + ) + + with pytest.raises(ClientError) as exc: + client.create_load_balancer_listeners( + LoadBalancerName="my-lb", + Listeners=[ + { + "Protocol": "tcp", + "LoadBalancerPort": 443, + "InstancePort": 8443, + "SSLCertificateId": "unknownarn", + } + ], + ) + err = exc.value.response["Error"] + err["Code"].should.equal("CertificateNotFoundException") + + +@mock_acm @mock_elb def test_set_sslcertificate_boto3(): + acm_client = boto3.client("acm", region_name="us-east-1") + acm_request_response = acm_client.request_certificate( + DomainName="fake.domain.com", + DomainValidationOptions=[ + {"DomainName": "fake.domain.com", "ValidationDomain": "domain.com"}, + ], + ) + certificate_arn = acm_request_response["CertificateArn"] + client = boto3.client("elb", region_name="us-east-1") lb_name = str(uuid4())[0:6] @@ -466,9 +529,7 @@ def test_set_sslcertificate_boto3(): ) client.set_load_balancer_listener_ssl_certificate( - LoadBalancerName=lb_name, - LoadBalancerPort=81, - SSLCertificateId="arn:certificate", + LoadBalancerName=lb_name, LoadBalancerPort=81, SSLCertificateId=certificate_arn, ) elb = client.describe_load_balancers()["LoadBalancerDescriptions"][0] @@ -479,7 +540,7 @@ def test_set_sslcertificate_boto3(): listener = elb["ListenerDescriptions"][1]["Listener"] listener.should.have.key("LoadBalancerPort").equals(81) - listener.should.have.key("SSLCertificateId").equals("arn:certificate") + listener.should.have.key("SSLCertificateId").equals(certificate_arn) # Has boto3 equivalent