Feature: ACM: in_use_by (#4414)

This commit is contained in:
Bert Blommers 2021-10-14 21:43:10 +00:00 committed by GitHub
parent d916fd636f
commit 1f13d6c406
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 192 additions and 57 deletions

View File

@ -144,6 +144,7 @@ class CertBundle(BaseModel):
self._chain = None
self.type = cert_type # Should really be an enum
self.status = cert_status # Should really be an enum
self.in_use_by = []
# AWS always returns your chain + root CA
if self.chain is None:
@ -361,7 +362,7 @@ class CertBundle(BaseModel):
"Certificate": {
"CertificateArn": self.arn,
"DomainName": self.common_name,
"InUseBy": [],
"InUseBy": self.in_use_by,
"Issuer": self._cert.issuer.get_attributes_for_oid(
cryptography.x509.OID_COMMON_NAME
)[0].value,
@ -430,6 +431,13 @@ class AWSCertificateManagerBackend(BaseBackend):
)
return AWSResourceNotFoundException(msg)
def set_certificate_in_use_by(self, arn, load_balancer_name):
if arn not in self._certificates:
raise self._arn_not_found(arn)
cert_bundle = self._certificates[arn]
cert_bundle.in_use_by.append(load_balancer_name)
def _get_arn_from_idempotency_token(self, token):
"""
If token doesnt exist, return None, later it will be

View File

@ -13,6 +13,13 @@ class DuplicateTagKeysError(ELBClientError):
)
class CertificateNotFoundException(ELBClientError):
def __init__(self):
super(CertificateNotFoundException, self).__init__(
"CertificateNotFoundException", "Supplied certificate was not found"
)
class LoadBalancerNotFoundError(ELBClientError):
def __init__(self, cidr):
super(LoadBalancerNotFoundError, self).__init__(

View File

@ -23,6 +23,7 @@ from .exceptions import (
InvalidSecurityGroupError,
LoadBalancerNotFoundError,
TooManyTagsError,
CertificateNotFoundException,
)
@ -79,6 +80,7 @@ class FakeLoadBalancer(CloudFormationModel):
vpc_id=None,
subnets=None,
security_groups=None,
elb_backend=None,
):
self.name = name
self.health_check = None
@ -111,6 +113,11 @@ class FakeLoadBalancer(CloudFormationModel):
"ssl_certificate_id", port.get("SSLCertificateId")
),
)
if listener.ssl_certificate_id:
elb_backend._register_certificate(
listener.ssl_certificate_id, self.dns_name
)
self.listeners.append(listener)
# it is unclear per the AWS documentation as to when or how backend
@ -310,6 +317,7 @@ class ELBBackend(BaseBackend):
subnets=subnets,
security_groups=security_groups,
vpc_id=vpc_id,
elb_backend=self,
)
self.load_balancers[name] = new_load_balancer
return new_load_balancer
@ -332,6 +340,10 @@ class ELBBackend(BaseBackend):
raise DuplicateListenerError(name, lb_port)
break
else:
if ssl_certificate_id:
self._register_certificate(
ssl_certificate_id, balancer.dns_name
)
balancer.listeners.append(
FakeListener(
lb_port, instance_port, protocol, ssl_certificate_id
@ -406,7 +418,10 @@ class ELBBackend(BaseBackend):
for idx, listener in enumerate(balancer.listeners):
if lb_port == listener.load_balancer_port:
balancer.listeners[idx].ssl_certificate_id = ssl_certificate_id
if ssl_certificate_id:
self._register_certificate(
ssl_certificate_id, balancer.dns_name
)
return balancer
def register_instances(
@ -509,6 +524,15 @@ class ELBBackend(BaseBackend):
load_balancer.listeners[listener_idx] = listener
return load_balancer
def _register_certificate(self, ssl_certificate_id, dns_name):
from moto.acm.models import acm_backends, AWSResourceNotFoundException
acm_backend = acm_backends[self.region_name]
try:
acm_backend.set_certificate_in_use_by(ssl_certificate_id, dns_name)
except AWSResourceNotFoundException:
raise CertificateNotFoundException()
elb_backends = {}
for region in ec2_backends.keys():

View File

@ -11,11 +11,11 @@ from botocore.exceptions import ClientError
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from freezegun import freeze_time
from moto import mock_acm, settings
from moto import mock_acm, mock_elb, settings
from moto.core import ACCOUNT_ID
from unittest import SkipTest, mock
RESOURCE_FOLDER = os.path.join(os.path.dirname(__file__), "resources")
_GET_RESOURCE = lambda x: open(os.path.join(RESOURCE_FOLDER, x), "rb").read()
CA_CRT = _GET_RESOURCE("ca.pem")
@ -640,3 +640,38 @@ def test_request_certificate_with_mutiple_times():
)
arn = resp["CertificateArn"]
arn.should_not.equal(original_arn)
@mock_acm
@mock_elb
def test_elb_acm_in_use_by():
acm_client = boto3.client("acm", region_name="us-west-2")
elb_client = boto3.client("elb", region_name="us-west-2")
acm_request_response = acm_client.request_certificate(
DomainName="fake.domain.com",
DomainValidationOptions=[
{"DomainName": "fake.domain.com", "ValidationDomain": "domain.com"}
],
)
certificate_arn = acm_request_response["CertificateArn"]
create_load_balancer_request = elb_client.create_load_balancer(
LoadBalancerName="test",
Listeners=[
{
"Protocol": "https",
"LoadBalancerPort": 443,
"InstanceProtocol": "http",
"InstancePort": 80,
"SSLCertificateId": certificate_arn,
}
],
)
response = acm_client.describe_certificate(CertificateArn=certificate_arn)
response["Certificate"]["InUseBy"].should.equal(
[create_load_balancer_request["DNSName"]]
)

View File

@ -14,7 +14,7 @@ from boto.exception import BotoServerError
import pytest
import sure # noqa
from moto import mock_elb, mock_ec2, mock_elb_deprecated, mock_ec2_deprecated
from moto import mock_acm, mock_elb, mock_ec2, mock_elb_deprecated, mock_ec2_deprecated
from moto.core import ACCOUNT_ID
from tests import EXAMPLE_AMI_ID
from uuid import uuid4
@ -201,42 +201,21 @@ def test_create_elb_in_multiple_region_boto3():
west_names.shouldnt.contain(name_east)
# Has boto3 equivalent
@mock_elb_deprecated
def test_create_load_balancer_with_certificate():
conn = boto.connect_elb()
zones = ["us-east-1a"]
ports = [
(
443,
8443,
"https",
"arn:aws:iam:{}:server-certificate/test-cert".format(ACCOUNT_ID),
)
]
conn.create_load_balancer("my-lb", zones, ports)
balancers = conn.get_all_load_balancers()
balancer = balancers[0]
balancer.name.should.equal("my-lb")
balancer.scheme.should.equal("internet-facing")
set(balancer.availability_zones).should.equal(set(["us-east-1a"]))
listener = balancer.listeners[0]
listener.load_balancer_port.should.equal(443)
listener.instance_port.should.equal(8443)
listener.protocol.should.equal("HTTPS")
listener.ssl_certificate_id.should.equal(
"arn:aws:iam:{}:server-certificate/test-cert".format(ACCOUNT_ID)
)
@mock_acm
@mock_elb
def test_create_load_balancer_with_certificate_boto3():
acm_client = boto3.client("acm", region_name="us-east-2")
acm_request_response = acm_client.request_certificate(
DomainName="fake.domain.com",
DomainValidationOptions=[
{"DomainName": "fake.domain.com", "ValidationDomain": "domain.com"},
],
)
certificate_arn = acm_request_response["CertificateArn"]
client = boto3.client("elb", region_name="us-east-2")
name = str(uuid4())[0:6]
cert_id = "arn:aws:iam:{}:server-certificate/test-cert".format(ACCOUNT_ID)
client.create_load_balancer(
LoadBalancerName=name,
@ -245,7 +224,7 @@ def test_create_load_balancer_with_certificate_boto3():
"Protocol": "https",
"LoadBalancerPort": 8443,
"InstancePort": 443,
"SSLCertificateId": cert_id,
"SSLCertificateId": certificate_arn,
}
],
AvailabilityZones=["us-east-1a"],
@ -257,7 +236,30 @@ def test_create_load_balancer_with_certificate_boto3():
listener = describe["ListenerDescriptions"][0]["Listener"]
listener.should.have.key("Protocol").equal("HTTPS")
listener.should.have.key("SSLCertificateId").equals(cert_id)
listener.should.have.key("SSLCertificateId").equals(certificate_arn)
@mock_elb
def test_create_load_balancer_with_invalid_certificate():
client = boto3.client("elb", region_name="us-east-2")
name = str(uuid4())[0:6]
with pytest.raises(ClientError) as exc:
client.create_load_balancer(
LoadBalancerName=name,
Listeners=[
{
"Protocol": "https",
"LoadBalancerPort": 8443,
"InstancePort": 443,
"SSLCertificateId": "invalid_arn",
}
],
AvailabilityZones=["us-east-1a"],
)
err = exc.value.response["Error"]
err["Code"].should.equal("CertificateNotFoundException")
@mock_elb
@ -433,26 +435,87 @@ def test_create_and_delete_listener_boto3_support():
list(balancer["ListenerDescriptions"]).should.have.length_of(1)
# Has boto3 equivalent
@mock_elb_deprecated
def test_set_sslcertificate():
conn = boto.connect_elb()
@mock_acm
@mock_elb
def test_create_lb_listener_with_ssl_certificate():
acm_client = boto3.client("acm", region_name="eu-west-1")
acm_request_response = acm_client.request_certificate(
DomainName="fake.domain.com",
DomainValidationOptions=[
{"DomainName": "fake.domain.com", "ValidationDomain": "domain.com"},
],
)
certificate_arn = acm_request_response["CertificateArn"]
zones = ["us-east-1a", "us-east-1b"]
ports = [(443, 8443, "tcp")]
conn.create_load_balancer("my-lb", zones, ports)
conn.set_lb_listener_SSL_certificate("my-lb", "443", "arn:certificate")
balancers = conn.get_all_load_balancers()
balancer = balancers[0]
listener1 = balancer.listeners[0]
listener1.load_balancer_port.should.equal(443)
listener1.instance_port.should.equal(8443)
listener1.protocol.should.equal("TCP")
listener1.ssl_certificate_id.should.equal("arn:certificate")
client = boto3.client("elb", region_name="eu-west-1")
client.create_load_balancer(
LoadBalancerName="my-lb",
Listeners=[{"Protocol": "http", "LoadBalancerPort": 80, "InstancePort": 8080}],
AvailabilityZones=["us-east-1a", "us-east-1b"],
)
client.create_load_balancer_listeners(
LoadBalancerName="my-lb",
Listeners=[
{
"Protocol": "tcp",
"LoadBalancerPort": 443,
"InstancePort": 8443,
"SSLCertificateId": certificate_arn,
}
],
)
balancer = client.describe_load_balancers()["LoadBalancerDescriptions"][0]
listeners = balancer["ListenerDescriptions"]
listeners.should.have.length_of(2)
listeners[0]["Listener"]["Protocol"].should.equal("HTTP")
listeners[0]["Listener"]["SSLCertificateId"].should.equal("None")
listeners[1]["Listener"]["Protocol"].should.equal("TCP")
listeners[1]["Listener"]["SSLCertificateId"].should.equal(certificate_arn)
@mock_acm
@mock_elb
def test_create_lb_listener_with_invalid_ssl_certificate():
client = boto3.client("elb", region_name="eu-west-1")
client.create_load_balancer(
LoadBalancerName="my-lb",
Listeners=[{"Protocol": "http", "LoadBalancerPort": 80, "InstancePort": 8080}],
AvailabilityZones=["us-east-1a", "us-east-1b"],
)
with pytest.raises(ClientError) as exc:
client.create_load_balancer_listeners(
LoadBalancerName="my-lb",
Listeners=[
{
"Protocol": "tcp",
"LoadBalancerPort": 443,
"InstancePort": 8443,
"SSLCertificateId": "unknownarn",
}
],
)
err = exc.value.response["Error"]
err["Code"].should.equal("CertificateNotFoundException")
@mock_acm
@mock_elb
def test_set_sslcertificate_boto3():
acm_client = boto3.client("acm", region_name="us-east-1")
acm_request_response = acm_client.request_certificate(
DomainName="fake.domain.com",
DomainValidationOptions=[
{"DomainName": "fake.domain.com", "ValidationDomain": "domain.com"},
],
)
certificate_arn = acm_request_response["CertificateArn"]
client = boto3.client("elb", region_name="us-east-1")
lb_name = str(uuid4())[0:6]
@ -466,9 +529,7 @@ def test_set_sslcertificate_boto3():
)
client.set_load_balancer_listener_ssl_certificate(
LoadBalancerName=lb_name,
LoadBalancerPort=81,
SSLCertificateId="arn:certificate",
LoadBalancerName=lb_name, LoadBalancerPort=81, SSLCertificateId=certificate_arn,
)
elb = client.describe_load_balancers()["LoadBalancerDescriptions"][0]
@ -479,7 +540,7 @@ def test_set_sslcertificate_boto3():
listener = elb["ListenerDescriptions"][1]["Listener"]
listener.should.have.key("LoadBalancerPort").equals(81)
listener.should.have.key("SSLCertificateId").equals("arn:certificate")
listener.should.have.key("SSLCertificateId").equals(certificate_arn)
# Has boto3 equivalent