Finished ACM + tests
This commit is contained in:
		
							parent
							
								
									edbbbf6d20
								
							
						
					
					
						commit
						e3034275db
					
				| @ -9,7 +9,8 @@ from moto.ec2 import ec2_backends | |||||||
| from .utils import make_arn_for_certificate | from .utils import make_arn_for_certificate | ||||||
| 
 | 
 | ||||||
| import cryptography.x509 | import cryptography.x509 | ||||||
| from cryptography.hazmat.primitives import serialization | import cryptography.hazmat.primitives.asymmetric.rsa | ||||||
|  | from cryptography.hazmat.primitives import serialization, hashes | ||||||
| from cryptography.hazmat.backends import default_backend | from cryptography.hazmat.backends import default_backend | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @ -69,7 +70,7 @@ class AWSResourceNotFoundException(AWSError): | |||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class CertBundle(BaseModel): | class CertBundle(BaseModel): | ||||||
|     def __init__(self, certificate, private_key, chain=None, region='us-east-1', arn=None, cert_type='IMPORTED'): |     def __init__(self, certificate, private_key, chain=None, region='us-east-1', arn=None, cert_type='IMPORTED', cert_status='ISSUED'): | ||||||
|         self.created_at = datetime.datetime.now() |         self.created_at = datetime.datetime.now() | ||||||
|         self.cert = certificate |         self.cert = certificate | ||||||
|         self._cert = None |         self._cert = None | ||||||
| @ -80,6 +81,7 @@ class CertBundle(BaseModel): | |||||||
|         self.tags = {} |         self.tags = {} | ||||||
|         self._chain = None |         self._chain = None | ||||||
|         self.type = cert_type  # Should really be an enum |         self.type = cert_type  # Should really be an enum | ||||||
|  |         self.status = cert_status  # Should really be an enum | ||||||
| 
 | 
 | ||||||
|         # AWS always returns your chain + root CA |         # AWS always returns your chain + root CA | ||||||
|         if self.chain is None: |         if self.chain is None: | ||||||
| @ -102,6 +104,56 @@ class CertBundle(BaseModel): | |||||||
|         else: |         else: | ||||||
|             self.arn = arn |             self.arn = arn | ||||||
| 
 | 
 | ||||||
|  |     @classmethod | ||||||
|  |     def generate_cert(cls, domain_name, sans=None): | ||||||
|  |         if sans is None: | ||||||
|  |             sans = set() | ||||||
|  |         else: | ||||||
|  |             sans = set(sans) | ||||||
|  | 
 | ||||||
|  |         sans.add(domain_name) | ||||||
|  |         sans = [cryptography.x509.DNSName(item) for item in sans] | ||||||
|  | 
 | ||||||
|  |         key = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) | ||||||
|  |         subject = cryptography.x509.Name([ | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.COUNTRY_NAME, u"US"), | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.STATE_OR_PROVINCE_NAME, u"CA"), | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.LOCALITY_NAME, u"San Francisco"), | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.ORGANIZATION_NAME, u"My Company"), | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.COMMON_NAME, domain_name), | ||||||
|  |         ]) | ||||||
|  |         issuer = cryptography.x509.Name([  # C = US, O = Amazon, OU = Server CA 1B, CN = Amazon | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.COUNTRY_NAME, u"US"), | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.ORGANIZATION_NAME, u"Amazon"), | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.ORGANIZATIONAL_UNIT_NAME, u"Server CA 1B"), | ||||||
|  |             cryptography.x509.NameAttribute(cryptography.x509.NameOID.COMMON_NAME, u"Amazon"), | ||||||
|  |         ]) | ||||||
|  |         cert = cryptography.x509.CertificateBuilder().subject_name( | ||||||
|  |             subject | ||||||
|  |         ).issuer_name( | ||||||
|  |             issuer | ||||||
|  |         ).public_key( | ||||||
|  |             key.public_key() | ||||||
|  |         ).serial_number( | ||||||
|  |             cryptography.x509.random_serial_number() | ||||||
|  |         ).not_valid_before( | ||||||
|  |             datetime.datetime.utcnow() | ||||||
|  |         ).not_valid_after( | ||||||
|  |             datetime.datetime.utcnow() + datetime.timedelta(days=365) | ||||||
|  |         ).add_extension( | ||||||
|  |             cryptography.x509.SubjectAlternativeName(sans), | ||||||
|  |             critical=False, | ||||||
|  |         ).sign(key, hashes.SHA512(), default_backend()) | ||||||
|  | 
 | ||||||
