ELBv2 - Allow IAM certificates when calling modify_listener (#4823)

This commit is contained in:
Bert Blommers 2022-02-04 20:59:23 -01:00 committed by GitHub
parent e1ffd27201
commit bb6fb1200f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 99 additions and 14 deletions

View File

@ -11,7 +11,6 @@ from moto.core.utils import (
BackendDict,
)
from moto.ec2.models import ec2_backends
from moto.acm.models import acm_backends
from .utils import make_arn_for_target_group
from .utils import make_arn_for_load_balancer
from .exceptions import (
@ -582,16 +581,6 @@ class ELBv2Backend(BaseBackend):
"""
return ec2_backends[self.region_name]
@property
def acm_backend(self):
"""
ACM backend
:return: ACM Backend
:rtype: moto.acm.models.AWSCertificateManagerBackend
"""
return acm_backends[self.region_name]
def reset(self):
region_name = self.region_name
self.__dict__ = {}
@ -1420,9 +1409,7 @@ Member must satisfy regular expression pattern: {}".format(
if certificates:
default_cert = certificates[0]
default_cert_arn = default_cert["certificate_arn"]
try:
self.acm_backend.get_certificate(default_cert_arn)
except Exception:
if not self._certificate_exists(certificate_arn=default_cert_arn):
raise RESTError(
"CertificateNotFound",
"Certificate {0} not found".format(default_cert_arn),
@ -1449,6 +1436,30 @@ Member must satisfy regular expression pattern: {}".format(
return listener
def _certificate_exists(self, certificate_arn):
"""
Verify the provided certificate exists in either ACM or IAM
"""
from moto.acm import acm_backends
from moto.acm.models import AWSResourceNotFoundException
try:
acm_backend = acm_backends[self.region_name]
acm_backend.get_certificate(certificate_arn)
return True
except AWSResourceNotFoundException:
pass
from moto.iam import iam_backend
cert = iam_backend.get_certificate_by_arn(certificate_arn)
if cert is not None:
return True
# ACM threw an error, and IAM did not return a certificate
# Safe to assume it doesn't exist when we get here
return False
def _any_listener_using(self, target_group_arn):
for load_balancer in self.load_balancers.values():
for listener in load_balancer.listeners.values():

View File

@ -2054,6 +2054,12 @@ class IAMBackend(BaseBackend):
"The Server Certificate with name {0} cannot be " "found.".format(name)
)
def get_certificate_by_arn(self, arn):
for cert in self.certificates.values():
if arn == cert.arn:
return cert
return None
def delete_server_certificate(self, name):
cert_id = None
for key, cert in self.certificates.items():

View File

@ -0,0 +1,68 @@
import boto3
from moto import mock_acm, mock_ec2, mock_elbv2, mock_iam
@mock_acm
@mock_iam
@mock_ec2
@mock_elbv2
def test_modify_listener_using_iam_certificate():
# Verify we can add a listener for a TargetGroup that is already HTTPS
client = boto3.client("elbv2", region_name="eu-central-1")
acm = boto3.client("acm", region_name="eu-central-1")
ec2 = boto3.resource("ec2", region_name="eu-central-1")
iam = boto3.client("iam", region_name="us-east-1")
security_group = ec2.create_security_group(
GroupName="a-security-group", Description="First One"
)
vpc = ec2.create_vpc(CidrBlock="172.28.7.0/24", InstanceTenancy="default")
subnet1 = ec2.create_subnet(
VpcId=vpc.id, CidrBlock="172.28.7.192/26", AvailabilityZone="eu-central-1a"
)
response = client.create_load_balancer(
Name="my-lb",
Subnets=[subnet1.id],
SecurityGroups=[security_group.id],
Scheme="internal",
Tags=[{"Key": "key_name", "Value": "a_value"}],
)
load_balancer_arn = response.get("LoadBalancers")[0].get("LoadBalancerArn")
response = client.create_target_group(
Name="a-target", Protocol="HTTPS", Port=8443, VpcId=vpc.id,
)
target_group = response.get("TargetGroups")[0]
target_group_arn = target_group["TargetGroupArn"]
# HTTPS listener
response = acm.request_certificate(
DomainName="google.com", SubjectAlternativeNames=["google.com"],
)
google_arn = response["CertificateArn"]
response = client.create_listener(
LoadBalancerArn=load_balancer_arn,
Protocol="HTTPS",
Port=443,
Certificates=[{"CertificateArn": google_arn}],
DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}],
)
listener_arn = response["Listeners"][0]["ListenerArn"]
# Now modify the HTTPS listener with an IAM certificate
resp = iam.upload_server_certificate(
ServerCertificateName="certname",
CertificateBody="certbody",
PrivateKey="privatekey",
)
iam_arn = resp["ServerCertificateMetadata"]["Arn"]
listener = client.modify_listener(
ListenerArn=listener_arn,
Certificates=[{"CertificateArn": iam_arn,},],
DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}],
)["Listeners"][0]
listener["Certificates"].should.equal([{"CertificateArn": iam_arn}])