from __future__ import unicode_literals import os import boto3 from freezegun import freeze_time import sure # noqa import uuid from botocore.exceptions import ClientError from moto import mock_acm RESOURCE_FOLDER = os.path.join(os.path.dirname(__file__), 'resources') _GET_RESOURCE = lambda x: open(os.path.join(RESOURCE_FOLDER, x), 'rb').read() CA_CRT = _GET_RESOURCE('ca.pem') CA_KEY = _GET_RESOURCE('ca.key') SERVER_CRT = _GET_RESOURCE('star_moto_com.pem') SERVER_COMMON_NAME = '*.moto.com' SERVER_CRT_BAD = _GET_RESOURCE('star_moto_com-bad.pem') SERVER_KEY = _GET_RESOURCE('star_moto_com.key') BAD_ARN = 'arn:aws:acm:us-east-2:123456789012:certificate/_0000000-0000-0000-0000-000000000000' def _import_cert(client): response = client.import_certificate( Certificate=SERVER_CRT, PrivateKey=SERVER_KEY, CertificateChain=CA_CRT ) return response['CertificateArn'] # Also tests GetCertificate @mock_acm def test_import_certificate(): client = boto3.client('acm', region_name='eu-central-1') resp = client.import_certificate( Certificate=SERVER_CRT, PrivateKey=SERVER_KEY, CertificateChain=CA_CRT ) resp = client.get_certificate(CertificateArn=resp['CertificateArn']) resp['Certificate'].should.equal(SERVER_CRT.decode()) resp.should.contain('CertificateChain') @mock_acm def test_import_bad_certificate(): client = boto3.client('acm', region_name='eu-central-1') try: client.import_certificate( Certificate=SERVER_CRT_BAD, PrivateKey=SERVER_KEY, ) except ClientError as err: err.response['Error']['Code'].should.equal('ValidationException') else: raise RuntimeError('Should of raised ValidationException') @mock_acm def test_list_certificates(): client = boto3.client('acm', region_name='eu-central-1') arn = _import_cert(client) resp = client.list_certificates() len(resp['CertificateSummaryList']).should.equal(1) resp['CertificateSummaryList'][0]['CertificateArn'].should.equal(arn) resp['CertificateSummaryList'][0]['DomainName'].should.equal(SERVER_COMMON_NAME) @mock_acm def test_list_certificates_by_status(): client = boto3.client('acm', region_name='eu-central-1') issued_arn = _import_cert(client) pending_arn = client.request_certificate(DomainName='google.com')['CertificateArn'] resp = client.list_certificates() len(resp['CertificateSummaryList']).should.equal(2) resp = client.list_certificates(CertificateStatuses=['EXPIRED', 'INACTIVE']) len(resp['CertificateSummaryList']).should.equal(0) resp = client.list_certificates(CertificateStatuses=['PENDING_VALIDATION']) len(resp['CertificateSummaryList']).should.equal(1) resp['CertificateSummaryList'][0]['CertificateArn'].should.equal(pending_arn) resp = client.list_certificates(CertificateStatuses=['ISSUED']) len(resp['CertificateSummaryList']).should.equal(1) resp['CertificateSummaryList'][0]['CertificateArn'].should.equal(issued_arn) resp = client.list_certificates(CertificateStatuses=['ISSUED', 'PENDING_VALIDATION']) len(resp['CertificateSummaryList']).should.equal(2) arns = {cert['CertificateArn'] for cert in resp['CertificateSummaryList']} arns.should.contain(issued_arn) arns.should.contain(pending_arn) @mock_acm def test_get_invalid_certificate(): client = boto3.client('acm', region_name='eu-central-1') try: client.get_certificate(CertificateArn=BAD_ARN) except ClientError as err: err.response['Error']['Code'].should.equal('ResourceNotFoundException') else: raise RuntimeError('Should of raised ResourceNotFoundException') # Also tests deleting invalid certificate @mock_acm def test_delete_certificate(): client = boto3.client('acm', region_name='eu-central-1') arn = _import_cert(client) # If it does not raise an error and the next call does, all is fine client.delete_certificate(CertificateArn=arn) try: client.delete_certificate(CertificateArn=arn) except ClientError as err: err.response['Error']['Code'].should.equal('ResourceNotFoundException') else: raise RuntimeError('Should of raised ResourceNotFoundException') @mock_acm def test_describe_certificate(): client = boto3.client('acm', region_name='eu-central-1') arn = _import_cert(client) resp = client.describe_certificate(CertificateArn=arn) resp['Certificate']['CertificateArn'].should.equal(arn) resp['Certificate']['DomainName'].should.equal(SERVER_COMMON_NAME) resp['Certificate']['Issuer'].should.equal('Moto') resp['Certificate']['KeyAlgorithm'].should.equal('RSA_2048') resp['Certificate']['Status'].should.equal('ISSUED') resp['Certificate']['Type'].should.equal('IMPORTED') @mock_acm def test_describe_certificate(): client = boto3.client('acm', region_name='eu-central-1') try: client.describe_certificate(CertificateArn=BAD_ARN) except ClientError as err: err.response['Error']['Code'].should.equal('ResourceNotFoundException') else: raise RuntimeError('Should of raised ResourceNotFoundException') # Also tests ListTagsForCertificate @mock_acm def test_add_tags_to_certificate(): client = boto3.client('acm', region_name='eu-central-1') arn = _import_cert(client) client.add_tags_to_certificate( CertificateArn=arn, Tags=[ {'Key': 'key1', 'Value': 'value1'}, {'Key': 'key2'}, ] ) resp = client.list_tags_for_certificate(CertificateArn=arn) tags = {item['Key']: item.get('Value', '__NONE__') for item in resp['Tags']} tags.should.contain('key1') tags.should.contain('key2') tags['key1'].should.equal('value1') # This way, it ensures that we can detect if None is passed back when it shouldnt, # as we store keys without values with a value of None, but it shouldnt be passed back tags['key2'].should.equal('__NONE__') @mock_acm def test_add_tags_to_invalid_certificate(): client = boto3.client('acm', region_name='eu-central-1') try: client.add_tags_to_certificate( CertificateArn=BAD_ARN, Tags=[ {'Key': 'key1', 'Value': 'value1'}, {'Key': 'key2'}, ] ) except ClientError as err: err.response['Error']['Code'].should.equal('ResourceNotFoundException') else: raise RuntimeError('Should of raised ResourceNotFoundException') @mock_acm def test_list_tags_for_invalid_certificate(): client = boto3.client('acm', region_name='eu-central-1') try: client.list_tags_for_certificate(CertificateArn=BAD_ARN) except ClientError as err: err.response['Error']['Code'].should.equal('ResourceNotFoundException') else: raise RuntimeError('Should of raised ResourceNotFoundException') @mock_acm def test_remove_tags_from_certificate(): client = boto3.client('acm', region_name='eu-central-1') arn = _import_cert(client) client.add_tags_to_certificate( CertificateArn=arn, Tags=[ {'Key': 'key1', 'Value': 'value1'}, {'Key': 'key2'}, {'Key': 'key3', 'Value': 'value3'}, {'Key': 'key4', 'Value': 'value4'}, ] ) client.remove_tags_from_certificate( CertificateArn=arn, Tags=[ {'Key': 'key1', 'Value': 'value2'}, # Should not remove as doesnt match {'Key': 'key2'}, # Single key removal {'Key': 'key3', 'Value': 'value3'}, # Exact match removal {'Key': 'key4'} # Partial match removal ] ) resp = client.list_tags_for_certificate(CertificateArn=arn) tags = {item['Key']: item.get('Value', '__NONE__') for item in resp['Tags']} for key in ('key2', 'key3', 'key4'): tags.should_not.contain(key) tags.should.contain('key1') @mock_acm def test_remove_tags_from_invalid_certificate(): client = boto3.client('acm', region_name='eu-central-1') try: client.remove_tags_from_certificate( CertificateArn=BAD_ARN, Tags=[ {'Key': 'key1', 'Value': 'value1'}, {'Key': 'key2'}, ] ) except ClientError as err: err.response['Error']['Code'].should.equal('ResourceNotFoundException') else: raise RuntimeError('Should of raised ResourceNotFoundException') @mock_acm def test_resend_validation_email(): client = boto3.client('acm', region_name='eu-central-1') arn = _import_cert(client) client.resend_validation_email( CertificateArn=arn, Domain='*.moto.com', ValidationDomain='NOTUSEDYET' ) # Returns nothing, boto would raise Exceptions otherwise @mock_acm def test_resend_validation_email_invalid(): client = boto3.client('acm', region_name='eu-central-1') arn = _import_cert(client) try: client.resend_validation_email( CertificateArn=arn, Domain='no-match.moto.com', ValidationDomain='NOTUSEDYET' ) except ClientError as err: err.response['Error']['Code'].should.equal('InvalidDomainValidationOptionsException') else: raise RuntimeError('Should of raised InvalidDomainValidationOptionsException') try: client.resend_validation_email( CertificateArn=BAD_ARN, Domain='no-match.moto.com', ValidationDomain='NOTUSEDYET' ) except ClientError as err: err.response['Error']['Code'].should.equal('ResourceNotFoundException') else: raise RuntimeError('Should of raised ResourceNotFoundException') @mock_acm def test_request_certificate(): client = boto3.client('acm', region_name='eu-central-1') token = str(uuid.uuid4()) resp = client.request_certificate( DomainName='google.com', IdempotencyToken=token, SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], ) resp.should.contain('CertificateArn') arn = resp['CertificateArn'] arn.should.match(r"arn:aws:acm:eu-central-1:\d{12}:certificate/") resp = client.request_certificate( DomainName='google.com', IdempotencyToken=token, SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], ) resp['CertificateArn'].should.equal(arn) @mock_acm def test_request_certificate_no_san(): client = boto3.client('acm', region_name='eu-central-1') resp = client.request_certificate( DomainName='google.com' ) resp.should.contain('CertificateArn') resp2 = client.describe_certificate( CertificateArn=resp['CertificateArn'] ) resp2.should.contain('Certificate') # # Also tests the SAN code # # requires Pull: https://github.com/spulec/freezegun/pull/210 # @freeze_time("2012-01-01 12:00:00", as_arg=True) # @mock_acm # def test_request_certificate(frozen_time): # # After requesting a certificate, it should then auto-validate after 1 minute # # Some sneaky programming for that ;-) # client = boto3.client('acm', region_name='eu-central-1') # # resp = client.request_certificate( # DomainName='google.com', # SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], # ) # arn = resp['CertificateArn'] # # resp = client.describe_certificate(CertificateArn=arn) # resp['Certificate']['CertificateArn'].should.equal(arn) # resp['Certificate']['DomainName'].should.equal('google.com') # resp['Certificate']['Issuer'].should.equal('Amazon') # resp['Certificate']['KeyAlgorithm'].should.equal('RSA_2048') # resp['Certificate']['Status'].should.equal('PENDING_VALIDATION') # resp['Certificate']['Type'].should.equal('AMAZON_ISSUED') # len(resp['Certificate']['SubjectAlternativeNames']).should.equal(3) # # # Move time # frozen_time.move_to('2012-01-01 12:02:00') # resp = client.describe_certificate(CertificateArn=arn) # resp['Certificate']['CertificateArn'].should.equal(arn) # resp['Certificate']['Status'].should.equal('ISSUED') # # # # requires Pull: https://github.com/spulec/freezegun/pull/210 # @freeze_time("2012-01-01 12:00:00", as_arg=True) # @mock_acm # def test_request_certificate(frozen_time): # # After requesting a certificate, it should then auto-validate after 1 minute # # Some sneaky programming for that ;-) # client = boto3.client('acm', region_name='eu-central-1') # # resp = client.request_certificate( # IdempotencyToken='test_token', # DomainName='google.com', # SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], # ) # original_arn = resp['CertificateArn'] # # # Should be able to request a certificate multiple times in an hour # # after that it makes a new one # for time_intervals in ('2012-01-01 12:15:00', '2012-01-01 12:30:00', '2012-01-01 12:45:00'): # frozen_time.move_to(time_intervals) # resp = client.request_certificate( # IdempotencyToken='test_token', # DomainName='google.com', # SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], # ) # arn = resp['CertificateArn'] # arn.should.equal(original_arn) # # # Move time # frozen_time.move_to('2012-01-01 13:01:00') # resp = client.request_certificate( # IdempotencyToken='test_token', # DomainName='google.com', # SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'], # ) # arn = resp['CertificateArn'] # arn.should_not.equal(original_arn)