from __future__ import unicode_literals import os import uuid import boto3 import pytest import sure # noqa from botocore.exceptions import ClientError from freezegun import freeze_time from moto import mock_acm, settings from moto.core import ACCOUNT_ID from unittest import SkipTest RESOURCE_FOLDER = os.path.join(os.path.dirname(__file__), "resources") _GET_RESOURCE = lambda x: open(os.path.join(RESOURCE_FOLDER, x), "rb").read() CA_CRT = _GET_RESOURCE("ca.pem") CA_KEY = _GET_RESOURCE("ca.key") SERVER_CRT = _GET_RESOURCE("star_moto_com.pem") SERVER_COMMON_NAME = "*.moto.com" SERVER_CRT_BAD = _GET_RESOURCE("star_moto_com-bad.pem") SERVER_KEY = _GET_RESOURCE("star_moto_com.key") BAD_ARN = "arn:aws:acm:us-east-2:{}:certificate/_0000000-0000-0000-0000-000000000000".format( ACCOUNT_ID ) def _import_cert(client): response = client.import_certificate( Certificate=SERVER_CRT, PrivateKey=SERVER_KEY, CertificateChain=CA_CRT ) return response["CertificateArn"] # Also tests GetCertificate @mock_acm def test_import_certificate(): client = boto3.client("acm", region_name="eu-central-1") resp = client.import_certificate( Certificate=SERVER_CRT, PrivateKey=SERVER_KEY, CertificateChain=CA_CRT ) resp = client.get_certificate(CertificateArn=resp["CertificateArn"]) resp["Certificate"].should.equal(SERVER_CRT.decode()) resp.should.contain("CertificateChain") @mock_acm def test_import_certificate_with_tags(): client = boto3.client("acm", region_name="eu-central-1") resp = client.import_certificate( Certificate=SERVER_CRT, PrivateKey=SERVER_KEY, CertificateChain=CA_CRT, Tags=[{"Key": "Environment", "Value": "QA"}, {"Key": "KeyOnly"},], ) arn = resp["CertificateArn"] resp = client.get_certificate(CertificateArn=arn) resp["Certificate"].should.equal(SERVER_CRT.decode()) resp.should.contain("CertificateChain") resp = client.list_tags_for_certificate(CertificateArn=arn) tags = {item["Key"]: item.get("Value", "__NONE__") for item in resp["Tags"]} tags.should.contain("Environment") tags.should.contain("KeyOnly") tags["Environment"].should.equal("QA") tags["KeyOnly"].should.equal("__NONE__") @mock_acm def test_import_bad_certificate(): client = boto3.client("acm", region_name="eu-central-1") try: client.import_certificate(Certificate=SERVER_CRT_BAD, PrivateKey=SERVER_KEY) except ClientError as err: err.response["Error"]["Code"].should.equal("ValidationException") else: raise RuntimeError("Should of raised ValidationException") @mock_acm def test_list_certificates(): client = boto3.client("acm", region_name="eu-central-1") arn = _import_cert(client) resp = client.list_certificates() len(resp["CertificateSummaryList"]).should.equal(1) resp["CertificateSummaryList"][0]["CertificateArn"].should.equal(arn) resp["CertificateSummaryList"][0]["DomainName"].should.equal(SERVER_COMMON_NAME) @mock_acm def test_list_certificates_by_status(): client = boto3.client("acm", region_name="eu-central-1") issued_arn = _import_cert(client) pending_arn = client.request_certificate(DomainName="google.com")["CertificateArn"] resp = client.list_certificates() len(resp["CertificateSummaryList"]).should.equal(2) resp = client.list_certificates(CertificateStatuses=["EXPIRED", "INACTIVE"]) len(resp["CertificateSummaryList"]).should.equal(0) resp = client.list_certificates(CertificateStatuses=["PENDING_VALIDATION"]) len(resp["CertificateSummaryList"]).should.equal(1) resp["CertificateSummaryList"][0]["CertificateArn"].should.equal(pending_arn) resp = client.list_certificates(CertificateStatuses=["ISSUED"]) len(resp["CertificateSummaryList"]).should.equal(1) resp["CertificateSummaryList"][0]["CertificateArn"].should.equal(issued_arn) resp = client.list_certificates( CertificateStatuses=["ISSUED", "PENDING_VALIDATION"] ) len(resp["CertificateSummaryList"]).should.equal(2) arns = {cert["CertificateArn"] for cert in resp["CertificateSummaryList"]} arns.should.contain(issued_arn) arns.should.contain(pending_arn) @mock_acm def test_get_invalid_certificate(): client = boto3.client("acm", region_name="eu-central-1") try: client.get_certificate(CertificateArn=BAD_ARN) except ClientError as err: err.response["Error"]["Code"].should.equal("ResourceNotFoundException") else: raise RuntimeError("Should of raised ResourceNotFoundException") # Also tests deleting invalid certificate @mock_acm def test_delete_certificate(): client = boto3.client("acm", region_name="eu-central-1") arn = _import_cert(client) # If it does not raise an error and the next call does, all is fine client.delete_certificate(CertificateArn=arn) try: client.delete_certificate(CertificateArn=arn) except ClientError as err: err.response["Error"]["Code"].should.equal("ResourceNotFoundException") else: raise RuntimeError("Should of raised ResourceNotFoundException") @mock_acm def test_describe_certificate(): client = boto3.client("acm", region_name="eu-central-1") arn = _import_cert(client) resp = client.describe_certificate(CertificateArn=arn) resp["Certificate"]["CertificateArn"].should.equal(arn) resp["Certificate"]["DomainName"].should.equal(SERVER_COMMON_NAME) resp["Certificate"]["Issuer"].should.equal("Moto") resp["Certificate"]["KeyAlgorithm"].should.equal("RSA_2048") resp["Certificate"]["Status"].should.equal("ISSUED") resp["Certificate"]["Type"].should.equal("IMPORTED") @mock_acm def test_describe_certificate_with_bad_arn(): client = boto3.client("acm", region_name="eu-central-1") try: client.describe_certificate(CertificateArn=BAD_ARN) except ClientError as err: err.response["Error"]["Code"].should.equal("ResourceNotFoundException") else: raise RuntimeError("Should of raised ResourceNotFoundException") # Also tests ListTagsForCertificate @mock_acm def test_add_tags_to_certificate(): client = boto3.client("acm", region_name="eu-central-1") arn = _import_cert(client) client.add_tags_to_certificate( CertificateArn=arn, Tags=[{"Key": "key1", "Value": "value1"}, {"Key": "key2"}] ) resp = client.list_tags_for_certificate(CertificateArn=arn) tags = {item["Key"]: item.get("Value", "__NONE__") for item in resp["Tags"]} tags.should.contain("key1") tags.should.contain("key2") tags["key1"].should.equal("value1") # This way, it ensures that we can detect if None is passed back when it shouldnt, # as we store keys without values with a value of None, but it shouldnt be passed back tags["key2"].should.equal("__NONE__") @mock_acm def test_add_tags_to_invalid_certificate(): client = boto3.client("acm", region_name="eu-central-1") try: client.add_tags_to_certificate( CertificateArn=BAD_ARN, Tags=[{"Key": "key1", "Value": "value1"}, {"Key": "key2"}], ) except ClientError as err: err.response["Error"]["Code"].should.equal("ResourceNotFoundException") else: raise RuntimeError("Should of raised ResourceNotFoundException") @mock_acm def test_list_tags_for_invalid_certificate(): client = boto3.client("acm", region_name="eu-central-1") try: client.list_tags_for_certificate(CertificateArn=BAD_ARN) except ClientError as err: err.response["Error"]["Code"].should.equal("ResourceNotFoundException") else: raise RuntimeError("Should of raised ResourceNotFoundException") @mock_acm def test_remove_tags_from_certificate(): client = boto3.client("acm", region_name="eu-central-1") arn = _import_cert(client) client.add_tags_to_certificate( CertificateArn=arn, Tags=[ {"Key": "key1", "Value": "value1"}, {"Key": "key2"}, {"Key": "key3", "Value": "value3"}, {"Key": "key4", "Value": "value4"}, ], ) client.remove_tags_from_certificate( CertificateArn=arn, Tags=[ {"Key": "key1", "Value": "value2"}, # Should not remove as doesnt match {"Key": "key2"}, # Single key removal {"Key": "key3", "Value": "value3"}, # Exact match removal {"Key": "key4"}, # Partial match removal ], ) resp = client.list_tags_for_certificate(CertificateArn=arn) tags = {item["Key"]: item.get("Value", "__NONE__") for item in resp["Tags"]} for key in ("key2", "key3", "key4"): tags.should_not.contain(key) tags.should.contain("key1") @mock_acm def test_remove_tags_from_invalid_certificate(): client = boto3.client("acm", region_name="eu-central-1") try: client.remove_tags_from_certificate( CertificateArn=BAD_ARN, Tags=[{"Key": "key1", "Value": "value1"}, {"Key": "key2"}], ) except ClientError as err: err.response["Error"]["Code"].should.equal("ResourceNotFoundException") else: raise RuntimeError("Should of raised ResourceNotFoundException") @mock_acm def test_resend_validation_email(): client = boto3.client("acm", region_name="eu-central-1") arn = _import_cert(client) client.resend_validation_email( CertificateArn=arn, Domain="*.moto.com", ValidationDomain="NOTUSEDYET" ) # Returns nothing, boto would raise Exceptions otherwise @mock_acm def test_resend_validation_email_invalid(): client = boto3.client("acm", region_name="eu-central-1") arn = _import_cert(client) try: client.resend_validation_email( CertificateArn=arn, Domain="no-match.moto.com", ValidationDomain="NOTUSEDYET", ) except ClientError as err: err.response["Error"]["Code"].should.equal( "InvalidDomainValidationOptionsException" ) else: raise RuntimeError("Should of raised InvalidDomainValidationOptionsException") try: client.resend_validation_email( CertificateArn=BAD_ARN, Domain="no-match.moto.com", ValidationDomain="NOTUSEDYET", ) except ClientError as err: err.response["Error"]["Code"].should.equal("ResourceNotFoundException") else: raise RuntimeError("Should of raised ResourceNotFoundException") @mock_acm def test_request_certificate(): client = boto3.client("acm", region_name="eu-central-1") token = str(uuid.uuid4()) resp = client.request_certificate( DomainName="google.com", IdempotencyToken=token, SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], ) resp.should.contain("CertificateArn") arn = resp["CertificateArn"] arn.should.match(r"arn:aws:acm:eu-central-1:\d{12}:certificate/") resp = client.request_certificate( DomainName="google.com", IdempotencyToken=token, SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], ) resp["CertificateArn"].should.equal(arn) @mock_acm def test_request_certificate_with_tags(): client = boto3.client("acm", region_name="eu-central-1") token = str(uuid.uuid4()) resp = client.request_certificate( DomainName="google.com", IdempotencyToken=token, SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], Tags=[ {"Key": "Environment", "Value": "QA"}, {"Key": "WithEmptyStr", "Value": ""}, ], ) resp.should.contain("CertificateArn") arn_1 = resp["CertificateArn"] resp = client.list_tags_for_certificate(CertificateArn=arn_1) tags = {item["Key"]: item.get("Value", "__NONE__") for item in resp["Tags"]} tags.should.have.length_of(2) tags["Environment"].should.equal("QA") tags["WithEmptyStr"].should.equal("") # Request certificate for "google.com" with same IdempotencyToken but with different Tags resp = client.request_certificate( DomainName="google.com", IdempotencyToken=token, SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], Tags=[{"Key": "Environment", "Value": "Prod"}, {"Key": "KeyOnly"},], ) arn_2 = resp["CertificateArn"] assert arn_1 != arn_2 # if tags are matched, ACM would have returned same arn resp = client.list_tags_for_certificate(CertificateArn=arn_2) tags = {item["Key"]: item.get("Value", "__NONE__") for item in resp["Tags"]} tags.should.have.length_of(2) tags["Environment"].should.equal("Prod") tags["KeyOnly"].should.equal("__NONE__") resp = client.request_certificate( DomainName="google.com", IdempotencyToken=token, SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], Tags=[ {"Key": "Environment", "Value": "QA"}, {"Key": "WithEmptyStr", "Value": ""}, ], ) @mock_acm def test_operations_with_invalid_tags(): client = boto3.client("acm", region_name="eu-central-1") # request certificate with invalid tags with pytest.raises(ClientError) as ex: client.request_certificate( DomainName="example.com", Tags=[{"Key": "X" * 200, "Value": "Valid"}], ) ex.value.response["Error"]["Code"].should.equal("ValidationException") ex.value.response["Error"]["Message"].should.contain( "Member must have length less than or equal to 128" ) # import certificate with invalid tags with pytest.raises(ClientError) as ex: client.import_certificate( Certificate=SERVER_CRT, PrivateKey=SERVER_KEY, CertificateChain=CA_CRT, Tags=[ {"Key": "Valid", "Value": "X" * 300}, {"Key": "aws:xx", "Value": "Valid"}, ], ) ex.value.response["Error"]["Code"].should.equal("ValidationException") ex.value.response["Error"]["Message"].should.contain( "Member must have length less than or equal to 256" ) arn = _import_cert(client) # add invalid tags to existing certificate with pytest.raises(ClientError) as ex: client.add_tags_to_certificate( CertificateArn=arn, Tags=[{"Key": "aws:xxx", "Value": "Valid"}, {"Key": "key2"}], ) ex.value.response["Error"]["Code"].should.equal("ValidationException") ex.value.response["Error"]["Message"].should.contain( "AWS internal tags cannot be changed with this API" ) # try removing invalid tags from existing certificate with pytest.raises(ClientError) as ex: client.remove_tags_from_certificate( CertificateArn=arn, Tags=[{"Key": "aws:xxx", "Value": "Valid"}] ) ex.value.response["Error"]["Code"].should.equal("ValidationException") ex.value.response["Error"]["Message"].should.contain( "AWS internal tags cannot be changed with this API" ) @mock_acm def test_add_too_many_tags(): client = boto3.client("acm", region_name="eu-central-1") arn = _import_cert(client) # Add 51 tags with pytest.raises(ClientError) as ex: client.add_tags_to_certificate( CertificateArn=arn, Tags=[{"Key": "a-%d" % i, "Value": "abcd"} for i in range(1, 52)], ) ex.value.response["Error"]["Code"].should.equal("TooManyTagsException") ex.value.response["Error"]["Message"].should.contain("contains too many Tags") client.list_tags_for_certificate(CertificateArn=arn)["Tags"].should.have.empty # Add 49 tags first, then try to add 2 more. client.add_tags_to_certificate( CertificateArn=arn, Tags=[{"Key": "p-%d" % i, "Value": "pqrs"} for i in range(1, 50)], ) client.list_tags_for_certificate(CertificateArn=arn)["Tags"].should.have.length_of( 49 ) with pytest.raises(ClientError) as ex: client.add_tags_to_certificate( CertificateArn=arn, Tags=[{"Key": "x-1", "Value": "xyz"}, {"Key": "x-2", "Value": "xyz"}], ) ex.value.response["Error"]["Code"].should.equal("TooManyTagsException") ex.value.response["Error"]["Message"].should.contain("contains too many Tags") ex.value.response["Error"]["Message"].count("pqrs").should.equal(49) ex.value.response["Error"]["Message"].count("xyz").should.equal(2) client.list_tags_for_certificate(CertificateArn=arn)["Tags"].should.have.length_of( 49 ) @mock_acm def test_request_certificate_no_san(): client = boto3.client("acm", region_name="eu-central-1") resp = client.request_certificate(DomainName="google.com") resp.should.contain("CertificateArn") resp2 = client.describe_certificate(CertificateArn=resp["CertificateArn"]) resp2.should.contain("Certificate") # Also tests the SAN code @mock_acm def test_request_certificate_issued_status(): # After requesting a certificate, it should then auto-validate after 1 minute # Some sneaky programming for that ;-) client = boto3.client("acm", region_name="eu-central-1") with freeze_time("2012-01-01 12:00:00"): resp = client.request_certificate( DomainName="google.com", SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], ) arn = resp["CertificateArn"] with freeze_time("2012-01-01 12:00:00"): resp = client.describe_certificate(CertificateArn=arn) resp["Certificate"]["CertificateArn"].should.equal(arn) resp["Certificate"]["DomainName"].should.equal("google.com") resp["Certificate"]["Issuer"].should.equal("Amazon") resp["Certificate"]["KeyAlgorithm"].should.equal("RSA_2048") resp["Certificate"]["Status"].should.equal("PENDING_VALIDATION") resp["Certificate"]["Type"].should.equal("AMAZON_ISSUED") len(resp["Certificate"]["SubjectAlternativeNames"]).should.equal(3) # validation will be pending for 1 minute. with freeze_time("2012-01-01 12:00:00"): resp = client.describe_certificate(CertificateArn=arn) resp["Certificate"]["CertificateArn"].should.equal(arn) resp["Certificate"]["Status"].should.equal("PENDING_VALIDATION") if not settings.TEST_SERVER_MODE: # Move time to get it issued. with freeze_time("2012-01-01 12:02:00"): resp = client.describe_certificate(CertificateArn=arn) resp["Certificate"]["CertificateArn"].should.equal(arn) resp["Certificate"]["Status"].should.equal("ISSUED") @mock_acm def test_request_certificate_with_mutiple_times(): if settings.TEST_SERVER_MODE: raise SkipTest("Cant manipulate time in server mode") # After requesting a certificate, it should then auto-validate after 1 minute # Some sneaky programming for that ;-) client = boto3.client("acm", region_name="eu-central-1") with freeze_time("2012-01-01 12:00:00"): resp = client.request_certificate( IdempotencyToken="test_token", DomainName="google.com", SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], ) original_arn = resp["CertificateArn"] # Should be able to request a certificate multiple times in an hour # after that it makes a new one for time_intervals in ( "2012-01-01 12:15:00", "2012-01-01 12:30:00", "2012-01-01 12:45:00", ): with freeze_time(time_intervals): resp = client.request_certificate( IdempotencyToken="test_token", DomainName="google.com", SubjectAlternativeNames=[ "google.com", "www.google.com", "mail.google.com", ], ) arn = resp["CertificateArn"] arn.should.equal(original_arn) # Move time with freeze_time("2012-01-01 13:01:00"): resp = client.request_certificate( IdempotencyToken="test_token", DomainName="google.com", SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], ) arn = resp["CertificateArn"] arn.should_not.equal(original_arn)