Add validation of tags to TaggingService (#4253)

Co-authored-by: Karri Balk <kbalk@users.noreply.github.com>
This commit is contained in:
kbalk 2021-09-01 02:07:01 -04:00 committed by GitHub
parent c707ee002c
commit 17e9a37c81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 208 additions and 31 deletions

View File

@ -328,7 +328,7 @@ class ECRBackend(BaseBackend):
self.registry_policy = None
self.replication_config = {"rules": []}
self.repositories: Dict[str, Repository] = {}
self.tagger = TaggingService(tagName="tags")
self.tagger = TaggingService(tag_name="tags")
def reset(self):
region_name = self.region_name

View File

@ -157,7 +157,7 @@ class KmsBackend(BaseBackend):
def __init__(self):
self.keys = {}
self.key_to_aliases = defaultdict(set)
self.tagger = TaggingService(keyName="TagKey", valueName="TagValue")
self.tagger = TaggingService(key_name="TagKey", value_name="TagValue")
def create_key(
self, policy, key_usage, customer_master_key_spec, description, tags, region

View File

@ -1,76 +1,158 @@
"""Tag functionality contained in class TaggingService."""
import re
class TaggingService:
def __init__(self, tagName="Tags", keyName="Key", valueName="Value"):
self.tagName = tagName
self.keyName = keyName
self.valueName = valueName
"""Functionality related to tags, i.e., adding, deleting, testing."""
def __init__(self, tag_name="Tags", key_name="Key", value_name="Value"):
self.tag_name = tag_name
self.key_name = key_name
self.value_name = value_name
self.tags = {}
def get_tag_dict_for_resource(self, arn):
"""Return dict of key/value pairs vs. list of key/values dicts."""
result = {}
if self.has_tags(arn):
for k, v in self.tags[arn].items():
result[k] = v
for key, val in self.tags[arn].items():
result[key] = val
return result
def list_tags_for_resource(self, arn):
"""Return list of tags inside dict with key of "tag_name".
Useful for describe functions; this return value can be added to
dictionary returned from a describe function.
"""
result = []
if self.has_tags(arn):
for k, v in self.tags[arn].items():
result.append({self.keyName: k, self.valueName: v})
return {self.tagName: result}
for key, val in self.tags[arn].items():
result.append({self.key_name: key, self.value_name: val})
return {self.tag_name: result}
def delete_all_tags_for_resource(self, arn):
"""Delete all tags associated with given ARN."""
if self.has_tags(arn):
del self.tags[arn]
def has_tags(self, arn):
"""Return True if the ARN has any associated tags, False otherwise."""
return arn in self.tags
def tag_resource(self, arn, tags):
"""Store associated list of dicts with ARN.
Note: the storage is internal to this class instance.
"""
if arn not in self.tags:
self.tags[arn] = {}
for t in tags:
if self.valueName in t:
self.tags[arn][t[self.keyName]] = t[self.valueName]
for tag in tags:
if self.value_name in tag:
self.tags[arn][tag[self.key_name]] = tag[self.value_name]
else:
self.tags[arn][t[self.keyName]] = None
self.tags[arn][tag[self.key_name]] = None
def copy_tags(self, from_arn, to_arn):
"""Copy stored list of tags associated with one ARN to another ARN.
Note: the storage is internal to this class instance.
"""
if self.has_tags(from_arn):
self.tag_resource(
to_arn, self.list_tags_for_resource(from_arn)[self.tagName]
to_arn, self.list_tags_for_resource(from_arn)[self.tag_name]
)
def untag_resource_using_names(self, arn, tag_names):
"""Remove tags associated with ARN using key names in 'tag_names'."""
for name in tag_names:
if name in self.tags.get(arn, {}):
del self.tags[arn][name]
def untag_resource_using_tags(self, arn, tags):
m = self.tags.get(arn, {})
for t in tags:
if self.keyName in t:
if t[self.keyName] in m:
if self.valueName in t:
if m[t[self.keyName]] != t[self.valueName]:
"""Remove tags associated with ARN using key/value pairs in 'tags'."""
current_tags = self.tags.get(arn, {})
for tag in tags:
if self.key_name in tag:
if tag[self.key_name] in current_tags:
if self.value_name in tag:
if current_tags[tag[self.key_name]] != tag[self.value_name]:
continue
# If both key and value are provided, match both before deletion
del m[t[self.keyName]]
del current_tags[tag[self.key_name]]
def extract_tag_names(self, tags):
"""Return list of key names in list of 'tags' key/value dicts."""
results = []
if len(tags) == 0:
return results
for tag in tags:
if self.keyName in tag:
results.append(tag[self.keyName])
if self.key_name in tag:
results.append(tag[self.key_name])
return results
def flatten_tag_list(self, tags):
"""Return dict of key/value pairs with 'tag_name', 'value_name'."""
result = {}
for t in tags:
if self.valueName in t:
result[t[self.keyName]] = t[self.valueName]
for tag in tags:
if self.value_name in tag:
result[tag[self.key_name]] = tag[self.value_name]
else:
result[t[self.keyName]] = None
result[tag[self.key_name]] = None
return result
def validate_tags(self, tags):
"""Returns error message if tags in 'tags' list of dicts are invalid.
The validation does not include a check for duplicate keys.
Duplicate keys are not always an error and the error message isn't
consistent across services, so this should be a separate check.
"""
errors = []
key_regex = re.compile(r"^(?!aws:)([\w\s\d_.:/=+\-@]*)$")
value_regex = re.compile(r"^([\w\s\d_.:/=+\-@]*)$")
# AWS only outputs one error for all keys and one for all values.
for idx, tag in enumerate(tags, 1):
for tag_key, tag_value in tag.items():
if tag_key == self.key_name:
# Validation for len(tag_key) >= 1 is done by botocore.
if len(tag_value) > 128:
errors.append(
f"Value '{tag_value}' at 'tags.{idx}.member.key' "
f"failed to satisfy constraint: Member must have "
f"length less than or equal to 128"
)
if not re.match(key_regex, tag_value):
errors.append(
f"Value '{tag_value}' at 'tags.{idx}.member.key' "
f"failed to satisfy constraint: Member must "
f"satisfy regular expression pattern: "
r"^(?!aws:)[{a-zA-Z0-9 }_.://=+-@%]*$"
)
elif tag_key == self.value_name:
# Validation for len(tag_value) >= 0 is nonsensical.
if len(tag_value) > 256:
errors.append(
f"Value '{tag_value}' at 'tags.{idx}.member.value' "
f"failed to satisfy constraint: Member must have "
f"length less than or equal to 256"
# Member must have length greater than or equal to 0, "
)
if not re.match(value_regex, tag_value):
errors.append(
f"Value '{tag_value}' at 'tags.{idx}.member.value' "
f"failed to satisfy constraint: Member must satisfy "
f"regular expression pattern: "
r"^[{a-zA-Z0-9 }_.://=+-@%]*$"
)
errors_len = len(errors)
return (
(
f"{errors_len} validation error{'s' if len(errors) > 1 else ''} "
f"detected: {'; '.join(errors)}"
)
if errors
else ""
)

View File

@ -1,5 +1,4 @@
import sure
"""Unit tests for the TaggingService class."""
from moto.utilities.tagging_service import TaggingService
@ -108,3 +107,99 @@ def test_copy_existing_arn():
actual.should.equal(
[{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
)
def test_validate_tags():
"""Unit tests for validate_tags()."""
svc = TaggingService()
# Key with invalid characters.
errors = svc.validate_tags([{"Key": "foo!", "Value": "bar"}])
assert (
"Value 'foo!' at 'tags.1.member.key' failed to satisfy constraint: "
"Member must satisfy regular expression pattern"
) in errors
# Value with invalid characters.
errors = svc.validate_tags([{"Key": "foo", "Value": "bar!"}])
assert (
"Value 'bar!' at 'tags.1.member.value' failed to satisfy "
"constraint: Member must satisfy regular expression pattern"
) in errors
# Key too long.
errors = svc.validate_tags(
[
{
"Key": (
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"123456789012345678901234567890"
),
"Value": "foo",
}
]
)
assert (
"at 'tags.1.member.key' failed to satisfy constraint: Member must "
"have length less than or equal to 128"
) in errors
# Value too long.
errors = svc.validate_tags(
[
{
"Key": "foo",
"Value": (
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"1234567890"
),
},
]
)
assert (
"at 'tags.1.member.value' failed to satisfy constraint: Member "
"must have length less than or equal to 256"
) in errors
# Compound errors.
errors = svc.validate_tags(
[
{
"Key": (
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"123456789012345678901234567890"
),
"Value": (
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"12345678901234567890123456789012345678901234567890"
"1234567890"
),
},
{"Key": "foo!", "Value": "bar!"},
]
)
assert "4 validation errors detected" in errors
assert (
"at 'tags.1.member.key' failed to satisfy constraint: Member must "
"have length less than or equal to 128"
) in errors
assert (
"at 'tags.1.member.value' failed to satisfy constraint: Member "
"must have length less than or equal to 256"
) in errors
assert (
"Value 'foo!' at 'tags.2.member.key' failed to satisfy constraint: "
"Member must satisfy regular expression pattern"
) in errors
assert (
"Value 'bar!' at 'tags.2.member.value' failed to satisfy "
"constraint: Member must satisfy regular expression pattern"
) in errors