|  |         cert_armored = cert.public_bytes(serialization.Encoding.PEM) | ||||||
|  |         private_key = key.private_bytes( | ||||||
|  |             encoding=serialization.Encoding.PEM, | ||||||
|  |             format=serialization.PrivateFormat.TraditionalOpenSSL, | ||||||
|  |             encryption_algorithm=serialization.NoEncryption() | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         return cls(cert_armored, private_key, cert_type='AMAZON_ISSUED', cert_status='PENDING_VALIDATION') | ||||||
|  | 
 | ||||||
|     def validate_pk(self): |     def validate_pk(self): | ||||||
|         try: |         try: | ||||||
|             self._key = serialization.load_pem_private_key(self.key, password=None, backend=default_backend()) |             self._key = serialization.load_pem_private_key(self.key, password=None, backend=default_backend()) | ||||||
| @ -160,9 +212,15 @@ class CertBundle(BaseModel): | |||||||
|                 raise |                 raise | ||||||
|             raise AWSValidationException('The certificate is not PEM-encoded or is not valid.') |             raise AWSValidationException('The certificate is not PEM-encoded or is not valid.') | ||||||
| 
 | 
 | ||||||
|  |     def check(self): | ||||||
|  |         # Basically, if the certificate is pending, and then checked again after 1 min | ||||||
|  |         # It will appear as if its been validated | ||||||
|  |         if self.type == 'AMAZON_ISSUED' and self.status == 'PENDING_VALIDATION' and \ | ||||||
|  |            (datetime.datetime.now() - self.created_at).total_seconds() > 60:  # 1min | ||||||
|  |             self.status = 'ISSUED' | ||||||
|  | 
 | ||||||
|     def describe(self): |     def describe(self): | ||||||
|         # 'RenewalSummary': {},  # Only when cert is amazon issued |         # 'RenewalSummary': {},  # Only when cert is amazon issued | ||||||
| 
 |  | ||||||
|         if self._key.key_size == 1024: |         if self._key.key_size == 1024: | ||||||
|             key_algo = 'RSA_1024' |             key_algo = 'RSA_1024' | ||||||
|         elif self._key.key_size == 2048: |         elif self._key.key_size == 2048: | ||||||
| @ -170,6 +228,12 @@ class CertBundle(BaseModel): | |||||||
|         else: |         else: | ||||||
|             key_algo = 'EC_prime256v1' |             key_algo = 'EC_prime256v1' | ||||||
| 
 | 
 | ||||||
|  |         # Look for SANs | ||||||
|  |         san_obj = self._cert.extensions.get_extension_for_oid(cryptography.x509.OID_SUBJECT_ALTERNATIVE_NAME) | ||||||
|  |         sans = [] | ||||||
|  |         if san_obj is not None: | ||||||
|  |             sans = [item.value for item in san_obj.value] | ||||||
|  | 
 | ||||||
|         result = { |         result = { | ||||||
|             'Certificate': { |             'Certificate': { | ||||||
|                 'CertificateArn': self.arn, |                 'CertificateArn': self.arn, | ||||||
| @ -181,9 +245,9 @@ class CertBundle(BaseModel): | |||||||
|                 'NotBefore': datetime_to_epoch(self._cert.not_valid_before), |                 'NotBefore': datetime_to_epoch(self._cert.not_valid_before), | ||||||
|                 'Serial': self._cert.serial, |                 'Serial': self._cert.serial, | ||||||
|                 'SignatureAlgorithm': self._cert.signature_algorithm_oid._name.upper().replace('ENCRYPTION', ''), |                 'SignatureAlgorithm': self._cert.signature_algorithm_oid._name.upper().replace('ENCRYPTION', ''), | ||||||
|                 'Status': 'ISSUED',  # One of PENDING_VALIDATION, ISSUED, INACTIVE, EXPIRED, VALIDATION_TIMED_OUT, REVOKED, FAILED. |                 'Status': self.status,  # One of PENDING_VALIDATION, ISSUED, INACTIVE, EXPIRED, VALIDATION_TIMED_OUT, REVOKED, FAILED. | ||||||
|                 'Subject': 'CN={0}'.format(self.common_name), |                 'Subject': 'CN={0}'.format(self.common_name), | ||||||
|                 'SubjectAlternativeNames': [], |                 'SubjectAlternativeNames': sans, | ||||||
|                 'Type': self.type  # One of IMPORTED, AMAZON_ISSUED |                 'Type': self.type  # One of IMPORTED, AMAZON_ISSUED | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| @ -208,16 +272,44 @@ class AWSCertificateManagerBackend(BaseBackend): | |||||||
|         super(AWSCertificateManagerBackend, self).__init__() |         super(AWSCertificateManagerBackend, self).__init__() | ||||||
|         self.region = region |         self.region = region | ||||||
|         self._certificates = {} |         self._certificates = {} | ||||||
|  |         self._idempotency_tokens = {} | ||||||
| 
 | 
 | ||||||
|     def reset(self): |     def reset(self): | ||||||
|         region = self.region |         region = self.region | ||||||
|         self.__dict__ = {} |         self.__dict__ = {} | ||||||
|         self.__init__(region) |         self.__init__(region) | ||||||
| 
 | 
 | ||||||
|     def _arn_not_found(self, arn): |     @staticmethod | ||||||
|  |     def _arn_not_found(arn): | ||||||
|         msg = 'Certificate with arn {0} not found in account {1}'.format(arn, DEFAULT_ACCOUNT_ID) |         msg = 'Certificate with arn {0} not found in account {1}'.format(arn, DEFAULT_ACCOUNT_ID) | ||||||
|         return AWSResourceNotFoundException(msg) |         return AWSResourceNotFoundException(msg) | ||||||
| 
 | 
 | ||||||
|  |     def _get_arn_from_idempotency_token(self, token): | ||||||
|  |         """ | ||||||
|  |         If token doesnt exist, return None, later it will be | ||||||
|  |         set with an expiry and arn. | ||||||
|  | 
 | ||||||
|  |         If token expiry has passed, delete entry and return None | ||||||
|  | 
 | ||||||
|  |         Else return ARN | ||||||
|  | 
 | ||||||
|  |         :param token: String token | ||||||
|  |         :return: None or ARN | ||||||
|  |         """ | ||||||
|  |         now = datetime.datetime.now() | ||||||
|  |         if token in self._idempotency_tokens: | ||||||
|  |             if self._idempotency_tokens[token]['expires'] < now: | ||||||
|  |                 # Token has expired, new request | ||||||
|  |                 del self._idempotency_tokens[token] | ||||||
|  |                 return None | ||||||
|  |             else: | ||||||
|  |                 return self._idempotency_tokens[token]['arn'] | ||||||
|  | 
 | ||||||
|  |         return None | ||||||
|  | 
 | ||||||
|  |     def _set_idempotency_token_arn(self, token, arn): | ||||||
|  |         self._idempotency_tokens[token] = {'arn': arn, 'expires': datetime.datetime.now() + datetime.timedelta(hours=1)} | ||||||
|  | 
 | ||||||
|     def import_cert(self, certificate, private_key, chain=None, arn=None): |     def import_cert(self, certificate, private_key, chain=None, arn=None): | ||||||
|         if arn is not None: |         if arn is not None: | ||||||
|             if arn not in self._certificates: |             if arn not in self._certificates: | ||||||
| @ -240,13 +332,16 @@ class AWSCertificateManagerBackend(BaseBackend): | |||||||
|         :return: List of certificates |         :return: List of certificates | ||||||
|         :rtype: list of CertBundle |         :rtype: list of CertBundle | ||||||
|         """ |         """ | ||||||
|         return self._certificates.values() |         for arn in self._certificates.keys(): | ||||||
|  |             yield self.get_certificate(arn) | ||||||
| 
 | 
 | ||||||
|     def get_certificate(self, arn): |     def get_certificate(self, arn): | ||||||
|         if arn not in self._certificates: |         if arn not in self._certificates: | ||||||
|             raise self._arn_not_found(arn) |             raise self._arn_not_found(arn) | ||||||
| 
 | 
 | ||||||
|         return self._certificates[arn] |         cert_bundle = self._certificates[arn] | ||||||
|  |         cert_bundle.check() | ||||||
|  |         return cert_bundle | ||||||
| 
 | 
 | ||||||
|     def delete_certificate(self, arn): |     def delete_certificate(self, arn): | ||||||
|         if arn not in self._certificates: |         if arn not in self._certificates: | ||||||
| @ -254,6 +349,19 @@ class AWSCertificateManagerBackend(BaseBackend): | |||||||
| 
 | 
 | ||||||
|         del self._certificates[arn] |         del self._certificates[arn] | ||||||
| 
 | 
 | ||||||
|  |     def request_certificate(self, domain_name, domain_validation_options, idempotency_token, subject_alt_names): | ||||||
|  |         if idempotency_token is not None: | ||||||
|  |             arn = self._get_arn_from_idempotency_token(idempotency_token) | ||||||
|  |             if arn is not None: | ||||||
|  |                 return arn | ||||||
|  | 
 | ||||||
|  |         cert = CertBundle.generate_cert(domain_name, subject_alt_names) | ||||||
|  |         if idempotency_token is not None: | ||||||
|  |             self._set_idempotency_token_arn(idempotency_token, cert.arn) | ||||||
|  |         self._certificates[cert.arn] = cert | ||||||
|  | 
 | ||||||
|  |         return cert.arn | ||||||
|  | 
 | ||||||
|     def add_tags_to_certificate(self, arn, tags): |     def add_tags_to_certificate(self, arn, tags): | ||||||
|         # get_cert does arn check |         # get_cert does arn check | ||||||
|         cert_bundle = self.get_certificate(arn) |         cert_bundle = self.get_certificate(arn) | ||||||
|  | |||||||
| @ -34,7 +34,7 @@ class AWSCertificateManagerResponse(BaseResponse): | |||||||
| 
 | 
 | ||||||
|         if arn is None: |         if arn is None: | ||||||
|             msg = 'A required parameter for the specified action is not supplied.' |             msg = 'A required parameter for the specified action is not supplied.' | ||||||
|             return {'__type': 'MissingParameter', 'message': msg}, dict(status=400) |             return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400) | ||||||
| 
 | 
 | ||||||
|         try: |         try: | ||||||
|             self.acm_backend.add_tags_to_certificate(arn, tags) |             self.acm_backend.add_tags_to_certificate(arn, tags) | ||||||
| @ -48,7 +48,7 @@ class AWSCertificateManagerResponse(BaseResponse): | |||||||
| 
 | 
 | ||||||
|         if arn is None: |         if arn is None: | ||||||
|             msg = 'A required parameter for the specified action is not supplied.' |             msg = 'A required parameter for the specified action is not supplied.' | ||||||
|             return {'__type': 'MissingParameter', 'message': msg}, dict(status=400) |             return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400) | ||||||
| 
 | 
 | ||||||
|         try: |         try: | ||||||
|             self.acm_backend.delete_certificate(arn) |             self.acm_backend.delete_certificate(arn) | ||||||
| @ -62,7 +62,7 @@ class AWSCertificateManagerResponse(BaseResponse): | |||||||
| 
 | 
 | ||||||
|         if arn is None: |         if arn is None: | ||||||
|             msg = 'A required parameter for the specified action is not supplied.' |             msg = 'A required parameter for the specified action is not supplied.' | ||||||
|             return {'__type': 'MissingParameter', 'message': msg}, dict(status=400) |             return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400) | ||||||
| 
 | 
 | ||||||
|         try: |         try: | ||||||
|             cert_bundle = self.acm_backend.get_certificate(arn) |             cert_bundle = self.acm_backend.get_certificate(arn) | ||||||
| @ -76,7 +76,7 @@ class AWSCertificateManagerResponse(BaseResponse): | |||||||
| 
 | 
 | ||||||
|         if arn is None: |         if arn is None: | ||||||
|             msg = 'A required parameter for the specified action is not supplied.' |             msg = 'A required parameter for the specified action is not supplied.' | ||||||
|             return {'__type': 'MissingParameter', 'message': msg}, dict(status=400) |             return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400) | ||||||
| 
 | 
 | ||||||
|         try: |         try: | ||||||
|             cert_bundle = self.acm_backend.get_certificate(arn) |             cert_bundle = self.acm_backend.get_certificate(arn) | ||||||
| @ -170,7 +170,7 @@ class AWSCertificateManagerResponse(BaseResponse): | |||||||
| 
 | 
 | ||||||
|         if arn is None: |         if arn is None: | ||||||
|             msg = 'A required parameter for the specified action is not supplied.' |             msg = 'A required parameter for the specified action is not supplied.' | ||||||
|             return {'__type': 'MissingParameter', 'message': msg}, dict(status=400) |             return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400) | ||||||
| 
 | 
 | ||||||
