2017-09-21 21:44:34 +01:00
from __future__ import unicode_literals
2017-09-22 00:20:26 +01:00
import re
import json
import datetime
2017-09-21 21:44:34 +01:00
from moto . core import BaseBackend , BaseModel
from moto . ec2 import ec2_backends
2017-09-22 00:20:26 +01:00
from . utils import make_arn_for_certificate
2017-09-21 21:44:34 +01:00
2017-09-22 00:20:26 +01:00
import cryptography . x509
2017-09-22 14:26:05 +01:00
import cryptography . hazmat . primitives . asymmetric . rsa
from cryptography . hazmat . primitives import serialization , hashes
2017-09-22 00:20:26 +01:00
from cryptography . hazmat . backends import default_backend
2017-09-21 21:44:34 +01:00
2017-09-22 00:20:26 +01:00
# Account id used when building certificate ARNs and "not found" messages.
DEFAULT_ACCOUNT_ID = 123456789012
# PEM-encoded CA certificate that CertBundle appends to every certificate chain.
GOOGLE_ROOT_CA = b """ -----BEGIN CERTIFICATE-----
MIIEKDCCAxCgAwIBAgIQAQAhJYiw + lmnd + 8 Fe2Yn3zANBgkqhkiG9w0BAQsFADBC
MQswCQYDVQQGEwJVUzEWMBQGA1UEChMNR2VvVHJ1c3QgSW5jLjEbMBkGA1UEAxMS
R2VvVHJ1c3QgR2xvYmFsIENBMB4XDTE3MDUyMjExMzIzN1oXDTE4MTIzMTIzNTk1
OVowSTELMAkGA1UEBhMCVVMxEzARBgNVBAoTCkdvb2dsZSBJbmMxJTAjBgNVBAMT
HEdvb2dsZSBJbnRlcm5ldCBBdXRob3JpdHkgRzIwggEiMA0GCSqGSIb3DQEBAQUA
A4IBDwAwggEKAoIBAQCcKgR3XNhQkToGo4Lg2FBIvIk / 8 RlwGohGfuCPxfGJziHu
Wv5hDbcyRImgdAtTT1WkzoJile7rWV / G4QWAEsRelD + 8 W0g49FP3JOb7kekVxM / 0
Uw30SvyfVN59vqBrb4fA0FAfKDADQNoIc1Fsf / 86 PKc3Bo69SxEE630k3ub5 / DFx
+ 5 TVYPMuSq9C0svqxGoassxT3RVLix / IGWEfzZ2oPmMrhDVpZYTIGcVGIvhTlb7j
gEoQxirsupcgEcc5mRAEoPBhepUljE5SdeK27QjKFPzOImqzTs9GA5eXA37Asd57
r0Uzz7o + cbfe9CUlwg01iZ2d + w4ReYkeN8WvjnJpAgMBAAGjggERMIIBDTAfBgNV
HSMEGDAWgBTAephojYn7qwVkDBF9qn1luMrMTjAdBgNVHQ4EFgQUSt0GFhu89mi1
dvWBtrtiGrpagS8wDgYDVR0PAQH / BAQDAgEGMC4GCCsGAQUFBwEBBCIwIDAeBggr
BgEFBQcwAYYSaHR0cDovL2cuc3ltY2QuY29tMBIGA1UdEwEB / wQIMAYBAf8CAQAw
NQYDVR0fBC4wLDAqoCigJoYkaHR0cDovL2cuc3ltY2IuY29tL2NybHMvZ3RnbG9i
YWwuY3JsMCEGA1UdIAQaMBgwDAYKKwYBBAHWeQIFATAIBgZngQwBAgIwHQYDVR0l
BBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMA0GCSqGSIb3DQEBCwUAA4IBAQDKSeWs
12 Rkd1u + cfrP9B4jx5ppY1Rf60zWGSgjZGaOHMeHgGRfBIsmr5jfCnC8vBk97nsz
qX + 99 AXUcLsFJnnqmseYuQcZZTTMPOk / xQH6bwx + 23 pwXEz + LQDwyr4tjrSogPsB
E4jLnD / lu3fKOmc2887VJwJyQ6C9bgLxRwVxPgFZ6RGeGvOED4Cmong1L7bHon8X
fOGLVq7uZ4hRJzBgpWJSwzfVO + qFKgE4h6LPcK2kesnE58rF2rwjMvL + GMJ74N87
L9TQEOaWTPtEtyFkDbkAlDASJodYmDkFOA / MgkgMCkdm7r + 0X8 T / cKjhf4t5K7hl
MqO5tzHpCvX2HzLc
- - - - - END CERTIFICATE - - - - - """
# AWS returns the chain you supplied plus a root CA (whether or not you provided one),
# so as a cheap approximation this fixed CA certificate is appended to every chain.
# NOTE(review): despite the name this looks like an intermediate CA, not a root — confirm.
2017-09-22 11:21:36 +01:00
def datetime_to_epoch(date):
    """Convert a naive datetime into whole seconds since 1970-01-01 00:00:00.

    Implemented by subtraction because ``datetime.timestamp()`` only exists
    on Python 3.

    :param date: Naive ``datetime.datetime`` (treated as being on the same
        clock as the 1970-01-01 reference point)
    :return: Elapsed seconds, truncated to an int
    """
    unix_epoch = datetime.datetime(1970, 1, 1)
    elapsed = date - unix_epoch
    return int(elapsed.total_seconds())
