From bb6fb1200f8db61e0d39c9a4da5954469a403fb8 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Fri, 4 Feb 2022 20:59:23 -0100 Subject: [PATCH] ELBv2 - Allow IAM certificates when calling modify_listener (#4823) --- moto/elbv2/models.py | 39 ++++++++----- moto/iam/models.py | 6 ++ tests/test_elbv2/test_elbv2_integration.py | 68 ++++++++++++++++++++++ 3 files changed, 99 insertions(+), 14 deletions(-) create mode 100644 tests/test_elbv2/test_elbv2_integration.py diff --git a/moto/elbv2/models.py b/moto/elbv2/models.py index d02127aea..7d5c1552f 100644 --- a/moto/elbv2/models.py +++ b/moto/elbv2/models.py @@ -11,7 +11,6 @@ from moto.core.utils import ( BackendDict, ) from moto.ec2.models import ec2_backends -from moto.acm.models import acm_backends from .utils import make_arn_for_target_group from .utils import make_arn_for_load_balancer from .exceptions import ( @@ -582,16 +581,6 @@ class ELBv2Backend(BaseBackend): """ return ec2_backends[self.region_name] - @property - def acm_backend(self): - """ - ACM backend - - :return: ACM Backend - :rtype: moto.acm.models.AWSCertificateManagerBackend - """ - return acm_backends[self.region_name] - def reset(self): region_name = self.region_name self.__dict__ = {} @@ -1420,9 +1409,7 @@ Member must satisfy regular expression pattern: {}".format( if certificates: default_cert = certificates[0] default_cert_arn = default_cert["certificate_arn"] - try: - self.acm_backend.get_certificate(default_cert_arn) - except Exception: + if not self._certificate_exists(certificate_arn=default_cert_arn): raise RESTError( "CertificateNotFound", "Certificate {0} not found".format(default_cert_arn), @@ -1449,6 +1436,30 @@ Member must satisfy regular expression pattern: {}".format( return listener + def _certificate_exists(self, certificate_arn): + """ + Verify the provided certificate exists in either ACM or IAM + """ + from moto.acm import acm_backends + from moto.acm.models import AWSResourceNotFoundException + + try: + acm_backend = acm_backends[self.region_name] + acm_backend.get_certificate(certificate_arn) + return True + except AWSResourceNotFoundException: + pass + + from moto.iam import iam_backend + + cert = iam_backend.get_certificate_by_arn(certificate_arn) + if cert is not None: + return True + + # ACM threw an error, and IAM did not return a certificate + # Safe to assume it doesn't exist when we get here + return False + def _any_listener_using(self, target_group_arn): for load_balancer in self.load_balancers.values(): for listener in load_balancer.listeners.values(): diff --git a/moto/iam/models.py b/moto/iam/models.py index f58cf5ab8..e06891782 100644 --- a/moto/iam/models.py +++ b/moto/iam/models.py @@ -2054,6 +2054,12 @@ class IAMBackend(BaseBackend): "The Server Certificate with name {0} cannot be " "found.".format(name) ) + def get_certificate_by_arn(self, arn): + for cert in self.certificates.values(): + if arn == cert.arn: + return cert + return None + def delete_server_certificate(self, name): cert_id = None for key, cert in self.certificates.items(): diff --git a/tests/test_elbv2/test_elbv2_integration.py b/tests/test_elbv2/test_elbv2_integration.py new file mode 100644 index 000000000..d10a2c21d --- /dev/null +++ b/tests/test_elbv2/test_elbv2_integration.py @@ -0,0 +1,68 @@ +import boto3 + +from moto import mock_acm, mock_ec2, mock_elbv2, mock_iam + + +@mock_acm +@mock_iam +@mock_ec2 +@mock_elbv2 +def test_modify_listener_using_iam_certificate(): + # Verify we can add a listener for a TargetGroup that is already HTTPS + client = boto3.client("elbv2", region_name="eu-central-1") + acm = boto3.client("acm", region_name="eu-central-1") + ec2 = boto3.resource("ec2", region_name="eu-central-1") + iam = boto3.client("iam", region_name="us-east-1") + + security_group = ec2.create_security_group( + GroupName="a-security-group", Description="First One" + ) + vpc = ec2.create_vpc(CidrBlock="172.28.7.0/24", InstanceTenancy="default") + subnet1 = ec2.create_subnet( + VpcId=vpc.id, CidrBlock="172.28.7.192/26", AvailabilityZone="eu-central-1a" + ) + + response = client.create_load_balancer( + Name="my-lb", + Subnets=[subnet1.id], + SecurityGroups=[security_group.id], + Scheme="internal", + Tags=[{"Key": "key_name", "Value": "a_value"}], + ) + + load_balancer_arn = response.get("LoadBalancers")[0].get("LoadBalancerArn") + + response = client.create_target_group( + Name="a-target", Protocol="HTTPS", Port=8443, VpcId=vpc.id, + ) + target_group = response.get("TargetGroups")[0] + target_group_arn = target_group["TargetGroupArn"] + + # HTTPS listener + response = acm.request_certificate( + DomainName="google.com", SubjectAlternativeNames=["google.com"], + ) + google_arn = response["CertificateArn"] + response = client.create_listener( + LoadBalancerArn=load_balancer_arn, + Protocol="HTTPS", + Port=443, + Certificates=[{"CertificateArn": google_arn}], + DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}], + ) + listener_arn = response["Listeners"][0]["ListenerArn"] + + # Now modify the HTTPS listener with an IAM certificate + resp = iam.upload_server_certificate( + ServerCertificateName="certname", + CertificateBody="certbody", + PrivateKey="privatekey", + ) + iam_arn = resp["ServerCertificateMetadata"]["Arn"] + + listener = client.modify_listener( + ListenerArn=listener_arn, + Certificates=[{"CertificateArn": iam_arn,},], + DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}], + )["Listeners"][0] + listener["Certificates"].should.equal([{"CertificateArn": iam_arn}])