|         try: |         try: | ||||||
|             self.acm_backend.remove_tags_from_certificate(arn, tags) |             self.acm_backend.remove_tags_from_certificate(arn, tags) | ||||||
| @ -180,7 +180,45 @@ class AWSCertificateManagerResponse(BaseResponse): | |||||||
|         return '' |         return '' | ||||||
| 
 | 
 | ||||||
|     def request_certificate(self): |     def request_certificate(self): | ||||||
|         raise NotImplementedError() |         domain_name = self._get_param('DomainName') | ||||||
|  |         domain_validation_options = self._get_param('DomainValidationOptions')  # is ignored atm | ||||||
|  |         idempotency_token = self._get_param('IdempotencyToken') | ||||||
|  |         subject_alt_names = self._get_param('SubjectAlternativeNames') | ||||||
|  | 
 | ||||||
|  |         if len(subject_alt_names) > 10: | ||||||
|  |             # There is initial AWS limit of 10 | ||||||
|  |             msg = 'An ACM limit has been exceeded. Need to request SAN limit to be raised' | ||||||
|  |             return json.dumps({'__type': 'LimitExceededException', 'message': msg}), dict(status=400) | ||||||
|  | 
 | ||||||
|  |         try: | ||||||
|  |             arn = self.acm_backend.request_certificate(domain_name, domain_validation_options, idempotency_token, subject_alt_names) | ||||||
|  |         except AWSError as err: | ||||||
|  |             return err.response() | ||||||
|  | 
 | ||||||