2017-09-22 00:20:26 +01:00
class AWSError(Exception):
    """Base class for errors that are rendered as an AWS-style JSON error body.

    Subclasses set ``TYPE`` to the AWS error name; ``STATUS`` is the HTTP
    status code reported alongside the body.
    """

    TYPE = None
    STATUS = 400

    def __init__(self, message):
        """
        :param message: Human readable error text placed in the response body
        """
        # Forward the message to Exception so str(err), err.args and
        # tracebacks carry it instead of being empty.
        super(AWSError, self).__init__(message)
        self.message = message

    def response(self):
        """Render the error as a response pair.

        :return: Tuple of (JSON body string, dict holding the HTTP ``status``)
        """
        resp = {'__type': self.TYPE, 'message': self.message}
        return json.dumps(resp), dict(status=self.STATUS)
class AWSValidationException(AWSError):
    """Error reported with the AWS type name ``ValidationException``."""

    TYPE = 'ValidationException'
class AWSResourceNotFoundException(AWSError):
    """Error reported with the AWS type name ``ResourceNotFoundException``."""

    TYPE = 'ResourceNotFoundException'
class CertBundle(BaseModel):
    """A certificate, its private key and its chain, as stored by the ACM backend.

    The raw PEM bytes are kept in ``cert`` / ``key`` / ``chain``; the parsed
    ``cryptography`` objects are kept in ``_cert`` / ``_key`` / ``_chain``.
    """

    def __init__(self, certificate, private_key, chain=None, region='us-east-1', arn=None, cert_type='IMPORTED', cert_status='ISSUED'):
        """
        :param certificate: PEM-encoded certificate (bytes)
        :param private_key: PEM-encoded, unencrypted private key (bytes)
        :param chain: Optional PEM-encoded certificate chain (bytes)
        :param region: Region used when a new ARN has to be generated
        :param arn: Existing ARN to reuse; a new one is generated when None
        :param cert_type: 'IMPORTED' or 'AMAZON_ISSUED' (should really be an enum)
        :param cert_status: Certificate status string (should really be an enum)
        :raises AWSValidationException: if the key, certificate or chain is invalid
        """
        # Kept as naive UTC: describe() feeds this to datetime_to_epoch(), which
        # measures from 1970-01-01 and therefore needs a UTC value to produce a
        # real Unix timestamp. check() compares against utcnow() to match.
        self.created_at = datetime.datetime.utcnow()
        self.cert = certificate
        self._cert = None
        self.common_name = None
        self.key = private_key
        self._key = None
        self.chain = chain
        self.tags = {}
        self._chain = None
        self.type = cert_type  # Should really be an enum
        self.status = cert_status  # Should really be an enum

        # AWS always returns your chain + root CA
        if self.chain is None:
            self.chain = GOOGLE_ROOT_CA
        else:
            self.chain += b'\n' + GOOGLE_ROOT_CA

        # Takes care of PEM checking
        self.validate_pk()
        self.validate_certificate()
        # Only validate the chain when the caller actually supplied one
        if chain is not None:
            self.validate_chain()

        # TODO check cert is valid, or if self-signed then a chain is provided, otherwise
        # raise AWSValidationException('Provided certificate is not a valid self signed. Please provide either a valid self-signed certificate or certificate chain.')

        # Used for when one wants to overwrite an arn
        if arn is None:
            self.arn = make_arn_for_certificate(DEFAULT_ACCOUNT_ID, region)
        else:
            self.arn = arn

    @classmethod
    def generate_cert(cls, domain_name, sans=None):
        """Create a freshly generated certificate bundle for ``domain_name``.

        A new 2048-bit RSA key is generated and used to sign a certificate that
        is valid for 365 days and lists ``domain_name`` plus ``sans`` as DNS
        subject alternative names.

        :param domain_name: Common name of the certificate
        :param sans: Optional iterable of additional DNS names
        :return: CertBundle of type 'AMAZON_ISSUED' with status 'PENDING_VALIDATION'
        """
        if sans is None:
            sans = set()
        else:
            sans = set(sans)

        # The primary domain is always one of the alternative names
        sans.add(domain_name)
        sans = [cryptography.x509.DNSName(item) for item in sans]

        key = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
        subject = cryptography.x509.Name([
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.COUNTRY_NAME, u"US"),
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.LOCALITY_NAME, u"San Francisco"),
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.ORGANIZATION_NAME, u"My Company"),
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.COMMON_NAME, domain_name),
        ])
        issuer = cryptography.x509.Name([  # C = US, O = Amazon, OU = Server CA 1B, CN = Amazon
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.COUNTRY_NAME, u"US"),
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.ORGANIZATION_NAME, u"Amazon"),
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.ORGANIZATIONAL_UNIT_NAME, u"Server CA 1B"),
            cryptography.x509.NameAttribute(cryptography.x509.NameOID.COMMON_NAME, u"Amazon"),
        ])
        issued_at = datetime.datetime.utcnow()
        cert = cryptography.x509.CertificateBuilder().subject_name(
            subject
        ).issuer_name(
            issuer
        ).public_key(
            key.public_key()
        ).serial_number(
            cryptography.x509.random_serial_number()
        ).not_valid_before(
            issued_at
        ).not_valid_after(
            issued_at + datetime.timedelta(days=365)
        ).add_extension(
            cryptography.x509.SubjectAlternativeName(sans),
            critical=False,
        ).sign(key, hashes.SHA512(), default_backend())

        cert_armored = cert.public_bytes(serialization.Encoding.PEM)
        private_key = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )

        return cls(cert_armored, private_key, cert_type='AMAZON_ISSUED', cert_status='PENDING_VALIDATION')

    def validate_pk(self):
        """Parse ``self.key`` into ``self._key`` and enforce the key-size limit.

        :raises AWSValidationException: if the key cannot be loaded as an
            unencrypted PEM private key, or if it is larger than 2048 bits
        """
        try:
            self._key = serialization.load_pem_private_key(self.key, password=None, backend=default_backend())

            if self._key.key_size > 2048:
                # The exception must be raised, not merely constructed,
                # otherwise oversized keys are silently accepted.
                raise AWSValidationException('The private key length is not supported. Only 1024-bit and 2048-bit are allowed.')
        except AWSValidationException:
            raise
        except Exception:
            # Any parsing failure is reported as a generic validation error
            raise AWSValidationException('The private key is not PEM-encoded or is not valid.')

    def validate_certificate(self):
        """Parse ``self.cert`` into ``self._cert`` and check its validity window.

        Also extracts the subject common name into ``self.common_name``.

        :raises AWSValidationException: if the certificate cannot be parsed,
            has expired, or is not yet valid
        """
        try:
            self._cert = cryptography.x509.load_pem_x509_certificate(self.cert, default_backend())

            # Certificate validity dates are naive UTC, so compare against UTC
            now = datetime.datetime.utcnow()
            if self._cert.not_valid_after < now:
                raise AWSValidationException('The certificate has expired, is not valid.')

            if self._cert.not_valid_before > now:
                raise AWSValidationException('The certificate is not in effect yet, is not valid.')

            # Extracting some common fields for ease of use
            # Have to search through cert.subject for OIDs
            self.common_name = self._cert.subject.get_attributes_for_oid(cryptography.x509.OID_COMMON_NAME)[0].value
        except AWSValidationException:
            raise
        except Exception:
            raise AWSValidationException('The certificate is not PEM-encoded or is not valid.')

    def validate_chain(self):
        """Parse every certificate in ``self.chain`` into ``self._chain``.

        :raises AWSValidationException: if any chain entry cannot be parsed, or
            if the validity-window check fails
        """
        try:
            self._chain = []

            # Splitting on the boundary between two PEM blocks eats one dash
            # from each side of the boundary.
            for cert_armored in self.chain.split(b'-\n-'):
                # Would leave encoded but Py2 does not have raw binary strings
                cert_armored = cert_armored.decode()

                # Fix missing -'s on split
                cert_armored = re.sub(r'^----B', '-----B', cert_armored)
                cert_armored = re.sub(r'E----$', 'E-----', cert_armored)
                cert = cryptography.x509.load_pem_x509_certificate(cert_armored.encode(), default_backend())
                self._chain.append(cert)

            # Use UTC, consistent with validate_certificate(); a local-time
            # clock rejects or accepts certificates depending on the host
            # timezone.
            # NOTE(review): this checks the leaf certificate, not the chain
            # entries. Left as-is because the bundled CA certificate that is
            # appended to every chain would fail a date check — confirm intent.
            now = datetime.datetime.utcnow()
            if self._cert.not_valid_after < now:
                raise AWSValidationException('The certificate chain has expired, is not valid.')

            if self._cert.not_valid_before > now:
                raise AWSValidationException('The certificate chain is not in effect yet, is not valid.')
        except AWSValidationException:
            raise
        except Exception:
            raise AWSValidationException('The certificate is not PEM-encoded or is not valid.')

    def check(self):
        """Promote a pending Amazon-issued certificate to 'ISSUED' after one minute."""
        # Basically, if the certificate is pending, and then checked again after 1 min
        # It will appear as if its been validated
        if self.type != 'AMAZON_ISSUED' or self.status != 'PENDING_VALIDATION':
            return

        age = datetime.datetime.utcnow() - self.created_at
        if age.total_seconds() > 60:  # 1min
            self.status = 'ISSUED'

    def describe(self):
        """Build the DescribeCertificate-style description of this bundle.

        :return: dict with a single 'Certificate' key holding the details
        """
        # 'RenewalSummary': {},  # Only when cert is amazon issued
        key_algo = {1024: 'RSA_1024', 2048: 'RSA_2048'}.get(self._key.key_size, 'EC_prime256v1')

        # Look for SANs. get_extension_for_oid() raises ExtensionNotFound rather
        # than returning None when the certificate has no such extension.
        try:
            san_obj = self._cert.extensions.get_extension_for_oid(cryptography.x509.OID_SUBJECT_ALTERNATIVE_NAME)
        except cryptography.x509.ExtensionNotFound:
            sans = []
        else:
            sans = [item.value for item in san_obj.value]

        result = {
            'Certificate': {
                'CertificateArn': self.arn,
                'DomainName': self.common_name,
                'InUseBy': [],
                'Issuer': self._cert.issuer.get_attributes_for_oid(cryptography.x509.OID_COMMON_NAME)[0].value,
                'KeyAlgorithm': key_algo,
                'NotAfter': datetime_to_epoch(self._cert.not_valid_after),
                'NotBefore': datetime_to_epoch(self._cert.not_valid_before),
                # serial_number replaces the deprecated Certificate.serial
                'Serial': self._cert.serial_number,
                'SignatureAlgorithm': self._cert.signature_algorithm_oid._name.upper().replace('ENCRYPTION', ''),
                'Status': self.status,  # One of PENDING_VALIDATION, ISSUED, INACTIVE, EXPIRED, VALIDATION_TIMED_OUT, REVOKED, FAILED.
                'Subject': 'CN={0}'.format(self.common_name),
                'SubjectAlternativeNames': sans,
                'Type': self.type  # One of IMPORTED, AMAZON_ISSUED
            }
        }

        created_epoch = datetime_to_epoch(self.created_at)
        if self.type == 'IMPORTED':
            result['Certificate']['ImportedAt'] = created_epoch
        else:
            result['Certificate']['CreatedAt'] = created_epoch
            result['Certificate']['IssuedAt'] = created_epoch

        return result

    def __str__(self):
        return self.arn

    def __repr__(self):
        return '<Certificate>'
