Feature: ACM-PCA (#5712)

This commit is contained in:
Bert Blommers 2022-11-28 12:22:48 -01:00 committed by GitHub
parent 22539585e7
commit 672c95384a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 865 additions and 7 deletions

View File

@ -16,6 +16,7 @@ def lazy_load(module_name, element, boto3_name=None, backend=None):
mock_acm = lazy_load(".acm", "mock_acm")
mock_acmpca = lazy_load(".acmpca", "mock_acmpca", boto3_name="acm-pca")
mock_amp = lazy_load(".amp", "mock_amp")
mock_apigateway = lazy_load(".apigateway", "mock_apigateway")
mock_apigatewayv2 = lazy_load(".apigatewayv2", "mock_apigatewayv2")
@ -78,6 +79,9 @@ mock_emr = lazy_load(".emr", "mock_emr")
mock_emrcontainers = lazy_load(
".emrcontainers", "mock_emrcontainers", boto3_name="emr-containers"
)
mock_emrserverless = lazy_load(
".emrserverless", "mock_emrserverless", boto3_name="emr-serverless"
)
mock_es = lazy_load(".es", "mock_es")
mock_events = lazy_load(".events", "mock_events")
mock_firehose = lazy_load(".firehose", "mock_firehose")
@ -163,9 +167,6 @@ mock_xray = lazy_load(".xray", "mock_xray")
mock_xray_client = lazy_load(".xray", "mock_xray_client")
mock_wafv2 = lazy_load(".wafv2", "mock_wafv2")
mock_textract = lazy_load(".textract", "mock_textract")
mock_emrserverless = lazy_load(
".emrserverless", "mock_emrserverless", boto3_name="emr-serverless"
)
class MockAll(ContextDecorator):

5
moto/acmpca/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""acmpca module initialization; sets value for base decorator."""
from .models import acmpca_backends
from ..core.models import base_decorator
mock_acmpca = base_decorator(acmpca_backends)

View File

@ -0,0 +1,7 @@
"""Exceptions raised by the acmpca service."""
from moto.core.exceptions import JsonRESTError
class ResourceNotFoundException(JsonRESTError):
def __init__(self, arn: str):
super().__init__("ResourceNotFoundException", f"Resource {arn} not found")

305
moto/acmpca/models.py Normal file
View File

@ -0,0 +1,305 @@
"""ACMPCABackend class with methods for supported APIs."""
import base64
from .exceptions import ResourceNotFoundException
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import unix_time
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
import datetime
import cryptography.x509
from cryptography.x509 import NameOID, load_pem_x509_certificate, Certificate
import cryptography.hazmat.primitives.asymmetric.rsa
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.backends import default_backend
from typing import Any, Dict, List, Optional, Tuple
class CertificateAuthority(BaseModel):
def __init__(
self,
region: str,
account_id: str,
certificate_authority_configuration: Dict[str, Any],
certificate_authority_type: str,
revocation_configuration: Dict[str, Any],
):
self.id = mock_random.uuid4()
self.arn = (
f"arn:aws:acm-pca:{region}:{account_id}:certificate-authority/{self.id}"
)
self.account_id = account_id
self.region_name = region
self.certificate_authority_configuration = certificate_authority_configuration
self.certificate_authority_type = certificate_authority_type
self.revocation_configuration: Dict[str, Any] = {
"CrlConfiguration": {"Enabled": False}
}
self.set_revocation_configuration(revocation_configuration)
self.created_at = unix_time()
self.updated_at: Optional[float] = None
self.status = "PENDING_CERTIFICATE"
common_name = self.certificate_authority_configuration.get("Subject", {}).get(
"CommonName", "Moto.org"
)
self.key = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key(
public_exponent=65537, key_size=2048
)
self.password = str(mock_random.uuid4()).encode("utf-8")
self.private_bytes = self.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.BestAvailableEncryption(self.password),
)
self.certificate: Optional[Certificate] = None
self.certificate_chain: Optional[bytes] = None
self.csr = self.generate_csr(common_name)
self.issued_certificates: Dict[str, bytes] = dict()
def generate_cert(self, common_name: str, subject: cryptography.x509.Name) -> bytes:
issuer = cryptography.x509.Name(
[ # C = US, O = Amazon, OU = Server CA 1B, CN = Amazon
cryptography.x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
cryptography.x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Amazon"),
cryptography.x509.NameAttribute(
NameOID.ORGANIZATIONAL_UNIT_NAME, "Server CA 1B"
),
cryptography.x509.NameAttribute(NameOID.COMMON_NAME, common_name),
]
)
cert = (
cryptography.x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(self.key.public_key())
.serial_number(cryptography.x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365))
.sign(self.key, hashes.SHA512(), default_backend())
)
return cert.public_bytes(serialization.Encoding.PEM)
def generate_csr(self, common_name: str) -> bytes:
csr = (
cryptography.x509.CertificateSigningRequestBuilder()
.subject_name(
cryptography.x509.Name(
[
cryptography.x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
cryptography.x509.NameAttribute(
NameOID.STATE_OR_PROVINCE_NAME, "California"
),
cryptography.x509.NameAttribute(
NameOID.LOCALITY_NAME, "San Francisco"
),
cryptography.x509.NameAttribute(
NameOID.ORGANIZATION_NAME, "My Company"
),
cryptography.x509.NameAttribute(
NameOID.COMMON_NAME, common_name
),
]
)
)
.sign(self.key, hashes.SHA256())
)
return csr.public_bytes(serialization.Encoding.PEM)
def issue_certificate(self, csr_bytes: bytes) -> str:
cert = cryptography.x509.load_pem_x509_csr(base64.b64decode(csr_bytes))
new_cert = self.generate_cert(common_name="", subject=cert.subject)
cert_id = str(mock_random.uuid4()).replace("-", "")
cert_arn = f"arn:aws:acm-pca:{self.region_name}:{self.account_id}:certificate-authority/{self.id}/certificate/{cert_id}"
self.issued_certificates[cert_arn] = new_cert
return cert_arn
def get_certificate(self, certificate_arn: str) -> bytes:
return self.issued_certificates[certificate_arn]
def set_revocation_configuration(
self, revocation_configuration: Optional[Dict[str, Any]]
) -> None:
if revocation_configuration is not None:
self.revocation_configuration = revocation_configuration
if "CrlConfiguration" in self.revocation_configuration:
if (
"S3ObjectAcl"
not in self.revocation_configuration["CrlConfiguration"]
):
self.revocation_configuration["CrlConfiguration"][
"S3ObjectAcl"
] = "PUBLIC_READ"
@property
def certificate_bytes(self) -> bytes:
if self.certificate:
return self.certificate.public_bytes(serialization.Encoding.PEM)
return b""
@property
def not_valid_after(self) -> Optional[float]:
if self.certificate is None:
return None
return unix_time(self.certificate.not_valid_after)
@property
def not_valid_before(self) -> Optional[float]:
if self.certificate is None:
return None
return unix_time(self.certificate.not_valid_before)
def import_certificate_authority_certificate(
self, certificate: bytes, certificate_chain: Optional[bytes]
) -> None:
self.certificate = load_pem_x509_certificate(certificate)
self.certificate_chain = certificate_chain
self.status = "ACTIVE"
self.updated_at = unix_time()
def to_json(self) -> Dict[str, Any]:
dct = {
"Arn": self.arn,
"OwnerAccount": self.account_id,
"CertificateAuthorityConfiguration": self.certificate_authority_configuration,
"Type": self.certificate_authority_type,
"RevocationConfiguration": self.revocation_configuration,
"CreatedAt": self.created_at,
"Status": self.status,
}
if self.updated_at:
dct["LastStateChangeAt"] = self.updated_at
if self.certificate:
dct.update(
{
"NotBefore": self.not_valid_before,
"NotAfter": self.not_valid_after,
}
)
return dct
class ACMPCABackend(BaseBackend):
"""Implementation of ACMPCA APIs."""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.certificate_authorities: Dict[str, CertificateAuthority] = dict()
self.tagger = TaggingService()
def create_certificate_authority(
self,
certificate_authority_configuration: Dict[str, Any],
revocation_configuration: Dict[str, Any],
certificate_authority_type: str,
tags: List[Dict[str, str]],
) -> str:
"""
The following parameters are not yet implemented: IdempotencyToken, KeyStorageSecurityStandard, UsageMode
"""
authority = CertificateAuthority(
region=self.region_name,
account_id=self.account_id,
certificate_authority_configuration=certificate_authority_configuration,
certificate_authority_type=certificate_authority_type,
revocation_configuration=revocation_configuration,
)
self.certificate_authorities[authority.arn] = authority
if tags:
self.tagger.tag_resource(authority.arn, tags)
return authority.arn
def describe_certificate_authority(
self, certificate_authority_arn: str
) -> CertificateAuthority:
if certificate_authority_arn not in self.certificate_authorities:
raise ResourceNotFoundException(certificate_authority_arn)
return self.certificate_authorities[certificate_authority_arn]
def get_certificate_authority_certificate(
self, certificate_authority_arn: str
) -> Tuple[bytes, Optional[bytes]]:
ca = self.describe_certificate_authority(certificate_authority_arn)
return ca.certificate_bytes, ca.certificate_chain
def get_certificate_authority_csr(self, certificate_authority_arn: str) -> bytes:
ca = self.describe_certificate_authority(certificate_authority_arn)
return ca.csr
def list_tags(
self, certificate_authority_arn: str
) -> Dict[str, List[Dict[str, str]]]:
"""
Pagination is not yet implemented
"""
return self.tagger.list_tags_for_resource(certificate_authority_arn)
def update_certificate_authority(
self,
certificate_authority_arn: str,
revocation_configuration: Dict[str, Any],
status: str,
) -> None:
ca = self.describe_certificate_authority(certificate_authority_arn)
if status is not None:
ca.status = status
ca.set_revocation_configuration(revocation_configuration)
ca.updated_at = unix_time()
def delete_certificate_authority(self, certificate_authority_arn: str) -> None:
ca = self.describe_certificate_authority(certificate_authority_arn)
ca.status = "DELETED"
def issue_certificate(self, certificate_authority_arn: str, csr: bytes) -> str:
"""
The following parameters are not yet implemented: ApiPassthrough, SigningAlgorithm, TemplateArn, Validity, ValidityNotBefore, IdempotencyToken
Some fields of the resulting certificate will have default values, instead of using the CSR
"""
ca = self.describe_certificate_authority(certificate_authority_arn)
certificate_arn = ca.issue_certificate(csr)
return certificate_arn
def get_certificate(
self, certificate_authority_arn: str, certificate_arn: str
) -> Tuple[bytes, Optional[str]]:
"""
The CertificateChain will always return None for now
"""
ca = self.describe_certificate_authority(certificate_authority_arn)
certificate = ca.get_certificate(certificate_arn)
certificate_chain = None
return certificate, certificate_chain
def import_certificate_authority_certificate(
self,
certificate_authority_arn: str,
certificate: bytes,
certificate_chain: Optional[bytes],
) -> None:
ca = self.describe_certificate_authority(certificate_authority_arn)
ca.import_certificate_authority_certificate(certificate, certificate_chain)
def revoke_certificate(
self,
certificate_authority_arn: str,
certificate_serial: str,
revocation_reason: str,
) -> None:
"""
This is currently a NO-OP
"""
def tag_certificate_authority(
self, certificate_authority_arn: str, tags: List[Dict[str, str]]
) -> None:
self.tagger.tag_resource(certificate_authority_arn, tags)
def untag_certificate_authority(
self, certificate_authority_arn: str, tags: List[Dict[str, str]]
) -> None:
self.tagger.untag_resource_using_tags(certificate_authority_arn, tags)
acmpca_backends = BackendDict(ACMPCABackend, "acm-pca")

164
moto/acmpca/responses.py Normal file
View File

@ -0,0 +1,164 @@
"""Handles incoming acmpca requests, invokes methods, returns responses."""
import base64
import json
from moto.core.responses import BaseResponse
from .models import acmpca_backends, ACMPCABackend
class ACMPCAResponse(BaseResponse):
"""Handler for ACMPCA requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="acmpca")
@property
def acmpca_backend(self) -> ACMPCABackend:
"""Return backend instance specific for this region."""
return acmpca_backends[self.current_account][self.region]
def create_certificate_authority(self) -> str:
params = json.loads(self.body)
certificate_authority_configuration = params.get(
"CertificateAuthorityConfiguration"
)
revocation_configuration = params.get("RevocationConfiguration")
certificate_authority_type = params.get("CertificateAuthorityType")
tags = params.get("Tags")
certificate_authority_arn = self.acmpca_backend.create_certificate_authority(
certificate_authority_configuration=certificate_authority_configuration,
revocation_configuration=revocation_configuration,
certificate_authority_type=certificate_authority_type,
tags=tags,
)
return json.dumps(dict(CertificateAuthorityArn=certificate_authority_arn))
def describe_certificate_authority(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
certificate_authority = self.acmpca_backend.describe_certificate_authority(
certificate_authority_arn=certificate_authority_arn,
)
return json.dumps(dict(CertificateAuthority=certificate_authority.to_json()))
def get_certificate_authority_certificate(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
(
certificate,
certificate_chain,
) = self.acmpca_backend.get_certificate_authority_certificate(
certificate_authority_arn=certificate_authority_arn,
)
return json.dumps(
dict(
Certificate=certificate.decode("utf-8"),
CertificateChain=certificate_chain,
)
)
def get_certificate_authority_csr(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
csr = self.acmpca_backend.get_certificate_authority_csr(
certificate_authority_arn=certificate_authority_arn,
)
return json.dumps(dict(Csr=csr.decode("utf-8")))
def list_tags(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
tags = self.acmpca_backend.list_tags(
certificate_authority_arn=certificate_authority_arn
)
return json.dumps(tags)
def update_certificate_authority(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
revocation_configuration = params.get("RevocationConfiguration")
status = params.get("Status")
self.acmpca_backend.update_certificate_authority(
certificate_authority_arn=certificate_authority_arn,
revocation_configuration=revocation_configuration,
status=status,
)
return "{}"
def delete_certificate_authority(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
self.acmpca_backend.delete_certificate_authority(
certificate_authority_arn=certificate_authority_arn
)
return "{}"
def issue_certificate(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
csr = params.get("Csr").encode("utf-8")
certificate_arn = self.acmpca_backend.issue_certificate(
certificate_authority_arn=certificate_authority_arn,
csr=csr,
)
return json.dumps(dict(CertificateArn=certificate_arn))
def get_certificate(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
certificate_arn = params.get("CertificateArn")
certificate, certificate_chain = self.acmpca_backend.get_certificate(
certificate_authority_arn=certificate_authority_arn,
certificate_arn=certificate_arn,
)
return json.dumps(
dict(
Certificate=certificate.decode("utf-8"),
CertificateChain=certificate_chain,
)
)
def import_certificate_authority_certificate(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
certificate = params.get("Certificate")
certificate_bytes = base64.b64decode(certificate)
certificate_chain = params.get("CertificateChain")
self.acmpca_backend.import_certificate_authority_certificate(
certificate_authority_arn=certificate_authority_arn,
certificate=certificate_bytes,
certificate_chain=certificate_chain,
)
return "{}"
def revoke_certificate(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
certificate_serial = params.get("CertificateSerial")
revocation_reason = params.get("RevocationReason")
self.acmpca_backend.revoke_certificate(
certificate_authority_arn=certificate_authority_arn,
certificate_serial=certificate_serial,
revocation_reason=revocation_reason,
)
return "{}"
def tag_certificate_authority(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
tags = params.get("Tags")
self.acmpca_backend.tag_certificate_authority(
certificate_authority_arn=certificate_authority_arn,
tags=tags,
)
return "{}"
def untag_certificate_authority(self) -> str:
params = json.loads(self.body)
certificate_authority_arn = params.get("CertificateAuthorityArn")
tags = params.get("Tags")
self.acmpca_backend.untag_certificate_authority(
certificate_authority_arn=certificate_authority_arn,
tags=tags,
)
return "{}"

11
moto/acmpca/urls.py Normal file
View File

@ -0,0 +1,11 @@
"""acmpca base URL and path."""
from .responses import ACMPCAResponse
url_bases = [
r"https?://acm-pca\.(.+)\.amazonaws\.com",
]
url_paths = {
"{0}/$": ACMPCAResponse.dispatch,
}

View File

@ -3,6 +3,7 @@ import re
backend_url_patterns = [
("acm", re.compile("https?://acm\\.(.+)\\.amazonaws\\.com")),
("acm-pca", re.compile("https?://acm-pca\\.(.+)\\.amazonaws\\.com")),
("amp", re.compile("https?://aps\\.(.+)\\.amazonaws\\.com")),
("apigateway", re.compile("https?://apigateway\\.(.+)\\.amazonaws.com")),
(

View File

@ -149,12 +149,12 @@ def append_mock_to_init_py(service):
with open(path, encoding="utf-8") as fhandle:
lines = [_.replace("\n", "") for _ in fhandle.readlines()]
if any(_ for _ in lines if re.match(f"^mock_{service}.*lazy_load(.*)$", _)):
escaped_service = get_escaped_service(service)
if any(_ for _ in lines if _.startswith(f"^mock_{escaped_service} = lazy_load")):
return
filtered_lines = [_ for _ in lines if re.match("^mock_.*lazy_load(.*)$", _)]
last_import_line_index = lines.index(filtered_lines[-1])
escaped_service = get_escaped_service(service)
new_line = (
f"mock_{escaped_service} = lazy_load("
f'".{escaped_service}", "mock_{escaped_service}", boto3_name="{service}")'

View File

@ -1,7 +1,6 @@
"""{{ service_class }}Backend class with methods for supported APIs."""
from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict
from moto.core import BaseBackend, BackendDict, BaseModel
class {{ service_class }}Backend(BaseBackend):

View File

@ -1,5 +1,10 @@
acm:
- TestAccACMCertificateDataSource
acmpca:
- TestAccACMPCACertificateAuthority_
- TestAccACMPCACertificateAuthorityDataSource
- TestAccACMPCACertificateAuthorityCertificate
- TestAccACMPCACertificateDataSource
amp:
- TestAccAMPWorkspace
- TestAccAMPRuleGroupNamespace

View File

View File

@ -0,0 +1,360 @@
"""Unit tests for acmpca-supported APIs."""
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_acmpca
from moto.core import DEFAULT_ACCOUNT_ID
import datetime
import cryptography.x509
from cryptography.x509 import NameOID
import cryptography.hazmat.primitives.asymmetric.rsa
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.backends import default_backend
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_acmpca
def test_create_certificate_authority():
client = boto3.client("acm-pca", region_name="eu-west-1")
resp = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
IdempotencyToken="terraform-20221125230308947400000001",
)
resp.should.have.key("CertificateAuthorityArn").match(
f"^arn:aws:acm-pca:eu-west-1:{DEFAULT_ACCOUNT_ID}:certificate-authority/"
)
@mock_acmpca
def test_describe_certificate_authority():
client = boto3.client("acm-pca", region_name="ap-southeast-1")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
IdempotencyToken="terraform-20221125230308947400000001",
)["CertificateAuthorityArn"]
ca = client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)[
"CertificateAuthority"
]
ca.should.have.key("Arn").equals(ca_arn)
ca.should.have.key("OwnerAccount").equals(DEFAULT_ACCOUNT_ID)
ca.should.have.key("CreatedAt")
ca.should.have.key("Type").equals("SUBORDINATE")
ca.should.have.key("Status").equals("PENDING_CERTIFICATE")
ca.should.have.key("CertificateAuthorityConfiguration").equals(
{
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
}
)
@mock_acmpca
def test_describe_unknown_certificate_authority():
client = boto3.client("acm-pca", region_name="ap-southeast-1")
with pytest.raises(ClientError) as exc:
client.describe_certificate_authority(CertificateAuthorityArn="unknown")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
@mock_acmpca
def test_get_certificate_authority_certificate():
client = boto3.client("acm-pca", region_name="ap-southeast-1")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
IdempotencyToken="terraform-20221125230308947400000001",
)["CertificateAuthorityArn"]
resp = client.get_certificate_authority_certificate(CertificateAuthorityArn=ca_arn)
# Certificate is empty for now, until we call import_certificate_authority_certificate
resp.should.have.key("Certificate").equals("")
@mock_acmpca
def test_get_certificate_authority_csr():
client = boto3.client("acm-pca", region_name="us-east-2")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
IdempotencyToken="terraform-20221125230308947400000001",
)["CertificateAuthorityArn"]
resp = client.get_certificate_authority_csr(CertificateAuthorityArn=ca_arn)
resp.should.have.key("Csr")
@mock_acmpca
def test_list_tags_when_ca_has_no_tags():
client = boto3.client("acm-pca", region_name="us-east-2")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
IdempotencyToken="terraform-20221125230308947400000001",
)["CertificateAuthorityArn"]
resp = client.list_tags(CertificateAuthorityArn=ca_arn)
resp.should.have.key("Tags").equals([])
@mock_acmpca
def test_list_tags():
client = boto3.client("acm-pca", region_name="us-east-2")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
IdempotencyToken="terraform-20221125230308947400000001",
Tags=[{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}],
)["CertificateAuthorityArn"]
resp = client.list_tags(CertificateAuthorityArn=ca_arn)
resp.should.have.key("Tags").equals(
[{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}]
)
@mock_acmpca
def test_update_certificate_authority():
client = boto3.client("acm-pca", region_name="eu-west-1")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
)["CertificateAuthorityArn"]
client.update_certificate_authority(
CertificateAuthorityArn=ca_arn,
Status="DISABLED",
)
ca = client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)[
"CertificateAuthority"
]
ca.should.have.key("Status").equals("DISABLED")
ca.should.have.key("LastStateChangeAt")
@mock_acmpca
def test_delete_certificate_authority():
client = boto3.client("acm-pca", region_name="ap-southeast-1")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
)["CertificateAuthorityArn"]
client.delete_certificate_authority(CertificateAuthorityArn=ca_arn)
ca = client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)[
"CertificateAuthority"
]
ca.should.have.key("Status").equals("DELETED")
@mock_acmpca
def test_issue_certificate():
client = boto3.client("acm-pca", region_name="ap-southeast-1")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "t8fzth32.test"},
},
CertificateAuthorityType="ROOT",
)["CertificateAuthorityArn"]
csr = client.get_certificate_authority_csr(CertificateAuthorityArn=ca_arn)["Csr"]
resp = client.issue_certificate(
CertificateAuthorityArn=ca_arn,
Csr=csr,
SigningAlgorithm="SHA512WITHRSA",
Validity={"Type": "YEARS", "Value": 10},
)
resp.should.have.key("CertificateArn")
@mock_acmpca
def test_get_certificate():
client = boto3.client("acm-pca", region_name="us-east-2")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "t8fzth32.test"},
},
CertificateAuthorityType="ROOT",
)["CertificateAuthorityArn"]
csr = client.get_certificate_authority_csr(CertificateAuthorityArn=ca_arn)["Csr"]
certificate_arn = client.issue_certificate(
CertificateAuthorityArn=ca_arn,
Csr=csr,
SigningAlgorithm="SHA512WITHRSA",
Validity={"Type": "YEARS", "Value": 10},
)["CertificateArn"]
resp = client.get_certificate(
CertificateAuthorityArn=ca_arn, CertificateArn=certificate_arn
)
resp.should.have.key("Certificate")
@mock_acmpca
def test_import_certificate_authority_certificate():
client = boto3.client("acm-pca", region_name="eu-west-1")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
)["CertificateAuthorityArn"]
cert = create_cert()
client.import_certificate_authority_certificate(
CertificateAuthorityArn=ca_arn,
Certificate=cert,
)
ca = client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)[
"CertificateAuthority"
]
ca.should.have.key("Status").equals("ACTIVE")
ca.should.have.key("NotBefore")
ca.should.have.key("NotAfter")
resp = client.get_certificate_authority_certificate(CertificateAuthorityArn=ca_arn)
resp.should.have.key("Certificate").match("^-----BEGIN CERTIFICATE-----")
@mock_acmpca
def test_tag_certificate_authority():
client = boto3.client("acm-pca", region_name="eu-west-1")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
)["CertificateAuthorityArn"]
client.tag_certificate_authority(
CertificateAuthorityArn=ca_arn,
Tags=[{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}],
)
resp = client.list_tags(CertificateAuthorityArn=ca_arn)
resp.should.have.key("Tags").equals(
[{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}]
)
@mock_acmpca
def test_untag_certificate_authority():
client = boto3.client("acm-pca", region_name="eu-west-1")
ca_arn = client.create_certificate_authority(
CertificateAuthorityConfiguration={
"KeyAlgorithm": "RSA_4096",
"SigningAlgorithm": "SHA512WITHRSA",
"Subject": {"CommonName": "yscb41lw.test"},
},
CertificateAuthorityType="SUBORDINATE",
)["CertificateAuthorityArn"]
client.tag_certificate_authority(
CertificateAuthorityArn=ca_arn,
Tags=[{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}],
)
client.untag_certificate_authority(
CertificateAuthorityArn=ca_arn, Tags=[{"Key": "t1", "Value": "v1"}]
)
resp = client.list_tags(CertificateAuthorityArn=ca_arn)
resp.should.have.key("Tags").equals([{"Key": "t2", "Value": "v2"}])
def create_cert():
serial_number = cryptography.x509.random_serial_number()
subject = cryptography.x509.Name(
[
cryptography.x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
cryptography.x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
cryptography.x509.NameAttribute(NameOID.LOCALITY_NAME, "Test Francisco"),
cryptography.x509.NameAttribute(NameOID.ORGANIZATION_NAME, "TestCompany"),
cryptography.x509.NameAttribute(NameOID.COMMON_NAME, "testcert.io"),
]
)
issuer = cryptography.x509.Name(
[ # C = US, O = Amazon, OU = Server CA 1B, CN = Amazon
cryptography.x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
cryptography.x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Amazon"),
cryptography.x509.NameAttribute(
NameOID.ORGANIZATIONAL_UNIT_NAME, "Server CA 1B"
),
cryptography.x509.NameAttribute(NameOID.COMMON_NAME, "TestCert"),
]
)
key = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key(
public_exponent=65537, key_size=2048
)
cert = (
cryptography.x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(serial_number)
.not_valid_before(datetime.datetime.utcnow() - datetime.timedelta(days=10))
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=10))
.sign(key, hashes.SHA512(), default_backend())
)
return cert.public_bytes(serialization.Encoding.PEM)