|  |         return json.dumps({'CertificateArn': arn}) | ||||||
| 
 | 
 | ||||||
|     def resend_validation_email(self): |     def resend_validation_email(self): | ||||||
|         raise NotImplementedError() |         arn = self._get_param('CertificateArn') | ||||||
|  |         domain = self._get_param('Domain') | ||||||
|  |         # ValidationDomain not used yet. | ||||||
|  |         # Contains domain which is equal to or a subset of Domain | ||||||
|  |         # that AWS will send validation emails to | ||||||
|  |         # https://docs.aws.amazon.com/acm/latest/APIReference/API_ResendValidationEmail.html | ||||||
|  |         # validation_domain = self._get_param('ValidationDomain') | ||||||
|  | 
 | ||||||
|  |         if arn is None: | ||||||
|  |             msg = 'A required parameter for the specified action is not supplied.' | ||||||
|  |             return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400) | ||||||
|  | 
 | ||||||
|  |         try: | ||||||
|  |             cert_bundle = self.acm_backend.get_certificate(arn) | ||||||
|  | 
 | ||||||
|  |             if cert_bundle.common_name != domain: | ||||||
|  |                 msg = 'Parameter Domain does not match certificate domain' | ||||||
|  |                 _type = 'InvalidDomainValidationOptionsException' | ||||||
|  |                 return json.dumps({'__type': _type, 'message': msg}), dict(status=400) | ||||||
|  | 
 | ||||||