class AWSCertificateManagerBackend(BaseBackend):
    """Per-region in-memory store of certificate bundles, keyed by ARN."""

    def __init__(self, region):
        """
        :param region: Name of the region this backend serves
        """
        super(AWSCertificateManagerBackend, self).__init__()
        self.region = region
        self._certificates = {}
        self._idempotency_tokens = {}

    def reset(self):
        """Drop all state and re-initialise the backend for the same region."""
        region = self.region
        self.__dict__ = {}
        self.__init__(region)

    @staticmethod
    def _arn_not_found(arn):
        """Build (but do not raise) the not-found error for ``arn``.

        :param arn: ARN that could not be found
        :return: AWSResourceNotFoundException instance
        """
        msg = 'Certificate with arn {0} not found in account {1}'.format(arn, DEFAULT_ACCOUNT_ID)
        return AWSResourceNotFoundException(msg)

    def _get_arn_from_idempotency_token(self, token):
        """
        If token doesnt exist, return None, later it will be
        set with an expiry and arn.

        If token expiry has passed, delete entry and return None

        Else return ARN

        :param token: String token
        :return: None or ARN
        """
        entry = self._idempotency_tokens.get(token)
        if entry is None:
            return None

        if entry['expires'] < datetime.datetime.now():
            # Token has expired, new request
            del self._idempotency_tokens[token]
            return None

        return entry['arn']

    def _set_idempotency_token_arn(self, token, arn):
        """Remember ``arn`` for ``token`` for the next hour.

        :param token: String token
        :param arn: ARN created by the request that carried the token
        """
        expires = datetime.datetime.now() + datetime.timedelta(hours=1)
        self._idempotency_tokens[token] = {'arn': arn, 'expires': expires}

    def import_cert(self, certificate, private_key, chain=None, arn=None):
        """Store an imported certificate, optionally replacing an existing one.

        :param certificate: PEM-encoded certificate (bytes)
        :param private_key: PEM-encoded private key (bytes)
        :param chain: Optional PEM-encoded chain (bytes)
        :param arn: ARN of an existing certificate to overwrite; when None a
            new ARN is generated
        :return: ARN of the stored certificate
        :raises AWSResourceNotFoundException: if ``arn`` is given but unknown
        :raises AWSValidationException: if the bundle fails validation
        """
        if arn is not None and arn not in self._certificates:
            raise self._arn_not_found(arn)

        # Use this backend's own region. A bare `region` here would resolve to
        # a module-level variable rather than the region this backend serves.
        # With arn=None CertBundle generates a random ARN, otherwise it reuses
        # the provided one.
        bundle = CertBundle(certificate, private_key, chain=chain, region=self.region, arn=arn)

        self._certificates[bundle.arn] = bundle

        return bundle.arn

    def get_certificates_list(self):
        """
        Get all certificates

        This is a generator: bundles are yielded lazily, one per stored ARN.

        :return: Iterator over certificates
        :rtype: generator of CertBundle
        """
        for arn in self._certificates.keys():
            yield self.get_certificate(arn)

    def get_certificate(self, arn):
        """Fetch one certificate, refreshing its status first.

        :param arn: ARN of the certificate
        :return: CertBundle
        :raises AWSResourceNotFoundException: if ``arn`` is unknown
        """
        if arn not in self._certificates:
            raise self._arn_not_found(arn)

        cert_bundle = self._certificates[arn]
        # Lets a pending certificate flip to its next status before returning
        cert_bundle.check()
        return cert_bundle

    def delete_certificate(self, arn):
        """Remove a certificate.

        :param arn: ARN of the certificate
        :raises AWSResourceNotFoundException: if ``arn`` is unknown
        """
        if arn not in self._certificates:
            raise self._arn_not_found(arn)

        del self._certificates[arn]

    def request_certificate(self, domain_name, domain_validation_options, idempotency_token, subject_alt_names):
        """Generate and store a new certificate for ``domain_name``.

        :param domain_name: Common name for the certificate
        :param domain_validation_options: Accepted but not used here
        :param idempotency_token: Optional token; a repeated, unexpired token
            returns the ARN created by the earlier request
        :param subject_alt_names: Optional iterable of additional DNS names
        :return: ARN of the (new or previously created) certificate
        """
        if idempotency_token is not None:
            arn = self._get_arn_from_idempotency_token(idempotency_token)
            if arn is not None:
                return arn

        cert = CertBundle.generate_cert(domain_name, subject_alt_names)
        if idempotency_token is not None:
            self._set_idempotency_token_arn(idempotency_token, cert.arn)
        self._certificates[cert.arn] = cert

        return cert.arn

    def add_tags_to_certificate(self, arn, tags):
        """Set tags on a certificate, overwriting existing keys.

        :param arn: ARN of the certificate
        :param tags: Iterable of dicts with a 'Key' and an optional 'Value'
        :raises AWSResourceNotFoundException: if ``arn`` is unknown
        """
        # get_cert does arn check
        cert_bundle = self.get_certificate(arn)

        for tag in tags:
            cert_bundle.tags[tag['Key']] = tag.get('Value', None)

    def remove_tags_from_certificate(self, arn, tags):
        """Remove tags from a certificate; unknown keys are ignored.

        :param arn: ARN of the certificate
        :param tags: Iterable of dicts with a 'Key' and an optional 'Value'
        :raises AWSResourceNotFoundException: if ``arn`` is unknown
        """
        # get_cert does arn check
        cert_bundle = self.get_certificate(arn)

        for tag in tags:
            key = tag['Key']
            value = tag.get('Value', None)

            if key not in cert_bundle.tags:
                continue

            # If value isnt provided, just delete key.
            # If value is provided, only delete if it matches what already exists.
            if value is None or cert_bundle.tags[key] == value:
                del cert_bundle.tags[key]
2017-09-21 21:44:34 +01:00
# One ACM backend per region known to the EC2 backends.
acm_backends = {}
for region in ec2_backends:
    acm_backends[region] = AWSCertificateManagerBackend(region)