From edbbbf6d200b95279f91266f01507a30342a912d Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Fri, 22 Sep 2017 11:21:36 +0100 Subject: [PATCH] Nearly finished implementation and tests --- moto/acm/models.py | 54 +++++++++- moto/acm/responses.py | 32 +++++- tests/test_acm/test_acm.py | 211 ++++++++++++++++++++++++++++++++++++- 3 files changed, 287 insertions(+), 10 deletions(-) diff --git a/moto/acm/models.py b/moto/acm/models.py index b51d7aa68..e4b0204c1 100644 --- a/moto/acm/models.py +++ b/moto/acm/models.py @@ -43,6 +43,11 @@ MqO5tzHpCvX2HzLc # so for now a cheap response is just give any old root CA +def datetime_to_epoch(date): + # As only Py3 has datetime.timestamp() + return int((date - datetime.datetime(1970, 1, 1)).total_seconds()) + + class AWSError(Exception): TYPE = None STATUS = 400 @@ -64,7 +69,8 @@ class AWSResourceNotFoundException(AWSError): class CertBundle(BaseModel): - def __init__(self, certificate, private_key, chain=None, region='us-east-1', arn=None): + def __init__(self, certificate, private_key, chain=None, region='us-east-1', arn=None, cert_type='IMPORTED'): + self.created_at = datetime.datetime.now() self.cert = certificate self._cert = None self.common_name = None @@ -73,6 +79,7 @@ class CertBundle(BaseModel): self.chain = chain self.tags = {} self._chain = None + self.type = cert_type # Should really be an enum # AWS always returns your chain + root CA if self.chain is None: @@ -132,10 +139,13 @@ class CertBundle(BaseModel): self._chain = [] for cert_armored in self.chain.split(b'-\n-'): + # Would leave encoded but Py2 does not have raw binary strings + cert_armored = cert_armored.decode() + # Fix missing -'s on split - cert_armored = re.sub(rb'^----B', b'-----B', cert_armored) - cert_armored = re.sub(rb'E----$', b'E-----', cert_armored) - cert = cryptography.x509.load_pem_x509_certificate(cert_armored, default_backend()) + cert_armored = re.sub(r'^----B', '-----B', cert_armored) + cert_armored = re.sub(r'E----$', 'E-----', cert_armored) + cert = cryptography.x509.load_pem_x509_certificate(cert_armored.encode(), default_backend()) self._chain.append(cert) now = datetime.datetime.now() @@ -150,6 +160,42 @@ class CertBundle(BaseModel): raise raise AWSValidationException('The certificate is not PEM-encoded or is not valid.') + def describe(self): + #'RenewalSummary': {}, # Only when cert is amazon issued + + if self._key.key_size == 1024: + key_algo = 'RSA_1024' + elif self._key.key_size == 2048: + key_algo = 'RSA_2048' + else: + key_algo = 'EC_prime256v1' + + result = { + 'Certificate': { + 'CertificateArn': self.arn, + 'DomainName': self.common_name, + 'InUseBy': [], + 'Issuer': self._cert.issuer.get_attributes_for_oid(cryptography.x509.OID_COMMON_NAME)[0].value, + 'KeyAlgorithm': key_algo, + 'NotAfter': datetime_to_epoch(self._cert.not_valid_after), + 'NotBefore': datetime_to_epoch(self._cert.not_valid_before), + 'Serial': self._cert.serial, + 'SignatureAlgorithm': self._cert.signature_algorithm_oid._name.upper().replace('ENCRYPTION', ''), + 'Status': 'ISSUED', # One of PENDING_VALIDATION, ISSUED, INACTIVE, EXPIRED, VALIDATION_TIMED_OUT, REVOKED, FAILED. + 'Subject': 'CN={0}'.format(self.common_name), + 'SubjectAlternativeNames': [], + 'Type': self.type # One of IMPORTED, AMAZON_ISSUED + } + } + + if self.type == 'IMPORTED': + result['Certificate']['ImportedAt'] = datetime_to_epoch(self.created_at) + else: + result['Certificate']['CreatedAt'] = datetime_to_epoch(self.created_at) + result['Certificate']['IssuedAt'] = datetime_to_epoch(self.created_at) + + return result + def __str__(self): return self.arn diff --git a/moto/acm/responses.py b/moto/acm/responses.py index a9243b7ea..35cf64099 100644 --- a/moto/acm/responses.py +++ b/moto/acm/responses.py @@ -30,7 +30,7 @@ class AWSCertificateManagerResponse(BaseResponse): def add_tags_to_certificate(self): arn = self._get_param('CertificateArn') - tags = self._get_list_prefix('Tags') + tags = self._get_param('Tags') if arn is None: msg = 'A required parameter for the specified action is not supplied.' @@ -58,7 +58,18 @@ class AWSCertificateManagerResponse(BaseResponse): return '' def describe_certificate(self): - raise NotImplementedError() + arn = self._get_param('CertificateArn') + + if arn is None: + msg = 'A required parameter for the specified action is not supplied.' + return {'__type': 'MissingParameter', 'message': msg}, dict(status=400) + + try: + cert_bundle = self.acm_backend.get_certificate(arn) + except AWSError as err: + return err.response() + + return json.dumps(cert_bundle.describe()) def get_certificate(self): arn = self._get_param('CertificateArn') @@ -79,7 +90,18 @@ class AWSCertificateManagerResponse(BaseResponse): return json.dumps(result) def import_certificate(self): - # TODO comment on what raises exceptions for all branches + """ + Returns errors on: + Certificate, PrivateKey or Chain not being properly formatted + Arn not existing if its provided + PrivateKey size > 2048 + Certificate expired or is not yet in effect + + Does not return errors on: + Checking Certificate is legit, or a selfsigned chain is provided + + :return: str(JSON) for response + """ certificate = self._get_param('Certificate') private_key = self._get_param('PrivateKey') chain = self._get_param('CertificateChain') # Optional @@ -134,7 +156,7 @@ class AWSCertificateManagerResponse(BaseResponse): result = {'Tags': []} # Tag "objects" can not contain the Value part - for key, value in cert_bundle.tags: + for key, value in cert_bundle.tags.items(): tag_dict = {'Key': key} if value is not None: tag_dict['Value'] = value @@ -144,7 +166,7 @@ class AWSCertificateManagerResponse(BaseResponse): def remove_tags_from_certificate(self): arn = self._get_param('CertificateArn') - tags = self._get_list_prefix('Tags') + tags = self._get_param('Tags') if arn is None: msg = 'A required parameter for the specified action is not supplied.' diff --git a/tests/test_acm/test_acm.py b/tests/test_acm/test_acm.py index b45018e67..ff9ec6510 100644 --- a/tests/test_acm/test_acm.py +++ b/tests/test_acm/test_acm.py @@ -14,10 +14,22 @@ _GET_RESOURCE = lambda x: open(os.path.join(RESOURCE_FOLDER, x), 'rb').read() CA_CRT = _GET_RESOURCE('ca.pem') CA_KEY = _GET_RESOURCE('ca.key') SERVER_CRT = _GET_RESOURCE('star_moto_com.pem') +SERVER_COMMON_NAME = '*.moto.com' SERVER_CRT_BAD = _GET_RESOURCE('star_moto_com-bad.pem') SERVER_KEY = _GET_RESOURCE('star_moto_com.key') +BAD_ARN = 'arn:aws:acm:us-east-2:123456789012:certificate/_0000000-0000-0000-0000-000000000000' +def _import_cert(client): + response = client.import_certificate( + Certificate=SERVER_CRT, + PrivateKey=SERVER_KEY, + CertificateChain=CA_CRT + ) + return response['CertificateArn'] + + +# Also tests GetCertificate @mock_acm def test_import_certificate(): client = boto3.client('acm', region_name='eu-central-1') @@ -29,4 +41,201 @@ def test_import_certificate(): ) resp = client.get_certificate(CertificateArn=resp['CertificateArn']) - print(resp) + resp['Certificate'].should.equal(SERVER_CRT.decode()) + resp.should.contain('CertificateChain') + + +@mock_acm +def test_import_bad_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + + try: + client.import_certificate( + Certificate=SERVER_CRT_BAD, + PrivateKey=SERVER_KEY, + ) + except ClientError as err: + err.response['Error']['Code'].should.equal('ValidationException') + else: + raise RuntimeError('Should of raised ValidationException') + + +@mock_acm +def test_list_certificates(): + client = boto3.client('acm', region_name='eu-central-1') + arn = _import_cert(client) + + resp = client.list_certificates() + len(resp['CertificateSummaryList']).should.equal(1) + + resp['CertificateSummaryList'][0]['CertificateArn'].should.equal(arn) + resp['CertificateSummaryList'][0]['DomainName'].should.equal(SERVER_COMMON_NAME) + + +@mock_acm +def test_get_invalid_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + + try: + client.get_certificate(CertificateArn=BAD_ARN) + except ClientError as err: + err.response['Error']['Code'].should.equal('ResourceNotFoundException') + else: + raise RuntimeError('Should of raised ResourceNotFoundException') + + +# Also tests deleting invalid certificate +@mock_acm +def test_delete_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + arn = _import_cert(client) + + # If it does not raise an error and the next call does, all is fine + client.delete_certificate(CertificateArn=arn) + + try: + client.delete_certificate(CertificateArn=arn) + except ClientError as err: + err.response['Error']['Code'].should.equal('ResourceNotFoundException') + else: + raise RuntimeError('Should of raised ResourceNotFoundException') + + +@mock_acm +def test_describe_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + arn = _import_cert(client) + + resp = client.describe_certificate(CertificateArn=arn) + resp['Certificate']['CertificateArn'].should.equal(arn) + resp['Certificate']['DomainName'].should.equal(SERVER_COMMON_NAME) + resp['Certificate']['Issuer'].should.equal('Moto') + resp['Certificate']['KeyAlgorithm'].should.equal('RSA_2048') + resp['Certificate']['Status'].should.equal('ISSUED') + resp['Certificate']['Type'].should.equal('IMPORTED') + + +@mock_acm +def test_describe_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + + try: + client.describe_certificate(CertificateArn=BAD_ARN) + except ClientError as err: + err.response['Error']['Code'].should.equal('ResourceNotFoundException') + else: + raise RuntimeError('Should of raised ResourceNotFoundException') + + +# Also tests ListTagsForCertificate +@mock_acm +def test_add_tags_to_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + arn = _import_cert(client) + + client.add_tags_to_certificate( + CertificateArn=arn, + Tags=[ + {'Key': 'key1', 'Value': 'value1'}, + {'Key': 'key2'}, + ] + ) + + resp = client.list_tags_for_certificate(CertificateArn=arn) + tags = {item['Key']: item.get('Value', '__NONE__') for item in resp['Tags']} + + tags.should.contain('key1') + tags.should.contain('key2') + tags['key1'].should.equal('value1') + + # This way, it ensures that we can detect if None is passed back when it shouldnt, + # as we store keys without values with a value of None, but it shouldnt be passed back + tags['key2'].should.equal('__NONE__') + + +@mock_acm +def test_add_tags_to_invalid_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + + try: + client.add_tags_to_certificate( + CertificateArn=BAD_ARN, + Tags=[ + {'Key': 'key1', 'Value': 'value1'}, + {'Key': 'key2'}, + ] + ) + except ClientError as err: + err.response['Error']['Code'].should.equal('ResourceNotFoundException') + else: + raise RuntimeError('Should of raised ResourceNotFoundException') + + +@mock_acm +def test_list_tags_for_invalid_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + + try: + client.list_tags_for_certificate(CertificateArn=BAD_ARN) + except ClientError as err: + err.response['Error']['Code'].should.equal('ResourceNotFoundException') + else: + raise RuntimeError('Should of raised ResourceNotFoundException') + + +@mock_acm +def test_remove_tags_from_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + arn = _import_cert(client) + + client.add_tags_to_certificate( + CertificateArn=arn, + Tags=[ + {'Key': 'key1', 'Value': 'value1'}, + {'Key': 'key2'}, + {'Key': 'key3', 'Value': 'value3'}, + {'Key': 'key4', 'Value': 'value4'}, + ] + ) + + client.remove_tags_from_certificate( + CertificateArn=arn, + Tags=[ + {'Key': 'key1', 'Value': 'value2'}, # Should not remove as doesnt match + {'Key': 'key2'}, # Single key removal + {'Key': 'key3', 'Value': 'value3'}, # Exact match removal + {'Key': 'key4'} # Partial match removal + ] + ) + + resp = client.list_tags_for_certificate(CertificateArn=arn) + tags = {item['Key']: item.get('Value', '__NONE__') for item in resp['Tags']} + + for key in ('key2', 'key3', 'key4'): + tags.should_not.contain(key) + + tags.should.contain('key1') + + +@mock_acm +def test_remove_tags_from_invalid_certificate(): + client = boto3.client('acm', region_name='eu-central-1') + + try: + client.remove_tags_from_certificate( + CertificateArn=BAD_ARN, + Tags=[ + {'Key': 'key1', 'Value': 'value1'}, + {'Key': 'key2'}, + ] + ) + except ClientError as err: + err.response['Error']['Code'].should.equal('ResourceNotFoundException') + else: + raise RuntimeError('Should of raised ResourceNotFoundException') + + + + + +