|  |         except AWSError as err: | ||||||
|  |             return err.response() | ||||||
|  | 
 | ||||||
|  |         return '' | ||||||
|  | |||||||
| @ -2,6 +2,7 @@ from __future__ import unicode_literals | |||||||
| 
 | 
 | ||||||
| import os | import os | ||||||
| import boto3 | import boto3 | ||||||
|  | from freezegun import freeze_time | ||||||
| import sure  # noqa | import sure  # noqa | ||||||
| 
 | 
 | ||||||
| from botocore.exceptions import ClientError | from botocore.exceptions import ClientError | ||||||
| @ -235,7 +236,122 @@ def test_remove_tags_from_invalid_certificate(): | |||||||
|         raise RuntimeError('Should of raised ResourceNotFoundException') |         raise RuntimeError('Should of raised ResourceNotFoundException') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @mock_acm | ||||||
|  | def test_resend_validation_email(): | ||||||
|  |     client = boto3.client('acm', region_name='eu-central-1') | ||||||
|  |     arn = _import_cert(client) | ||||||
|  | 
 | ||||||
|  |     client.resend_validation_email( | ||||||
|  |         CertificateArn=arn, | ||||||
|  |         Domain='*.moto.com', | ||||||
|  |         ValidationDomain='NOTUSEDYET' | ||||||
|  |     ) | ||||||
|  |     # Returns nothing, boto would raise Exceptions otherwise | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @mock_acm | ||||||
|  | def test_resend_validation_email_invalid(): | ||||||
|  |     client = boto3.client('acm', region_name='eu-central-1') | ||||||
|  |     arn = _import_cert(client) | ||||||
|  | 
 | ||||||
