diff --git a/moto/__init__.py b/moto/__init__.py index 1eeb9aa77..3ae66ee46 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -16,6 +16,7 @@ def lazy_load(module_name, element, boto3_name=None, backend=None): mock_acm = lazy_load(".acm", "mock_acm") +mock_acmpca = lazy_load(".acmpca", "mock_acmpca", boto3_name="acm-pca") mock_amp = lazy_load(".amp", "mock_amp") mock_apigateway = lazy_load(".apigateway", "mock_apigateway") mock_apigatewayv2 = lazy_load(".apigatewayv2", "mock_apigatewayv2") @@ -78,6 +79,9 @@ mock_emr = lazy_load(".emr", "mock_emr") mock_emrcontainers = lazy_load( ".emrcontainers", "mock_emrcontainers", boto3_name="emr-containers" ) +mock_emrserverless = lazy_load( + ".emrserverless", "mock_emrserverless", boto3_name="emr-serverless" +) mock_es = lazy_load(".es", "mock_es") mock_events = lazy_load(".events", "mock_events") mock_firehose = lazy_load(".firehose", "mock_firehose") @@ -163,9 +167,6 @@ mock_xray = lazy_load(".xray", "mock_xray") mock_xray_client = lazy_load(".xray", "mock_xray_client") mock_wafv2 = lazy_load(".wafv2", "mock_wafv2") mock_textract = lazy_load(".textract", "mock_textract") -mock_emrserverless = lazy_load( - ".emrserverless", "mock_emrserverless", boto3_name="emr-serverless" -) class MockAll(ContextDecorator): diff --git a/moto/acmpca/__init__.py b/moto/acmpca/__init__.py new file mode 100644 index 000000000..8059ab38b --- /dev/null +++ b/moto/acmpca/__init__.py @@ -0,0 +1,5 @@ +"""acmpca module initialization; sets value for base decorator.""" +from .models import acmpca_backends +from ..core.models import base_decorator + +mock_acmpca = base_decorator(acmpca_backends) diff --git a/moto/acmpca/exceptions.py b/moto/acmpca/exceptions.py new file mode 100644 index 000000000..2fdcb935f --- /dev/null +++ b/moto/acmpca/exceptions.py @@ -0,0 +1,7 @@ +"""Exceptions raised by the acmpca service.""" +from moto.core.exceptions import JsonRESTError + + +class ResourceNotFoundException(JsonRESTError): + def __init__(self, arn: str): + super().__init__("ResourceNotFoundException", f"Resource {arn} not found") diff --git a/moto/acmpca/models.py b/moto/acmpca/models.py new file mode 100644 index 000000000..bf4394ff3 --- /dev/null +++ b/moto/acmpca/models.py @@ -0,0 +1,305 @@ +"""ACMPCABackend class with methods for supported APIs.""" +import base64 +from .exceptions import ResourceNotFoundException +from moto.core import BaseBackend, BackendDict, BaseModel +from moto.core.utils import unix_time +from moto.moto_api._internal import mock_random +from moto.utilities.tagging_service import TaggingService + +import datetime +import cryptography.x509 +from cryptography.x509 import NameOID, load_pem_x509_certificate, Certificate +import cryptography.hazmat.primitives.asymmetric.rsa +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.backends import default_backend +from typing import Any, Dict, List, Optional, Tuple + + +class CertificateAuthority(BaseModel): + def __init__( + self, + region: str, + account_id: str, + certificate_authority_configuration: Dict[str, Any], + certificate_authority_type: str, + revocation_configuration: Dict[str, Any], + ): + self.id = mock_random.uuid4() + self.arn = ( + f"arn:aws:acm-pca:{region}:{account_id}:certificate-authority/{self.id}" + ) + self.account_id = account_id + self.region_name = region + self.certificate_authority_configuration = certificate_authority_configuration + self.certificate_authority_type = certificate_authority_type + self.revocation_configuration: Dict[str, Any] = { + "CrlConfiguration": {"Enabled": False} + } + self.set_revocation_configuration(revocation_configuration) + self.created_at = unix_time() + self.updated_at: Optional[float] = None + self.status = "PENDING_CERTIFICATE" + + common_name = self.certificate_authority_configuration.get("Subject", {}).get( + "CommonName", "Moto.org" + ) + self.key = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key( + public_exponent=65537, key_size=2048 + ) + self.password = str(mock_random.uuid4()).encode("utf-8") + self.private_bytes = self.key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.BestAvailableEncryption(self.password), + ) + self.certificate: Optional[Certificate] = None + self.certificate_chain: Optional[bytes] = None + self.csr = self.generate_csr(common_name) + + self.issued_certificates: Dict[str, bytes] = dict() + + def generate_cert(self, common_name: str, subject: cryptography.x509.Name) -> bytes: + issuer = cryptography.x509.Name( + [ # C = US, O = Amazon, OU = Server CA 1B, CN = Amazon + cryptography.x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + cryptography.x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Amazon"), + cryptography.x509.NameAttribute( + NameOID.ORGANIZATIONAL_UNIT_NAME, "Server CA 1B" + ), + cryptography.x509.NameAttribute(NameOID.COMMON_NAME, common_name), + ] + ) + cert = ( + cryptography.x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(self.key.public_key()) + .serial_number(cryptography.x509.random_serial_number()) + .not_valid_before(datetime.datetime.utcnow()) + .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365)) + .sign(self.key, hashes.SHA512(), default_backend()) + ) + + return cert.public_bytes(serialization.Encoding.PEM) + + def generate_csr(self, common_name: str) -> bytes: + csr = ( + cryptography.x509.CertificateSigningRequestBuilder() + .subject_name( + cryptography.x509.Name( + [ + cryptography.x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + cryptography.x509.NameAttribute( + NameOID.STATE_OR_PROVINCE_NAME, "California" + ), + cryptography.x509.NameAttribute( + NameOID.LOCALITY_NAME, "San Francisco" + ), + cryptography.x509.NameAttribute( + NameOID.ORGANIZATION_NAME, "My Company" + ), + cryptography.x509.NameAttribute( + NameOID.COMMON_NAME, common_name + ), + ] + ) + ) + .sign(self.key, hashes.SHA256()) + ) + return csr.public_bytes(serialization.Encoding.PEM) + + def issue_certificate(self, csr_bytes: bytes) -> str: + cert = cryptography.x509.load_pem_x509_csr(base64.b64decode(csr_bytes)) + new_cert = self.generate_cert(common_name="", subject=cert.subject) + cert_id = str(mock_random.uuid4()).replace("-", "") + cert_arn = f"arn:aws:acm-pca:{self.region_name}:{self.account_id}:certificate-authority/{self.id}/certificate/{cert_id}" + self.issued_certificates[cert_arn] = new_cert + return cert_arn + + def get_certificate(self, certificate_arn: str) -> bytes: + return self.issued_certificates[certificate_arn] + + def set_revocation_configuration( + self, revocation_configuration: Optional[Dict[str, Any]] + ) -> None: + if revocation_configuration is not None: + self.revocation_configuration = revocation_configuration + if "CrlConfiguration" in self.revocation_configuration: + if ( + "S3ObjectAcl" + not in self.revocation_configuration["CrlConfiguration"] + ): + self.revocation_configuration["CrlConfiguration"][ + "S3ObjectAcl" + ] = "PUBLIC_READ" + + @property + def certificate_bytes(self) -> bytes: + if self.certificate: + return self.certificate.public_bytes(serialization.Encoding.PEM) + return b"" + + @property + def not_valid_after(self) -> Optional[float]: + if self.certificate is None: + return None + return unix_time(self.certificate.not_valid_after) + + @property + def not_valid_before(self) -> Optional[float]: + if self.certificate is None: + return None + return unix_time(self.certificate.not_valid_before) + + def import_certificate_authority_certificate( + self, certificate: bytes, certificate_chain: Optional[bytes] + ) -> None: + self.certificate = load_pem_x509_certificate(certificate) + self.certificate_chain = certificate_chain + self.status = "ACTIVE" + self.updated_at = unix_time() + + def to_json(self) -> Dict[str, Any]: + dct = { + "Arn": self.arn, + "OwnerAccount": self.account_id, + "CertificateAuthorityConfiguration": self.certificate_authority_configuration, + "Type": self.certificate_authority_type, + "RevocationConfiguration": self.revocation_configuration, + "CreatedAt": self.created_at, + "Status": self.status, + } + if self.updated_at: + dct["LastStateChangeAt"] = self.updated_at + if self.certificate: + dct.update( + { + "NotBefore": self.not_valid_before, + "NotAfter": self.not_valid_after, + } + ) + return dct + + +class ACMPCABackend(BaseBackend): + """Implementation of ACMPCA APIs.""" + + def __init__(self, region_name: str, account_id: str): + super().__init__(region_name, account_id) + self.certificate_authorities: Dict[str, CertificateAuthority] = dict() + self.tagger = TaggingService() + + def create_certificate_authority( + self, + certificate_authority_configuration: Dict[str, Any], + revocation_configuration: Dict[str, Any], + certificate_authority_type: str, + tags: List[Dict[str, str]], + ) -> str: + """ + The following parameters are not yet implemented: IdempotencyToken, KeyStorageSecurityStandard, UsageMode + """ + authority = CertificateAuthority( + region=self.region_name, + account_id=self.account_id, + certificate_authority_configuration=certificate_authority_configuration, + certificate_authority_type=certificate_authority_type, + revocation_configuration=revocation_configuration, + ) + self.certificate_authorities[authority.arn] = authority + if tags: + self.tagger.tag_resource(authority.arn, tags) + return authority.arn + + def describe_certificate_authority( + self, certificate_authority_arn: str + ) -> CertificateAuthority: + if certificate_authority_arn not in self.certificate_authorities: + raise ResourceNotFoundException(certificate_authority_arn) + return self.certificate_authorities[certificate_authority_arn] + + def get_certificate_authority_certificate( + self, certificate_authority_arn: str + ) -> Tuple[bytes, Optional[bytes]]: + ca = self.describe_certificate_authority(certificate_authority_arn) + return ca.certificate_bytes, ca.certificate_chain + + def get_certificate_authority_csr(self, certificate_authority_arn: str) -> bytes: + ca = self.describe_certificate_authority(certificate_authority_arn) + return ca.csr + + def list_tags( + self, certificate_authority_arn: str + ) -> Dict[str, List[Dict[str, str]]]: + """ + Pagination is not yet implemented + """ + return self.tagger.list_tags_for_resource(certificate_authority_arn) + + def update_certificate_authority( + self, + certificate_authority_arn: str, + revocation_configuration: Dict[str, Any], + status: str, + ) -> None: + ca = self.describe_certificate_authority(certificate_authority_arn) + if status is not None: + ca.status = status + ca.set_revocation_configuration(revocation_configuration) + ca.updated_at = unix_time() + + def delete_certificate_authority(self, certificate_authority_arn: str) -> None: + ca = self.describe_certificate_authority(certificate_authority_arn) + ca.status = "DELETED" + + def issue_certificate(self, certificate_authority_arn: str, csr: bytes) -> str: + """ + The following parameters are not yet implemented: ApiPassthrough, SigningAlgorithm, TemplateArn, Validity, ValidityNotBefore, IdempotencyToken + Some fields of the resulting certificate will have default values, instead of using the CSR + """ + ca = self.describe_certificate_authority(certificate_authority_arn) + certificate_arn = ca.issue_certificate(csr) + return certificate_arn + + def get_certificate( + self, certificate_authority_arn: str, certificate_arn: str + ) -> Tuple[bytes, Optional[str]]: + """ + The CertificateChain will always return None for now + """ + ca = self.describe_certificate_authority(certificate_authority_arn) + certificate = ca.get_certificate(certificate_arn) + certificate_chain = None + return certificate, certificate_chain + + def import_certificate_authority_certificate( + self, + certificate_authority_arn: str, + certificate: bytes, + certificate_chain: Optional[bytes], + ) -> None: + ca = self.describe_certificate_authority(certificate_authority_arn) + ca.import_certificate_authority_certificate(certificate, certificate_chain) + + def revoke_certificate( + self, + certificate_authority_arn: str, + certificate_serial: str, + revocation_reason: str, + ) -> None: + """ + This is currently a NO-OP + """ + + def tag_certificate_authority( + self, certificate_authority_arn: str, tags: List[Dict[str, str]] + ) -> None: + self.tagger.tag_resource(certificate_authority_arn, tags) + + def untag_certificate_authority( + self, certificate_authority_arn: str, tags: List[Dict[str, str]] + ) -> None: + self.tagger.untag_resource_using_tags(certificate_authority_arn, tags) + + +acmpca_backends = BackendDict(ACMPCABackend, "acm-pca") diff --git a/moto/acmpca/responses.py b/moto/acmpca/responses.py new file mode 100644 index 000000000..1ea4be651 --- /dev/null +++ b/moto/acmpca/responses.py @@ -0,0 +1,164 @@ +"""Handles incoming acmpca requests, invokes methods, returns responses.""" +import base64 +import json + +from moto.core.responses import BaseResponse +from .models import acmpca_backends, ACMPCABackend + + +class ACMPCAResponse(BaseResponse): + """Handler for ACMPCA requests and responses.""" + + def __init__(self) -> None: + super().__init__(service_name="acmpca") + + @property + def acmpca_backend(self) -> ACMPCABackend: + """Return backend instance specific for this region.""" + return acmpca_backends[self.current_account][self.region] + + def create_certificate_authority(self) -> str: + params = json.loads(self.body) + certificate_authority_configuration = params.get( + "CertificateAuthorityConfiguration" + ) + revocation_configuration = params.get("RevocationConfiguration") + certificate_authority_type = params.get("CertificateAuthorityType") + tags = params.get("Tags") + certificate_authority_arn = self.acmpca_backend.create_certificate_authority( + certificate_authority_configuration=certificate_authority_configuration, + revocation_configuration=revocation_configuration, + certificate_authority_type=certificate_authority_type, + tags=tags, + ) + return json.dumps(dict(CertificateAuthorityArn=certificate_authority_arn)) + + def describe_certificate_authority(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + certificate_authority = self.acmpca_backend.describe_certificate_authority( + certificate_authority_arn=certificate_authority_arn, + ) + return json.dumps(dict(CertificateAuthority=certificate_authority.to_json())) + + def get_certificate_authority_certificate(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + ( + certificate, + certificate_chain, + ) = self.acmpca_backend.get_certificate_authority_certificate( + certificate_authority_arn=certificate_authority_arn, + ) + return json.dumps( + dict( + Certificate=certificate.decode("utf-8"), + CertificateChain=certificate_chain, + ) + ) + + def get_certificate_authority_csr(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + csr = self.acmpca_backend.get_certificate_authority_csr( + certificate_authority_arn=certificate_authority_arn, + ) + return json.dumps(dict(Csr=csr.decode("utf-8"))) + + def list_tags(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + tags = self.acmpca_backend.list_tags( + certificate_authority_arn=certificate_authority_arn + ) + return json.dumps(tags) + + def update_certificate_authority(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + revocation_configuration = params.get("RevocationConfiguration") + status = params.get("Status") + self.acmpca_backend.update_certificate_authority( + certificate_authority_arn=certificate_authority_arn, + revocation_configuration=revocation_configuration, + status=status, + ) + return "{}" + + def delete_certificate_authority(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + self.acmpca_backend.delete_certificate_authority( + certificate_authority_arn=certificate_authority_arn + ) + return "{}" + + def issue_certificate(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + csr = params.get("Csr").encode("utf-8") + certificate_arn = self.acmpca_backend.issue_certificate( + certificate_authority_arn=certificate_authority_arn, + csr=csr, + ) + return json.dumps(dict(CertificateArn=certificate_arn)) + + def get_certificate(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + certificate_arn = params.get("CertificateArn") + certificate, certificate_chain = self.acmpca_backend.get_certificate( + certificate_authority_arn=certificate_authority_arn, + certificate_arn=certificate_arn, + ) + return json.dumps( + dict( + Certificate=certificate.decode("utf-8"), + CertificateChain=certificate_chain, + ) + ) + + def import_certificate_authority_certificate(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + certificate = params.get("Certificate") + certificate_bytes = base64.b64decode(certificate) + certificate_chain = params.get("CertificateChain") + self.acmpca_backend.import_certificate_authority_certificate( + certificate_authority_arn=certificate_authority_arn, + certificate=certificate_bytes, + certificate_chain=certificate_chain, + ) + return "{}" + + def revoke_certificate(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + certificate_serial = params.get("CertificateSerial") + revocation_reason = params.get("RevocationReason") + self.acmpca_backend.revoke_certificate( + certificate_authority_arn=certificate_authority_arn, + certificate_serial=certificate_serial, + revocation_reason=revocation_reason, + ) + return "{}" + + def tag_certificate_authority(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + tags = params.get("Tags") + self.acmpca_backend.tag_certificate_authority( + certificate_authority_arn=certificate_authority_arn, + tags=tags, + ) + return "{}" + + def untag_certificate_authority(self) -> str: + params = json.loads(self.body) + certificate_authority_arn = params.get("CertificateAuthorityArn") + tags = params.get("Tags") + self.acmpca_backend.untag_certificate_authority( + certificate_authority_arn=certificate_authority_arn, + tags=tags, + ) + return "{}" diff --git a/moto/acmpca/urls.py b/moto/acmpca/urls.py new file mode 100644 index 000000000..e5f6da295 --- /dev/null +++ b/moto/acmpca/urls.py @@ -0,0 +1,11 @@ +"""acmpca base URL and path.""" +from .responses import ACMPCAResponse + +url_bases = [ + r"https?://acm-pca\.(.+)\.amazonaws\.com", +] + + +url_paths = { + "{0}/$": ACMPCAResponse.dispatch, +} diff --git a/moto/backend_index.py b/moto/backend_index.py index 9bc4d4765..1d19b6184 100644 --- a/moto/backend_index.py +++ b/moto/backend_index.py @@ -3,6 +3,7 @@ import re backend_url_patterns = [ ("acm", re.compile("https?://acm\\.(.+)\\.amazonaws\\.com")), + ("acm-pca", re.compile("https?://acm-pca\\.(.+)\\.amazonaws\\.com")), ("amp", re.compile("https?://aps\\.(.+)\\.amazonaws\\.com")), ("apigateway", re.compile("https?://apigateway\\.(.+)\\.amazonaws.com")), ( diff --git a/scripts/scaffold.py b/scripts/scaffold.py index 4189485d2..ce8df0c3b 100755 --- a/scripts/scaffold.py +++ b/scripts/scaffold.py @@ -149,12 +149,12 @@ def append_mock_to_init_py(service): with open(path, encoding="utf-8") as fhandle: lines = [_.replace("\n", "") for _ in fhandle.readlines()] - if any(_ for _ in lines if re.match(f"^mock_{service}.*lazy_load(.*)$", _)): + escaped_service = get_escaped_service(service) + if any(_ for _ in lines if _.startswith(f"^mock_{escaped_service} = lazy_load")): return filtered_lines = [_ for _ in lines if re.match("^mock_.*lazy_load(.*)$", _)] last_import_line_index = lines.index(filtered_lines[-1]) - escaped_service = get_escaped_service(service) new_line = ( f"mock_{escaped_service} = lazy_load(" f'".{escaped_service}", "mock_{escaped_service}", boto3_name="{service}")' diff --git a/scripts/template/lib/models.py.j2 b/scripts/template/lib/models.py.j2 index ba61a0e1d..1d339a2d0 100644 --- a/scripts/template/lib/models.py.j2 +++ b/scripts/template/lib/models.py.j2 @@ -1,7 +1,6 @@ """{{ service_class }}Backend class with methods for supported APIs.""" -from moto.core import BaseBackend, BaseModel -from moto.core.utils import BackendDict +from moto.core import BaseBackend, BackendDict, BaseModel class {{ service_class }}Backend(BaseBackend): diff --git a/tests/terraformtests/terraform-tests.success.txt b/tests/terraformtests/terraform-tests.success.txt index 6b7b0ae46..71de52c74 100644 --- a/tests/terraformtests/terraform-tests.success.txt +++ b/tests/terraformtests/terraform-tests.success.txt @@ -1,5 +1,10 @@ acm: - TestAccACMCertificateDataSource +acmpca: + - TestAccACMPCACertificateAuthority_ + - TestAccACMPCACertificateAuthorityDataSource + - TestAccACMPCACertificateAuthorityCertificate + - TestAccACMPCACertificateDataSource amp: - TestAccAMPWorkspace - TestAccAMPRuleGroupNamespace diff --git a/tests/test_acmpca/__init__.py b/tests/test_acmpca/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_acmpca/test_acmpca.py b/tests/test_acmpca/test_acmpca.py new file mode 100644 index 000000000..d7ad4f861 --- /dev/null +++ b/tests/test_acmpca/test_acmpca.py @@ -0,0 +1,360 @@ +"""Unit tests for acmpca-supported APIs.""" +import boto3 +import pytest +import sure # noqa # pylint: disable=unused-import +from botocore.exceptions import ClientError +from moto import mock_acmpca +from moto.core import DEFAULT_ACCOUNT_ID + +import datetime +import cryptography.x509 +from cryptography.x509 import NameOID +import cryptography.hazmat.primitives.asymmetric.rsa +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.backends import default_backend + +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html + + +@mock_acmpca +def test_create_certificate_authority(): + client = boto3.client("acm-pca", region_name="eu-west-1") + resp = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + IdempotencyToken="terraform-20221125230308947400000001", + ) + + resp.should.have.key("CertificateAuthorityArn").match( + f"^arn:aws:acm-pca:eu-west-1:{DEFAULT_ACCOUNT_ID}:certificate-authority/" + ) + + +@mock_acmpca +def test_describe_certificate_authority(): + client = boto3.client("acm-pca", region_name="ap-southeast-1") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + IdempotencyToken="terraform-20221125230308947400000001", + )["CertificateAuthorityArn"] + ca = client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)[ + "CertificateAuthority" + ] + + ca.should.have.key("Arn").equals(ca_arn) + ca.should.have.key("OwnerAccount").equals(DEFAULT_ACCOUNT_ID) + ca.should.have.key("CreatedAt") + ca.should.have.key("Type").equals("SUBORDINATE") + ca.should.have.key("Status").equals("PENDING_CERTIFICATE") + ca.should.have.key("CertificateAuthorityConfiguration").equals( + { + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + } + ) + + +@mock_acmpca +def test_describe_unknown_certificate_authority(): + client = boto3.client("acm-pca", region_name="ap-southeast-1") + + with pytest.raises(ClientError) as exc: + client.describe_certificate_authority(CertificateAuthorityArn="unknown") + err = exc.value.response["Error"] + err["Code"].should.equal("ResourceNotFoundException") + + +@mock_acmpca +def test_get_certificate_authority_certificate(): + client = boto3.client("acm-pca", region_name="ap-southeast-1") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + IdempotencyToken="terraform-20221125230308947400000001", + )["CertificateAuthorityArn"] + + resp = client.get_certificate_authority_certificate(CertificateAuthorityArn=ca_arn) + + # Certificate is empty for now, until we call import_certificate_authority_certificate + resp.should.have.key("Certificate").equals("") + + +@mock_acmpca +def test_get_certificate_authority_csr(): + client = boto3.client("acm-pca", region_name="us-east-2") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + IdempotencyToken="terraform-20221125230308947400000001", + )["CertificateAuthorityArn"] + + resp = client.get_certificate_authority_csr(CertificateAuthorityArn=ca_arn) + + resp.should.have.key("Csr") + + +@mock_acmpca +def test_list_tags_when_ca_has_no_tags(): + client = boto3.client("acm-pca", region_name="us-east-2") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + IdempotencyToken="terraform-20221125230308947400000001", + )["CertificateAuthorityArn"] + + resp = client.list_tags(CertificateAuthorityArn=ca_arn) + resp.should.have.key("Tags").equals([]) + + +@mock_acmpca +def test_list_tags(): + client = boto3.client("acm-pca", region_name="us-east-2") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + IdempotencyToken="terraform-20221125230308947400000001", + Tags=[{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}], + )["CertificateAuthorityArn"] + + resp = client.list_tags(CertificateAuthorityArn=ca_arn) + resp.should.have.key("Tags").equals( + [{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}] + ) + + +@mock_acmpca +def test_update_certificate_authority(): + client = boto3.client("acm-pca", region_name="eu-west-1") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + )["CertificateAuthorityArn"] + + client.update_certificate_authority( + CertificateAuthorityArn=ca_arn, + Status="DISABLED", + ) + + ca = client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)[ + "CertificateAuthority" + ] + ca.should.have.key("Status").equals("DISABLED") + ca.should.have.key("LastStateChangeAt") + + +@mock_acmpca +def test_delete_certificate_authority(): + client = boto3.client("acm-pca", region_name="ap-southeast-1") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + )["CertificateAuthorityArn"] + + client.delete_certificate_authority(CertificateAuthorityArn=ca_arn) + + ca = client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)[ + "CertificateAuthority" + ] + ca.should.have.key("Status").equals("DELETED") + + +@mock_acmpca +def test_issue_certificate(): + client = boto3.client("acm-pca", region_name="ap-southeast-1") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "t8fzth32.test"}, + }, + CertificateAuthorityType="ROOT", + )["CertificateAuthorityArn"] + + csr = client.get_certificate_authority_csr(CertificateAuthorityArn=ca_arn)["Csr"] + + resp = client.issue_certificate( + CertificateAuthorityArn=ca_arn, + Csr=csr, + SigningAlgorithm="SHA512WITHRSA", + Validity={"Type": "YEARS", "Value": 10}, + ) + + resp.should.have.key("CertificateArn") + + +@mock_acmpca +def test_get_certificate(): + client = boto3.client("acm-pca", region_name="us-east-2") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "t8fzth32.test"}, + }, + CertificateAuthorityType="ROOT", + )["CertificateAuthorityArn"] + + csr = client.get_certificate_authority_csr(CertificateAuthorityArn=ca_arn)["Csr"] + + certificate_arn = client.issue_certificate( + CertificateAuthorityArn=ca_arn, + Csr=csr, + SigningAlgorithm="SHA512WITHRSA", + Validity={"Type": "YEARS", "Value": 10}, + )["CertificateArn"] + + resp = client.get_certificate( + CertificateAuthorityArn=ca_arn, CertificateArn=certificate_arn + ) + resp.should.have.key("Certificate") + + +@mock_acmpca +def test_import_certificate_authority_certificate(): + client = boto3.client("acm-pca", region_name="eu-west-1") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + )["CertificateAuthorityArn"] + + cert = create_cert() + + client.import_certificate_authority_certificate( + CertificateAuthorityArn=ca_arn, + Certificate=cert, + ) + + ca = client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)[ + "CertificateAuthority" + ] + ca.should.have.key("Status").equals("ACTIVE") + ca.should.have.key("NotBefore") + ca.should.have.key("NotAfter") + + resp = client.get_certificate_authority_certificate(CertificateAuthorityArn=ca_arn) + resp.should.have.key("Certificate").match("^-----BEGIN CERTIFICATE-----") + + +@mock_acmpca +def test_tag_certificate_authority(): + client = boto3.client("acm-pca", region_name="eu-west-1") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + )["CertificateAuthorityArn"] + + client.tag_certificate_authority( + CertificateAuthorityArn=ca_arn, + Tags=[{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}], + ) + + resp = client.list_tags(CertificateAuthorityArn=ca_arn) + resp.should.have.key("Tags").equals( + [{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}] + ) + + +@mock_acmpca +def test_untag_certificate_authority(): + client = boto3.client("acm-pca", region_name="eu-west-1") + ca_arn = client.create_certificate_authority( + CertificateAuthorityConfiguration={ + "KeyAlgorithm": "RSA_4096", + "SigningAlgorithm": "SHA512WITHRSA", + "Subject": {"CommonName": "yscb41lw.test"}, + }, + CertificateAuthorityType="SUBORDINATE", + )["CertificateAuthorityArn"] + + client.tag_certificate_authority( + CertificateAuthorityArn=ca_arn, + Tags=[{"Key": "t1", "Value": "v1"}, {"Key": "t2", "Value": "v2"}], + ) + + client.untag_certificate_authority( + CertificateAuthorityArn=ca_arn, Tags=[{"Key": "t1", "Value": "v1"}] + ) + + resp = client.list_tags(CertificateAuthorityArn=ca_arn) + resp.should.have.key("Tags").equals([{"Key": "t2", "Value": "v2"}]) + + +def create_cert(): + serial_number = cryptography.x509.random_serial_number() + subject = cryptography.x509.Name( + [ + cryptography.x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + cryptography.x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"), + cryptography.x509.NameAttribute(NameOID.LOCALITY_NAME, "Test Francisco"), + cryptography.x509.NameAttribute(NameOID.ORGANIZATION_NAME, "TestCompany"), + cryptography.x509.NameAttribute(NameOID.COMMON_NAME, "testcert.io"), + ] + ) + issuer = cryptography.x509.Name( + [ # C = US, O = Amazon, OU = Server CA 1B, CN = Amazon + cryptography.x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + cryptography.x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Amazon"), + cryptography.x509.NameAttribute( + NameOID.ORGANIZATIONAL_UNIT_NAME, "Server CA 1B" + ), + cryptography.x509.NameAttribute(NameOID.COMMON_NAME, "TestCert"), + ] + ) + key = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key( + public_exponent=65537, key_size=2048 + ) + cert = ( + cryptography.x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(serial_number) + .not_valid_before(datetime.datetime.utcnow() - datetime.timedelta(days=10)) + .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=10)) + .sign(key, hashes.SHA512(), default_backend()) + ) + + return cert.public_bytes(serialization.Encoding.PEM)