diff --git a/moto/ecr/models.py b/moto/ecr/models.py index 9baea9c0f..ed6bb2e57 100644 --- a/moto/ecr/models.py +++ b/moto/ecr/models.py @@ -328,7 +328,7 @@ class ECRBackend(BaseBackend): self.registry_policy = None self.replication_config = {"rules": []} self.repositories: Dict[str, Repository] = {} - self.tagger = TaggingService(tagName="tags") + self.tagger = TaggingService(tag_name="tags") def reset(self): region_name = self.region_name diff --git a/moto/kms/models.py b/moto/kms/models.py index d30a6ef3c..54bf37231 100644 --- a/moto/kms/models.py +++ b/moto/kms/models.py @@ -157,7 +157,7 @@ class KmsBackend(BaseBackend): def __init__(self): self.keys = {} self.key_to_aliases = defaultdict(set) - self.tagger = TaggingService(keyName="TagKey", valueName="TagValue") + self.tagger = TaggingService(key_name="TagKey", value_name="TagValue") def create_key( self, policy, key_usage, customer_master_key_spec, description, tags, region diff --git a/moto/utilities/tagging_service.py b/moto/utilities/tagging_service.py index 2d6ac99c9..ca33a2dd0 100644 --- a/moto/utilities/tagging_service.py +++ b/moto/utilities/tagging_service.py @@ -1,76 +1,158 @@ +"""Tag functionality contained in class TaggingService.""" +import re + + class TaggingService: - def __init__(self, tagName="Tags", keyName="Key", valueName="Value"): - self.tagName = tagName - self.keyName = keyName - self.valueName = valueName + """Functionality related to tags, i.e., adding, deleting, testing.""" + + def __init__(self, tag_name="Tags", key_name="Key", value_name="Value"): + self.tag_name = tag_name + self.key_name = key_name + self.value_name = value_name self.tags = {} def get_tag_dict_for_resource(self, arn): + """Return dict of key/value pairs vs. list of key/values dicts.""" result = {} if self.has_tags(arn): - for k, v in self.tags[arn].items(): - result[k] = v + for key, val in self.tags[arn].items(): + result[key] = val return result def list_tags_for_resource(self, arn): + """Return list of tags inside dict with key of "tag_name". + + Useful for describe functions; this return value can be added to + dictionary returned from a describe function. + """ result = [] if self.has_tags(arn): - for k, v in self.tags[arn].items(): - result.append({self.keyName: k, self.valueName: v}) - return {self.tagName: result} + for key, val in self.tags[arn].items(): + result.append({self.key_name: key, self.value_name: val}) + return {self.tag_name: result} def delete_all_tags_for_resource(self, arn): + """Delete all tags associated with given ARN.""" if self.has_tags(arn): del self.tags[arn] def has_tags(self, arn): + """Return True if the ARN has any associated tags, False otherwise.""" return arn in self.tags def tag_resource(self, arn, tags): + """Store associated list of dicts with ARN. + + Note: the storage is internal to this class instance. + """ if arn not in self.tags: self.tags[arn] = {} - for t in tags: - if self.valueName in t: - self.tags[arn][t[self.keyName]] = t[self.valueName] + for tag in tags: + if self.value_name in tag: + self.tags[arn][tag[self.key_name]] = tag[self.value_name] else: - self.tags[arn][t[self.keyName]] = None + self.tags[arn][tag[self.key_name]] = None def copy_tags(self, from_arn, to_arn): + """Copy stored list of tags associated with one ARN to another ARN. + + Note: the storage is internal to this class instance. + """ if self.has_tags(from_arn): self.tag_resource( - to_arn, self.list_tags_for_resource(from_arn)[self.tagName] + to_arn, self.list_tags_for_resource(from_arn)[self.tag_name] ) def untag_resource_using_names(self, arn, tag_names): + """Remove tags associated with ARN using key names in 'tag_names'.""" for name in tag_names: if name in self.tags.get(arn, {}): del self.tags[arn][name] def untag_resource_using_tags(self, arn, tags): - m = self.tags.get(arn, {}) - for t in tags: - if self.keyName in t: - if t[self.keyName] in m: - if self.valueName in t: - if m[t[self.keyName]] != t[self.valueName]: + """Remove tags associated with ARN using key/value pairs in 'tags'.""" + current_tags = self.tags.get(arn, {}) + for tag in tags: + if self.key_name in tag: + if tag[self.key_name] in current_tags: + if self.value_name in tag: + if current_tags[tag[self.key_name]] != tag[self.value_name]: continue # If both key and value are provided, match both before deletion - del m[t[self.keyName]] + del current_tags[tag[self.key_name]] def extract_tag_names(self, tags): + """Return list of key names in list of 'tags' key/value dicts.""" results = [] if len(tags) == 0: return results for tag in tags: - if self.keyName in tag: - results.append(tag[self.keyName]) + if self.key_name in tag: + results.append(tag[self.key_name]) return results def flatten_tag_list(self, tags): + """Return dict of key/value pairs with 'tag_name', 'value_name'.""" result = {} - for t in tags: - if self.valueName in t: - result[t[self.keyName]] = t[self.valueName] + for tag in tags: + if self.value_name in tag: + result[tag[self.key_name]] = tag[self.value_name] else: - result[t[self.keyName]] = None + result[tag[self.key_name]] = None return result + + def validate_tags(self, tags): + """Returns error message if tags in 'tags' list of dicts are invalid. + + The validation does not include a check for duplicate keys. + Duplicate keys are not always an error and the error message isn't + consistent across services, so this should be a separate check. + """ + errors = [] + key_regex = re.compile(r"^(?!aws:)([\w\s\d_.:/=+\-@]*)$") + value_regex = re.compile(r"^([\w\s\d_.:/=+\-@]*)$") + + # AWS only outputs one error for all keys and one for all values. + for idx, tag in enumerate(tags, 1): + for tag_key, tag_value in tag.items(): + if tag_key == self.key_name: + # Validation for len(tag_key) >= 1 is done by botocore. + if len(tag_value) > 128: + errors.append( + f"Value '{tag_value}' at 'tags.{idx}.member.key' " + f"failed to satisfy constraint: Member must have " + f"length less than or equal to 128" + ) + if not re.match(key_regex, tag_value): + errors.append( + f"Value '{tag_value}' at 'tags.{idx}.member.key' " + f"failed to satisfy constraint: Member must " + f"satisfy regular expression pattern: " + r"^(?!aws:)[{a-zA-Z0-9 }_.://=+-@%]*$" + ) + elif tag_key == self.value_name: + # Validation for len(tag_value) >= 0 is nonsensical. + if len(tag_value) > 256: + errors.append( + f"Value '{tag_value}' at 'tags.{idx}.member.value' " + f"failed to satisfy constraint: Member must have " + f"length less than or equal to 256" + # Member must have length greater than or equal to 0, " + ) + if not re.match(value_regex, tag_value): + errors.append( + f"Value '{tag_value}' at 'tags.{idx}.member.value' " + f"failed to satisfy constraint: Member must satisfy " + f"regular expression pattern: " + r"^[{a-zA-Z0-9 }_.://=+-@%]*$" + ) + + errors_len = len(errors) + return ( + ( + f"{errors_len} validation error{'s' if len(errors) > 1 else ''} " + f"detected: {'; '.join(errors)}" + ) + if errors + else "" + ) diff --git a/tests/test_utilities/test_tagging_service.py b/tests/test_utilities/test_tagging_service.py index 1eac276a1..2fedb2df1 100644 --- a/tests/test_utilities/test_tagging_service.py +++ b/tests/test_utilities/test_tagging_service.py @@ -1,5 +1,4 @@ -import sure - +"""Unit tests for the TaggingService class.""" from moto.utilities.tagging_service import TaggingService @@ -108,3 +107,99 @@ def test_copy_existing_arn(): actual.should.equal( [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}] ) + + +def test_validate_tags(): + """Unit tests for validate_tags().""" + svc = TaggingService() + # Key with invalid characters. + errors = svc.validate_tags([{"Key": "foo!", "Value": "bar"}]) + assert ( + "Value 'foo!' at 'tags.1.member.key' failed to satisfy constraint: " + "Member must satisfy regular expression pattern" + ) in errors + + # Value with invalid characters. + errors = svc.validate_tags([{"Key": "foo", "Value": "bar!"}]) + assert ( + "Value 'bar!' at 'tags.1.member.value' failed to satisfy " + "constraint: Member must satisfy regular expression pattern" + ) in errors + + # Key too long. + errors = svc.validate_tags( + [ + { + "Key": ( + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "123456789012345678901234567890" + ), + "Value": "foo", + } + ] + ) + assert ( + "at 'tags.1.member.key' failed to satisfy constraint: Member must " + "have length less than or equal to 128" + ) in errors + + # Value too long. + errors = svc.validate_tags( + [ + { + "Key": "foo", + "Value": ( + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "1234567890" + ), + }, + ] + ) + assert ( + "at 'tags.1.member.value' failed to satisfy constraint: Member " + "must have length less than or equal to 256" + ) in errors + + # Compound errors. + errors = svc.validate_tags( + [ + { + "Key": ( + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "123456789012345678901234567890" + ), + "Value": ( + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "12345678901234567890123456789012345678901234567890" + "1234567890" + ), + }, + {"Key": "foo!", "Value": "bar!"}, + ] + ) + assert "4 validation errors detected" in errors + assert ( + "at 'tags.1.member.key' failed to satisfy constraint: Member must " + "have length less than or equal to 128" + ) in errors + assert ( + "at 'tags.1.member.value' failed to satisfy constraint: Member " + "must have length less than or equal to 256" + ) in errors + assert ( + "Value 'foo!' at 'tags.2.member.key' failed to satisfy constraint: " + "Member must satisfy regular expression pattern" + ) in errors + assert ( + "Value 'bar!' at 'tags.2.member.value' failed to satisfy " + "constraint: Member must satisfy regular expression pattern" + ) in errors