|  |     try: | ||||||
|  |         client.resend_validation_email( | ||||||
|  |             CertificateArn=arn, | ||||||
|  |             Domain='no-match.moto.com', | ||||||
|  |             ValidationDomain='NOTUSEDYET' | ||||||
|  |         ) | ||||||
|  |     except ClientError as err: | ||||||
|  |         err.response['Error']['Code'].should.equal('InvalidDomainValidationOptionsException') | ||||||
|  |     else: | ||||||
|  |         raise RuntimeError('Should of raised InvalidDomainValidationOptionsException') | ||||||
|  | 
 | ||||||
|  |     try: | ||||||
|  |         client.resend_validation_email( | ||||||
|  |             CertificateArn=BAD_ARN, | ||||||
|  |             Domain='no-match.moto.com', | ||||||
|  |             ValidationDomain='NOTUSEDYET' | ||||||
|  |         ) | ||||||
|  |     except ClientError as err: | ||||||
|  |         err.response['Error']['Code'].should.equal('ResourceNotFoundException') | ||||||
|  |     else: | ||||||
|  |         raise RuntimeError('Should of raised ResourceNotFoundException') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @mock_acm | ||||||
|  | def test_request_certificate(): | ||||||
|  |     client = boto3.client('acm', region_name='eu-central-1') | ||||||
|  | 
 | ||||||
