moto/moto/acmpca/responses.py

169 lines
6.7 KiB
Python

"""Handles incoming acmpca requests, invokes methods, returns responses."""
import base64
import json
from moto.core.responses import BaseResponse
from .models import ACMPCABackend, acmpca_backends
class ACMPCAResponse(BaseResponse):
    """Translate ACMPCA JSON requests into backend calls and JSON replies.

    Every action method parses ``self.body`` as JSON, picks the request
    fields it needs with ``dict.get`` (so absent fields become ``None``),
    forwards them to the backend as keyword arguments and returns a JSON
    string.
    """

    def __init__(self) -> None:
        super().__init__(service_name="acmpca")

    @property
    def acmpca_backend(self) -> ACMPCABackend:
        """Look up the backend for the current account and region."""
        account_backends = acmpca_backends[self.current_account]
        return account_backends[self.region]

    def create_certificate_authority(self) -> str:
        """Create a certificate authority.

        Returns a JSON object whose ``CertificateAuthorityArn`` key holds the
        value returned by the backend.
        """
        request = json.loads(self.body)
        backend_kwargs = {
            "certificate_authority_configuration": request.get(
                "CertificateAuthorityConfiguration"
            ),
            "revocation_configuration": request.get("RevocationConfiguration"),
            "certificate_authority_type": request.get("CertificateAuthorityType"),
            "security_standard": request.get("KeyStorageSecurityStandard"),
            "tags": request.get("Tags"),
        }
        ca_arn = self.acmpca_backend.create_certificate_authority(**backend_kwargs)
        return json.dumps({"CertificateAuthorityArn": ca_arn})

    def describe_certificate_authority(self) -> str:
        """Describe a certificate authority.

        Returns a JSON object whose ``CertificateAuthority`` key holds the
        ``to_json()`` representation of the object returned by the backend.
        """
        request = json.loads(self.body)
        ca_arn = request.get("CertificateAuthorityArn")
        authority = self.acmpca_backend.describe_certificate_authority(
            certificate_authority_arn=ca_arn,
        )
        return json.dumps({"CertificateAuthority": authority.to_json()})

    def get_certificate_authority_certificate(self) -> str:
        """Fetch the certificate and chain of a certificate authority.

        The backend result is unpacked into two items: the first is decoded
        as UTF-8 for the ``Certificate`` key, the second is passed through
        unchanged as ``CertificateChain``.
        """
        request = json.loads(self.body)
        ca_arn = request.get("CertificateAuthorityArn")
        cert, chain = self.acmpca_backend.get_certificate_authority_certificate(
            certificate_authority_arn=ca_arn,
        )
        payload = {
            "Certificate": cert.decode("utf-8"),
            "CertificateChain": chain,
        }
        return json.dumps(payload)

    def get_certificate_authority_csr(self) -> str:
        """Fetch the CSR of a certificate authority.

        Returns a JSON object whose ``Csr`` key holds the backend result
        decoded as UTF-8.
        """
        request = json.loads(self.body)
        ca_arn = request.get("CertificateAuthorityArn")
        csr_bytes = self.acmpca_backend.get_certificate_authority_csr(
            certificate_authority_arn=ca_arn,
        )
        return json.dumps({"Csr": csr_bytes.decode("utf-8")})

    def list_tags(self) -> str:
        """List the tags of a certificate authority.

        The backend result is serialized to JSON as-is, without wrapping.
        """
        request = json.loads(self.body)
        ca_arn = request.get("CertificateAuthorityArn")
        result = self.acmpca_backend.list_tags(certificate_authority_arn=ca_arn)
        return json.dumps(result)

    def update_certificate_authority(self) -> str:
        """Forward a revocation configuration and status update to the backend.

        Returns an empty JSON object.
        """
        request = json.loads(self.body)
        backend_kwargs = {
            "certificate_authority_arn": request.get("CertificateAuthorityArn"),
            "revocation_configuration": request.get("RevocationConfiguration"),
            "status": request.get("Status"),
        }
        self.acmpca_backend.update_certificate_authority(**backend_kwargs)
        return "{}"

    def delete_certificate_authority(self) -> str:
        """Ask the backend to delete a certificate authority.

        Returns an empty JSON object.
        """
        request = json.loads(self.body)
        ca_arn = request.get("CertificateAuthorityArn")
        self.acmpca_backend.delete_certificate_authority(
            certificate_authority_arn=ca_arn
        )
        return "{}"

    def issue_certificate(self) -> str:
        """Issue a certificate from the request's CSR.

        The ``Csr`` request field is encoded to UTF-8 bytes before it is
        handed to the backend. Returns a JSON object whose ``CertificateArn``
        key holds the value returned by the backend.
        """
        request = json.loads(self.body)
        backend_kwargs = {
            "certificate_authority_arn": request.get("CertificateAuthorityArn"),
            "csr": request.get("Csr").encode("utf-8"),
        }
        cert_arn = self.acmpca_backend.issue_certificate(**backend_kwargs)
        return json.dumps({"CertificateArn": cert_arn})

    def get_certificate(self) -> str:
        """Fetch an issued certificate and its chain.

        The backend result is unpacked into two items: the first is decoded
        as UTF-8 for the ``Certificate`` key, the second is passed through
        unchanged as ``CertificateChain``.
        """
        request = json.loads(self.body)
        backend_kwargs = {
            "certificate_authority_arn": request.get("CertificateAuthorityArn"),
            "certificate_arn": request.get("CertificateArn"),
        }
        cert, chain = self.acmpca_backend.get_certificate(**backend_kwargs)
        payload = {
            "Certificate": cert.decode("utf-8"),
            "CertificateChain": chain,
        }
        return json.dumps(payload)

    def import_certificate_authority_certificate(self) -> str:
        """Import a certificate for a certificate authority.

        The ``Certificate`` request field is base64-decoded before it is
        handed to the backend; ``CertificateChain`` is forwarded unchanged.
        Returns an empty JSON object.
        """
        request = json.loads(self.body)
        backend_kwargs = {
            "certificate_authority_arn": request.get("CertificateAuthorityArn"),
            "certificate": base64.b64decode(request.get("Certificate")),
            "certificate_chain": request.get("CertificateChain"),
        }
        self.acmpca_backend.import_certificate_authority_certificate(
            **backend_kwargs
        )
        return "{}"

    def revoke_certificate(self) -> str:
        """Forward a certificate revocation request to the backend.

        Returns an empty JSON object.
        """
        request = json.loads(self.body)
        backend_kwargs = {
            "certificate_authority_arn": request.get("CertificateAuthorityArn"),
            "certificate_serial": request.get("CertificateSerial"),
            "revocation_reason": request.get("RevocationReason"),
        }
        self.acmpca_backend.revoke_certificate(**backend_kwargs)
        return "{}"

    def tag_certificate_authority(self) -> str:
        """Forward the request's ``Tags`` to the backend's tag operation.

        Returns an empty JSON object.
        """
        request = json.loads(self.body)
        backend_kwargs = {
            "certificate_authority_arn": request.get("CertificateAuthorityArn"),
            "tags": request.get("Tags"),
        }
        self.acmpca_backend.tag_certificate_authority(**backend_kwargs)
        return "{}"

    def untag_certificate_authority(self) -> str:
        """Forward the request's ``Tags`` to the backend's untag operation.

        Returns an empty JSON object.
        """
        request = json.loads(self.body)
        backend_kwargs = {
            "certificate_authority_arn": request.get("CertificateAuthorityArn"),
            "tags": request.get("Tags"),
        }
        self.acmpca_backend.untag_certificate_authority(**backend_kwargs)
        return "{}"