Implement ACM export_certificate function (#4300)
Co-authored-by: Paul Roberts <paroberts@guidewire.com>
This commit is contained in:
parent
cb43134d44
commit
f9654f62da
@ -1,5 +1,6 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import base64
|
||||
import re
|
||||
import datetime
|
||||
from moto.core import BaseBackend, BaseModel
|
||||
@ -17,32 +18,32 @@ from cryptography.hazmat.backends import default_backend
|
||||
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
|
||||
|
||||
|
||||
GOOGLE_ROOT_CA = b"""-----BEGIN CERTIFICATE-----
|
||||
MIIEKDCCAxCgAwIBAgIQAQAhJYiw+lmnd+8Fe2Yn3zANBgkqhkiG9w0BAQsFADBC
|
||||
MQswCQYDVQQGEwJVUzEWMBQGA1UEChMNR2VvVHJ1c3QgSW5jLjEbMBkGA1UEAxMS
|
||||
R2VvVHJ1c3QgR2xvYmFsIENBMB4XDTE3MDUyMjExMzIzN1oXDTE4MTIzMTIzNTk1
|
||||
OVowSTELMAkGA1UEBhMCVVMxEzARBgNVBAoTCkdvb2dsZSBJbmMxJTAjBgNVBAMT
|
||||
HEdvb2dsZSBJbnRlcm5ldCBBdXRob3JpdHkgRzIwggEiMA0GCSqGSIb3DQEBAQUA
|
||||
A4IBDwAwggEKAoIBAQCcKgR3XNhQkToGo4Lg2FBIvIk/8RlwGohGfuCPxfGJziHu
|
||||
Wv5hDbcyRImgdAtTT1WkzoJile7rWV/G4QWAEsRelD+8W0g49FP3JOb7kekVxM/0
|
||||
Uw30SvyfVN59vqBrb4fA0FAfKDADQNoIc1Fsf/86PKc3Bo69SxEE630k3ub5/DFx
|
||||
+5TVYPMuSq9C0svqxGoassxT3RVLix/IGWEfzZ2oPmMrhDVpZYTIGcVGIvhTlb7j
|
||||
gEoQxirsupcgEcc5mRAEoPBhepUljE5SdeK27QjKFPzOImqzTs9GA5eXA37Asd57
|
||||
r0Uzz7o+cbfe9CUlwg01iZ2d+w4ReYkeN8WvjnJpAgMBAAGjggERMIIBDTAfBgNV
|
||||
HSMEGDAWgBTAephojYn7qwVkDBF9qn1luMrMTjAdBgNVHQ4EFgQUSt0GFhu89mi1
|
||||
dvWBtrtiGrpagS8wDgYDVR0PAQH/BAQDAgEGMC4GCCsGAQUFBwEBBCIwIDAeBggr
|
||||
BgEFBQcwAYYSaHR0cDovL2cuc3ltY2QuY29tMBIGA1UdEwEB/wQIMAYBAf8CAQAw
|
||||
NQYDVR0fBC4wLDAqoCigJoYkaHR0cDovL2cuc3ltY2IuY29tL2NybHMvZ3RnbG9i
|
||||
YWwuY3JsMCEGA1UdIAQaMBgwDAYKKwYBBAHWeQIFATAIBgZngQwBAgIwHQYDVR0l
|
||||
BBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMA0GCSqGSIb3DQEBCwUAA4IBAQDKSeWs
|
||||
12Rkd1u+cfrP9B4jx5ppY1Rf60zWGSgjZGaOHMeHgGRfBIsmr5jfCnC8vBk97nsz
|
||||
qX+99AXUcLsFJnnqmseYuQcZZTTMPOk/xQH6bwx+23pwXEz+LQDwyr4tjrSogPsB
|
||||
E4jLnD/lu3fKOmc2887VJwJyQ6C9bgLxRwVxPgFZ6RGeGvOED4Cmong1L7bHon8X
|
||||
fOGLVq7uZ4hRJzBgpWJSwzfVO+qFKgE4h6LPcK2kesnE58rF2rwjMvL+GMJ74N87
|
||||
L9TQEOaWTPtEtyFkDbkAlDASJodYmDkFOA/MgkgMCkdm7r+0X8T/cKjhf4t5K7hl
|
||||
MqO5tzHpCvX2HzLc
|
||||
-----END CERTIFICATE-----"""
|
||||
# Added google root CA as AWS returns chain you gave it + root CA (provided or not)
|
||||
AWS_ROOT_CA = b"""-----BEGIN CERTIFICATE-----
|
||||
MIIESTCCAzGgAwIBAgITBntQXCplJ7wevi2i0ZmY7bibLDANBgkqhkiG9w0BAQsF
|
||||
ADA5MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6
|
||||
b24gUm9vdCBDQSAxMB4XDTE1MTAyMTIyMjQzNFoXDTQwMTAyMTIyMjQzNFowRjEL
|
||||
MAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEVMBMGA1UECxMMU2VydmVyIENB
|
||||
IDFCMQ8wDQYDVQQDEwZBbWF6b24wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
|
||||
AoIBAQDCThZn3c68asg3Wuw6MLAd5tES6BIoSMzoKcG5blPVo+sDORrMd4f2AbnZ
|
||||
cMzPa43j4wNxhplty6aUKk4T1qe9BOwKFjwK6zmxxLVYo7bHViXsPlJ6qOMpFge5
|
||||
blDP+18x+B26A0piiQOuPkfyDyeR4xQghfj66Yo19V+emU3nazfvpFA+ROz6WoVm
|
||||
B5x+F2pV8xeKNR7u6azDdU5YVX1TawprmxRC1+WsAYmz6qP+z8ArDITC2FMVy2fw
|
||||
0IjKOtEXc/VfmtTFch5+AfGYMGMqqvJ6LcXiAhqG5TI+Dr0RtM88k+8XUBCeQ8IG
|
||||
KuANaL7TiItKZYxK1MMuTJtV9IblAgMBAAGjggE7MIIBNzASBgNVHRMBAf8ECDAG
|
||||
AQH/AgEAMA4GA1UdDwEB/wQEAwIBhjAdBgNVHQ4EFgQUWaRmBlKge5WSPKOUByeW
|
||||
dFv5PdAwHwYDVR0jBBgwFoAUhBjMhTTsvAyUlC4IWZzHshBOCggwewYIKwYBBQUH
|
||||
AQEEbzBtMC8GCCsGAQUFBzABhiNodHRwOi8vb2NzcC5yb290Y2ExLmFtYXpvbnRy
|
||||
dXN0LmNvbTA6BggrBgEFBQcwAoYuaHR0cDovL2NybC5yb290Y2ExLmFtYXpvbnRy
|
||||
dXN0LmNvbS9yb290Y2ExLmNlcjA/BgNVHR8EODA2MDSgMqAwhi5odHRwOi8vY3Js
|
||||
LnJvb3RjYTEuYW1hem9udHJ1c3QuY29tL3Jvb3RjYTEuY3JsMBMGA1UdIAQMMAow
|
||||
CAYGZ4EMAQIBMA0GCSqGSIb3DQEBCwUAA4IBAQAfsaEKwn17DjAbi/Die0etn+PE
|
||||
gfY/I6s8NLWkxGAOUfW2o+vVowNARRVjaIGdrhAfeWHkZI6q2pI0x/IJYmymmcWa
|
||||
ZaW/2R7DvQDtxCkFkVaxUeHvENm6IyqVhf6Q5oN12kDSrJozzx7I7tHjhBK7V5Xo
|
||||
TyS4NU4EhSyzGgj2x6axDd1hHRjblEpJ80LoiXlmUDzputBXyO5mkcrplcVvlIJi
|
||||
WmKjrDn2zzKxDX5nwvkskpIjYlJcrQu4iCX1/YwZ1yNqF9LryjlilphHCACiHbhI
|
||||
RnGfN8j8KLDVmWyTYMk8V+6j0LI4+4zFh2upqGMQHL3VFVFWBek6vCDWhB/b
|
||||
-----END CERTIFICATE-----"""
|
||||
# Added aws root CA as AWS returns chain you gave it + root CA (provided or not)
|
||||
# so for now a cheap response is just give any old root CA
|
||||
|
||||
|
||||
@ -146,9 +147,9 @@ class CertBundle(BaseModel):
|
||||
|
||||
# AWS always returns your chain + root CA
|
||||
if self.chain is None:
|
||||
self.chain = GOOGLE_ROOT_CA
|
||||
self.chain = AWS_ROOT_CA
|
||||
else:
|
||||
self.chain += b"\n" + GOOGLE_ROOT_CA
|
||||
self.chain += b"\n" + AWS_ROOT_CA
|
||||
|
||||
# Takes care of PEM checking
|
||||
self.validate_pk()
|
||||
@ -386,6 +387,16 @@ class CertBundle(BaseModel):
|
||||
|
||||
return result
|
||||
|
||||
def serialize_pk(self, passphrase_bytes):
|
||||
pk_bytes = self._key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.BestAvailableEncryption(
|
||||
passphrase_bytes
|
||||
),
|
||||
)
|
||||
return pk_bytes.decode("utf-8")
|
||||
|
||||
def __str__(self):
|
||||
return self.arn
|
||||
|
||||
@ -524,6 +535,16 @@ class AWSCertificateManagerBackend(BaseBackend):
|
||||
cert_bundle = self.get_certificate(arn)
|
||||
cert_bundle.tags.remove(tags)
|
||||
|
||||
def export_certificate(self, certificate_arn, passphrase):
|
||||
passphrase_bytes = base64.standard_b64decode(passphrase)
|
||||
cert_bundle = self.get_certificate(certificate_arn)
|
||||
|
||||
certificate = cert_bundle.cert.decode()
|
||||
certificate_chain = cert_bundle.chain.decode()
|
||||
private_key = cert_bundle.serialize_pk(passphrase_bytes)
|
||||
|
||||
return certificate, certificate_chain, private_key
|
||||
|
||||
|
||||
acm_backends = {}
|
||||
for region, ec2_backend in ec2_backends.items():
|
||||
|
@ -264,3 +264,32 @@ class AWSCertificateManagerResponse(BaseResponse):
|
||||
return err.response()
|
||||
|
||||
return ""
|
||||
|
||||
def export_certificate(self):
|
||||
certificate_arn = self._get_param("CertificateArn")
|
||||
passphrase = self._get_param("Passphrase")
|
||||
|
||||
if certificate_arn is None:
|
||||
msg = "A required parameter for the specified action is not supplied."
|
||||
return (
|
||||
json.dumps({"__type": "MissingParameter", "message": msg}),
|
||||
dict(status=400),
|
||||
)
|
||||
|
||||
try:
|
||||
(
|
||||
certificate,
|
||||
certificate_chain,
|
||||
private_key,
|
||||
) = self.acm_backend.export_certificate(
|
||||
certificate_arn=certificate_arn, passphrase=passphrase,
|
||||
)
|
||||
return json.dumps(
|
||||
dict(
|
||||
Certificate=certificate,
|
||||
CertificateChain=certificate_chain,
|
||||
PrivateKey=private_key,
|
||||
)
|
||||
)
|
||||
except AWSError as err:
|
||||
return err.response()
|
||||
|
@ -1,4 +1,5 @@
|
||||
from __future__ import unicode_literals
|
||||
from moto.acm.models import AWS_ROOT_CA
|
||||
|
||||
import os
|
||||
import uuid
|
||||
@ -7,6 +8,8 @@ import boto3
|
||||
import pytest
|
||||
import sure # noqa
|
||||
from botocore.exceptions import ClientError
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
from freezegun import freeze_time
|
||||
from moto import mock_acm, settings
|
||||
from moto.core import ACCOUNT_ID
|
||||
@ -168,12 +171,42 @@ def test_describe_certificate():
|
||||
def test_describe_certificate_with_bad_arn():
|
||||
client = boto3.client("acm", region_name="eu-central-1")
|
||||
|
||||
try:
|
||||
with pytest.raises(ClientError) as err:
|
||||
client.describe_certificate(CertificateArn=BAD_ARN)
|
||||
except ClientError as err:
|
||||
err.response["Error"]["Code"].should.equal("ResourceNotFoundException")
|
||||
else:
|
||||
raise RuntimeError("Should of raised ResourceNotFoundException")
|
||||
|
||||
err.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
|
||||
|
||||
|
||||
@mock_acm
|
||||
def test_export_certificate():
|
||||
client = boto3.client("acm", region_name="eu-central-1")
|
||||
arn = _import_cert(client)
|
||||
|
||||
resp = client.export_certificate(CertificateArn=arn, Passphrase="pass")
|
||||
resp["Certificate"].should.equal(SERVER_CRT.decode())
|
||||
resp["CertificateChain"].should.equal(CA_CRT.decode() + "\n" + AWS_ROOT_CA.decode())
|
||||
resp.should.have.key("PrivateKey")
|
||||
|
||||
key = serialization.load_pem_private_key(
|
||||
bytes(resp["PrivateKey"], "utf-8"), password=b"pass", backend=default_backend()
|
||||
)
|
||||
|
||||
private_key = key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
private_key.should.equal(SERVER_KEY)
|
||||
|
||||
|
||||
@mock_acm
|
||||
def test_export_certificate_with_bad_arn():
|
||||
client = boto3.client("acm", region_name="eu-central-1")
|
||||
|
||||
with pytest.raises(ClientError) as err:
|
||||
client.export_certificate(CertificateArn=BAD_ARN, Passphrase="pass")
|
||||
|
||||
err.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
|
||||
|
||||
|
||||
# Also tests ListTagsForCertificate
|
||||
|
Loading…
Reference in New Issue
Block a user