|  |     resp = client.request_certificate( | ||||||
|  |         DomainName='google.com', | ||||||
|  |         SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], | ||||||
|  |     ) | ||||||
|  |     resp.should.contain('CertificateArn') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # # Also tests the SAN code | ||||||
|  | # # requires Pull: https://github.com/spulec/freezegun/pull/210 | ||||||
|  | # @freeze_time("2012-01-01 12:00:00", as_arg=True) | ||||||
|  | # @mock_acm | ||||||
|  | # def test_request_certificate(frozen_time): | ||||||
|  | #     # After requesting a certificate, it should then auto-validate after 1 minute | ||||||
|  | #     # Some sneaky programming for that ;-) | ||||||
|  | #     client = boto3.client('acm', region_name='eu-central-1') | ||||||
|  | # | ||||||
|  | #     resp = client.request_certificate( | ||||||
|  | #         DomainName='google.com', | ||||||
|  | #         SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], | ||||||
|  | #     ) | ||||||
|  | #     arn = resp['CertificateArn'] | ||||||
|  | # | ||||||
|  | #     resp = client.describe_certificate(CertificateArn=arn) | ||||||
|  | #     resp['Certificate']['CertificateArn'].should.equal(arn) | ||||||
|  | #     resp['Certificate']['DomainName'].should.equal('google.com') | ||||||
|  | #     resp['Certificate']['Issuer'].should.equal('Amazon') | ||||||
|  | #     resp['Certificate']['KeyAlgorithm'].should.equal('RSA_2048') | ||||||
|  | #     resp['Certificate']['Status'].should.equal('PENDING_VALIDATION') | ||||||
|  | #     resp['Certificate']['Type'].should.equal('AMAZON_ISSUED') | ||||||
|  | #     len(resp['Certificate']['SubjectAlternativeNames']).should.equal(3) | ||||||
|  | # | ||||||
|  | #     # Move time | ||||||
|  | #     frozen_time.move_to('2012-01-01 12:02:00') | ||||||
|  | #     resp = client.describe_certificate(CertificateArn=arn) | ||||||
|  | #     resp['Certificate']['CertificateArn'].should.equal(arn) | ||||||
|  | #     resp['Certificate']['Status'].should.equal('ISSUED') | ||||||
|  | # | ||||||
|  | # | ||||||
|  | # # requires Pull: https://github.com/spulec/freezegun/pull/210 | ||||||
|  | # @freeze_time("2012-01-01 12:00:00", as_arg=True) | ||||||
|  | # @mock_acm | ||||||
|  | # def test_request_certificate(frozen_time): | ||||||
|  | #     # After requesting a certificate, it should then auto-validate after 1 minute | ||||||
|  | #     # Some sneaky programming for that ;-) | ||||||
|  | #     client = boto3.client('acm', region_name='eu-central-1') | ||||||
|  | # | ||||||
|  | #     resp = client.request_certificate( | ||||||
|  | #         IdempotencyToken='test_token', | ||||||
|  | #         DomainName='google.com', | ||||||
|  | #         SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], | ||||||
|  | #     ) | ||||||
|  | #     original_arn = resp['CertificateArn'] | ||||||
|  | # | ||||||
|  | #     # Should be able to request a certificate multiple times in an hour | ||||||
|  | #     # after that it makes a new one | ||||||
|  | #     for time_intervals in ('2012-01-01 12:15:00', '2012-01-01 12:30:00', '2012-01-01 12:45:00'): | ||||||
|  | #         frozen_time.move_to(time_intervals) | ||||||
|  | #         resp = client.request_certificate( | ||||||
|  | #             IdempotencyToken='test_token', | ||||||
|  | #             DomainName='google.com', | ||||||
|  | #             SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], | ||||||
|  | #         ) | ||||||
|  | #         arn = resp['CertificateArn'] | ||||||
|  | #         arn.should.equal(original_arn) | ||||||
|  | # | ||||||
|  | #     # Move time | ||||||
|  | #     frozen_time.move_to('2012-01-01 13:01:00') | ||||||
|  | #     resp = client.request_certificate( | ||||||
|  | #         IdempotencyToken='test_token', | ||||||
|  | #         DomainName='google.com', | ||||||
|  | #         SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], | ||||||
|  | #     ) | ||||||
|  | #     arn = resp['CertificateArn'] | ||||||
|  | #     arn.should_not.equal(original_arn) | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user