Transform certificates in listener in expected XML (#4049)

This commit is contained in:
Sahil Shah 2021-07-01 11:25:40 -04:00 committed by GitHub
parent 8cc439444f
commit 3ae4c23c23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 39 deletions

View File

@ -265,6 +265,7 @@ class FakeListener(CloudFormationModel):
certificates = properties.get("Certificates") certificates = properties.get("Certificates")
default_actions = elbv2_backend.convert_and_validate_properties(properties) default_actions = elbv2_backend.convert_and_validate_properties(properties)
certificates = elbv2_backend.convert_and_validate_certificates(certificates)
listener = elbv2_backend.create_listener( listener = elbv2_backend.create_listener(
load_balancer_arn, protocol, port, ssl_policy, certificates, default_actions load_balancer_arn, protocol, port, ssl_policy, certificates, default_actions
) )
@ -283,6 +284,7 @@ class FakeListener(CloudFormationModel):
certificates = properties.get("Certificates") certificates = properties.get("Certificates")
default_actions = elbv2_backend.convert_and_validate_properties(properties) default_actions = elbv2_backend.convert_and_validate_properties(properties)
certificates = elbv2_backend.convert_and_validate_certificates(certificates)
listener = elbv2_backend.modify_listener( listener = elbv2_backend.modify_listener(
original_resource.arn, original_resource.arn,
port, port,
@ -833,6 +835,14 @@ Member must satisfy regular expression pattern: {}".format(
self.target_groups[target_group.arn] = target_group self.target_groups[target_group.arn] = target_group
return target_group return target_group
def convert_and_validate_certificates(self, certificates):
# transform default certificate to conform with the rest of the code and XML templates
for cert in certificates or []:
cert["certificate_arn"] = cert["CertificateArn"]
return certificates
def convert_and_validate_properties(self, properties): def convert_and_validate_properties(self, properties):
# transform default actions to confirm with the rest of the code and XML templates # transform default actions to confirm with the rest of the code and XML templates
@ -1308,41 +1318,24 @@ Member must satisfy regular expression pattern: {}".format(
# HTTPS checks # HTTPS checks
if protocol == "HTTPS": if protocol == "HTTPS":
# HTTPS
# Might already be HTTPS so may not provide certs
if certificates is None and listener.protocol != "HTTPS":
raise RESTError(
"InvalidConfigurationRequest",
"Certificates must be provided for HTTPS",
)
# Check certificates exist # Check certificates exist
if certificates is not None: if certificates:
default_cert = None default_cert = certificates[0]
all_certs = set() # for SNI default_cert_arn = default_cert["certificate_arn"]
for cert in certificates: try:
if cert["is_default"] == "true": self.acm_backend.get_certificate(default_cert_arn)
default_cert = cert["certificate_arn"] except Exception:
try:
self.acm_backend.get_certificate(cert["certificate_arn"])
except Exception:
raise RESTError(
"CertificateNotFound",
"Certificate {0} not found".format(
cert["certificate_arn"]
),
)
all_certs.add(cert["certificate_arn"])
if default_cert is None:
raise RESTError( raise RESTError(
"InvalidConfigurationRequest", "No default certificate" "CertificateNotFound",
"Certificate {0} not found".format(default_cert_arn),
) )
listener.certificate = default_cert_arn
listener.certificate = default_cert listener.certificates = certificates
listener.certificates = list(all_certs) else:
raise RESTError(
"CertificateWereNotPassed",
"You must provide a list containing exactly one certificate if the listener protocol is HTTPS.",
)
listener.protocol = protocol listener.protocol = protocol

View File

@ -1767,10 +1767,7 @@ def test_modify_listener_http_to_https():
Port=443, Port=443,
Protocol="HTTPS", Protocol="HTTPS",
SslPolicy="ELBSecurityPolicy-TLS-1-2-2017-01", SslPolicy="ELBSecurityPolicy-TLS-1-2-2017-01",
Certificates=[ Certificates=[{"CertificateArn": yahoo_arn,},],
{"CertificateArn": google_arn, "IsDefault": False},
{"CertificateArn": yahoo_arn, "IsDefault": True},
],
DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}], DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}],
) )
response["Listeners"][0]["Port"].should.equal(443) response["Listeners"][0]["Port"].should.equal(443)
@ -1778,7 +1775,7 @@ def test_modify_listener_http_to_https():
response["Listeners"][0]["SslPolicy"].should.equal( response["Listeners"][0]["SslPolicy"].should.equal(
"ELBSecurityPolicy-TLS-1-2-2017-01" "ELBSecurityPolicy-TLS-1-2-2017-01"
) )
len(response["Listeners"][0]["Certificates"]).should.equal(2) len(response["Listeners"][0]["Certificates"]).should.equal(1)
# Check default cert, can't do this in server mode # Check default cert, can't do this in server mode
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false": if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false":
@ -1790,15 +1787,20 @@ def test_modify_listener_http_to_https():
listener.certificate.should.equal(yahoo_arn) listener.certificate.should.equal(yahoo_arn)
# No default cert # No default cert
with pytest.raises(ClientError): with pytest.raises(ClientError) as ex:
client.modify_listener( client.modify_listener(
ListenerArn=listener_arn, ListenerArn=listener_arn,
Port=443, Port=443,
Protocol="HTTPS", Protocol="HTTPS",
SslPolicy="ELBSecurityPolicy-TLS-1-2-2017-01", SslPolicy="ELBSecurityPolicy-TLS-1-2-2017-01",
Certificates=[{"CertificateArn": google_arn, "IsDefault": False}], Certificates=[],
DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}], DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}],
) )
err = ex.value.response["Error"]
err["Code"].should.equal("CertificateWereNotPassed")
err["Message"].should.equal(
"You must provide a list containing exactly one certificate if the listener protocol is HTTPS."
)
# Bad cert # Bad cert
with pytest.raises(ClientError): with pytest.raises(ClientError):