diff --git a/moto/acm/models.py b/moto/acm/models.py index 0608400f7..6e4ac1508 100644 --- a/moto/acm/models.py +++ b/moto/acm/models.py @@ -70,6 +70,68 @@ class AWSResourceNotFoundException(AWSError): TYPE = "ResourceNotFoundException" +class AWSTooManyTagsException(AWSError): + TYPE = "TooManyTagsException" + + +class TagHolder(dict): + MAX_TAG_COUNT = 50 + MAX_KEY_LENGTH = 128 + MAX_VALUE_LENGTH = 256 + + def _validate_kv(self, key, value, index): + if len(key) > self.MAX_KEY_LENGTH: + raise AWSValidationException( + "Value '%s' at 'tags.%d.member.key' failed to satisfy constraint: Member must have length less than or equal to %s" + % (key, index, self.MAX_KEY_LENGTH) + ) + if value and len(value) > self.MAX_VALUE_LENGTH: + raise AWSValidationException( + "Value '%s' at 'tags.%d.member.value' failed to satisfy constraint: Member must have length less than or equal to %s" + % (value, index, self.MAX_VALUE_LENGTH) + ) + if key.startswith("aws:"): + raise AWSValidationException( + 'Invalid Tag Key: "%s". AWS internal tags cannot be changed with this API' + % key + ) + + def add(self, tags): + tags_copy = self.copy() + for i, tag in enumerate(tags): + key = tag["Key"] + value = tag.get("Value", None) + self._validate_kv(key, value, i + 1) + + tags_copy[key] = value + if len(tags_copy) > self.MAX_TAG_COUNT: + raise AWSTooManyTagsException( + "the TagSet: '{%s}' contains too many Tags" + % ", ".join(k + "=" + str(v or "") for k, v in tags_copy.items()) + ) + + self.update(tags_copy) + + def remove(self, tags): + for i, tag in enumerate(tags): + key = tag["Key"] + value = tag.get("Value", None) + self._validate_kv(key, value, i + 1) + try: + # If value isnt provided, just delete key + if value is None: + del self[key] + # If value is provided, only delete if it matches what already exists + elif self[key] == value: + del self[key] + except KeyError: + pass + + def equals(self, tags): + tags = {t["Key"]: t.get("Value", None) for t in tags} if tags else {} + return self == tags + + class CertBundle(BaseModel): def __init__( self, @@ -88,7 +150,7 @@ class CertBundle(BaseModel): self.key = private_key self._key = None self.chain = chain - self.tags = {} + self.tags = TagHolder() self._chain = None self.type = cert_type # Should really be an enum self.status = cert_status # Should really be an enum @@ -388,7 +450,7 @@ class AWSCertificateManagerBackend(BaseBackend): "expires": datetime.datetime.now() + datetime.timedelta(hours=1), } - def import_cert(self, certificate, private_key, chain=None, arn=None): + def import_cert(self, certificate, private_key, chain=None, arn=None, tags=None): if arn is not None: if arn not in self._certificates: raise self._arn_not_found(arn) @@ -403,6 +465,9 @@ class AWSCertificateManagerBackend(BaseBackend): self._certificates[bundle.arn] = bundle + if tags: + self.add_tags_to_certificate(bundle.arn, tags) + return bundle.arn def get_certificates_list(self, statuses): @@ -437,10 +502,11 @@ class AWSCertificateManagerBackend(BaseBackend): domain_validation_options, idempotency_token, subject_alt_names, + tags=None, ): if idempotency_token is not None: arn = self._get_arn_from_idempotency_token(idempotency_token) - if arn is not None: + if arn and self._certificates[arn].tags.equals(tags): return arn cert = CertBundle.generate_cert( @@ -450,34 +516,20 @@ class AWSCertificateManagerBackend(BaseBackend): self._set_idempotency_token_arn(idempotency_token, cert.arn) self._certificates[cert.arn] = cert + if tags: + cert.tags.add(tags) + return cert.arn def add_tags_to_certificate(self, arn, tags): # get_cert does arn check cert_bundle = self.get_certificate(arn) - - for tag in tags: - key = tag["Key"] - value = tag.get("Value", None) - cert_bundle.tags[key] = value + cert_bundle.tags.add(tags) def remove_tags_from_certificate(self, arn, tags): # get_cert does arn check cert_bundle = self.get_certificate(arn) - - for tag in tags: - key = tag["Key"] - value = tag.get("Value", None) - - try: - # If value isnt provided, just delete key - if value is None: - del cert_bundle.tags[key] - # If value is provided, only delete if it matches what already exists - elif cert_bundle.tags[key] == value: - del cert_bundle.tags[key] - except KeyError: - pass + cert_bundle.tags.remove(tags) acm_backends = {} diff --git a/moto/acm/responses.py b/moto/acm/responses.py index 13b22fa95..0908c6ff7 100644 --- a/moto/acm/responses.py +++ b/moto/acm/responses.py @@ -117,6 +117,7 @@ class AWSCertificateManagerResponse(BaseResponse): private_key = self._get_param("PrivateKey") chain = self._get_param("CertificateChain") # Optional current_arn = self._get_param("CertificateArn") # Optional + tags = self._get_param("Tags") # Optional # Simple parameter decoding. Rather do it here as its a data transport decision not part of the # actual data @@ -142,7 +143,7 @@ class AWSCertificateManagerResponse(BaseResponse): try: arn = self.acm_backend.import_cert( - certificate, private_key, chain=chain, arn=current_arn + certificate, private_key, chain=chain, arn=current_arn, tags=tags ) except AWSError as err: return err.response() @@ -210,6 +211,7 @@ class AWSCertificateManagerResponse(BaseResponse): ) # is ignored atm idempotency_token = self._get_param("IdempotencyToken") subject_alt_names = self._get_param("SubjectAlternativeNames") + tags = self._get_param("Tags") # Optional if subject_alt_names is not None and len(subject_alt_names) > 10: # There is initial AWS limit of 10 @@ -227,6 +229,7 @@ class AWSCertificateManagerResponse(BaseResponse): domain_validation_options, idempotency_token, subject_alt_names, + tags, ) except AWSError as err: return err.response() diff --git a/tests/test_acm/test_acm.py b/tests/test_acm/test_acm.py index 017d166dd..790ae4430 100644 --- a/tests/test_acm/test_acm.py +++ b/tests/test_acm/test_acm.py @@ -11,6 +11,7 @@ from botocore.exceptions import ClientError from moto import mock_acm from moto.core import ACCOUNT_ID +from nose.tools import assert_raises RESOURCE_FOLDER = os.path.join(os.path.dirname(__file__), "resources") _GET_RESOURCE = lambda x: open(os.path.join(RESOURCE_FOLDER, x), "rb").read() @@ -46,6 +47,30 @@ def test_import_certificate(): resp.should.contain("CertificateChain") +@mock_acm +def test_import_certificate_with_tags(): + client = boto3.client("acm", region_name="eu-central-1") + + resp = client.import_certificate( + Certificate=SERVER_CRT, + PrivateKey=SERVER_KEY, + CertificateChain=CA_CRT, + Tags=[{"Key": "Environment", "Value": "QA"}, {"Key": "KeyOnly"},], + ) + arn = resp["CertificateArn"] + + resp = client.get_certificate(CertificateArn=arn) + resp["Certificate"].should.equal(SERVER_CRT.decode()) + resp.should.contain("CertificateChain") + + resp = client.list_tags_for_certificate(CertificateArn=arn) + tags = {item["Key"]: item.get("Value", "__NONE__") for item in resp["Tags"]} + tags.should.contain("Environment") + tags.should.contain("KeyOnly") + tags["Environment"].should.equal("QA") + tags["KeyOnly"].should.equal("__NONE__") + + @mock_acm def test_import_bad_certificate(): client = boto3.client("acm", region_name="eu-central-1") @@ -313,6 +338,150 @@ def test_request_certificate(): resp["CertificateArn"].should.equal(arn) +@mock_acm +def test_request_certificate_with_tags(): + client = boto3.client("acm", region_name="eu-central-1") + + token = str(uuid.uuid4()) + + resp = client.request_certificate( + DomainName="google.com", + IdempotencyToken=token, + SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], + Tags=[ + {"Key": "Environment", "Value": "QA"}, + {"Key": "WithEmptyStr", "Value": ""}, + ], + ) + resp.should.contain("CertificateArn") + arn_1 = resp["CertificateArn"] + + resp = client.list_tags_for_certificate(CertificateArn=arn_1) + tags = {item["Key"]: item.get("Value", "__NONE__") for item in resp["Tags"]} + tags.should.have.length_of(2) + tags["Environment"].should.equal("QA") + tags["WithEmptyStr"].should.equal("") + + # Request certificate for "google.com" with same IdempotencyToken but with different Tags + resp = client.request_certificate( + DomainName="google.com", + IdempotencyToken=token, + SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], + Tags=[{"Key": "Environment", "Value": "Prod"}, {"Key": "KeyOnly"},], + ) + arn_2 = resp["CertificateArn"] + + assert arn_1 != arn_2 # if tags are matched, ACM would have returned same arn + + resp = client.list_tags_for_certificate(CertificateArn=arn_2) + tags = {item["Key"]: item.get("Value", "__NONE__") for item in resp["Tags"]} + tags.should.have.length_of(2) + tags["Environment"].should.equal("Prod") + tags["KeyOnly"].should.equal("__NONE__") + + resp = client.request_certificate( + DomainName="google.com", + IdempotencyToken=token, + SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"], + Tags=[ + {"Key": "Environment", "Value": "QA"}, + {"Key": "WithEmptyStr", "Value": ""}, + ], + ) + + +@mock_acm +def test_operations_with_invalid_tags(): + client = boto3.client("acm", region_name="eu-central-1") + + # request certificate with invalid tags + with assert_raises(ClientError) as ex: + client.request_certificate( + DomainName="example.com", Tags=[{"Key": "X" * 200, "Value": "Valid"}], + ) + ex.exception.response["Error"]["Code"].should.equal("ValidationException") + ex.exception.response["Error"]["Message"].should.contain( + "Member must have length less than or equal to 128" + ) + + # import certificate with invalid tags + with assert_raises(ClientError) as ex: + client.import_certificate( + Certificate=SERVER_CRT, + PrivateKey=SERVER_KEY, + CertificateChain=CA_CRT, + Tags=[ + {"Key": "Valid", "Value": "X" * 300}, + {"Key": "aws:xx", "Value": "Valid"}, + ], + ) + + ex.exception.response["Error"]["Code"].should.equal("ValidationException") + ex.exception.response["Error"]["Message"].should.contain( + "Member must have length less than or equal to 256" + ) + + arn = _import_cert(client) + + # add invalid tags to existing certificate + with assert_raises(ClientError) as ex: + client.add_tags_to_certificate( + CertificateArn=arn, + Tags=[{"Key": "aws:xxx", "Value": "Valid"}, {"Key": "key2"}], + ) + ex.exception.response["Error"]["Code"].should.equal("ValidationException") + ex.exception.response["Error"]["Message"].should.contain( + "AWS internal tags cannot be changed with this API" + ) + + # try removing invalid tags from existing certificate + with assert_raises(ClientError) as ex: + client.remove_tags_from_certificate( + CertificateArn=arn, Tags=[{"Key": "aws:xxx", "Value": "Valid"}] + ) + ex.exception.response["Error"]["Code"].should.equal("ValidationException") + ex.exception.response["Error"]["Message"].should.contain( + "AWS internal tags cannot be changed with this API" + ) + + +@mock_acm +def test_add_too_many_tags(): + client = boto3.client("acm", region_name="eu-central-1") + arn = _import_cert(client) + + # Add 51 tags + with assert_raises(ClientError) as ex: + client.add_tags_to_certificate( + CertificateArn=arn, + Tags=[{"Key": "a-%d" % i, "Value": "abcd"} for i in range(1, 52)], + ) + ex.exception.response["Error"]["Code"].should.equal("TooManyTagsException") + ex.exception.response["Error"]["Message"].should.contain("contains too many Tags") + client.list_tags_for_certificate(CertificateArn=arn)["Tags"].should.have.empty + + # Add 49 tags first, then try to add 2 more. + client.add_tags_to_certificate( + CertificateArn=arn, + Tags=[{"Key": "p-%d" % i, "Value": "pqrs"} for i in range(1, 50)], + ) + client.list_tags_for_certificate(CertificateArn=arn)["Tags"].should.have.length_of( + 49 + ) + with assert_raises(ClientError) as ex: + client.add_tags_to_certificate( + CertificateArn=arn, + Tags=[{"Key": "x-1", "Value": "xyz"}, {"Key": "x-2", "Value": "xyz"}], + ) + ex.exception.response["Error"]["Code"].should.equal("TooManyTagsException") + ex.exception.response["Error"]["Message"].should.contain("contains too many Tags") + ex.exception.response["Error"]["Message"].count("pqrs").should.equal(49) + ex.exception.response["Error"]["Message"].count("xyz").should.equal(2) + client.list_tags_for_certificate(CertificateArn=arn)["Tags"].should.have.length_of( + 49 + ) + + @mock_acm def test_request_certificate_no_san(): client = boto3.client("acm", region_name="eu-central-1")