Merge pull request #2857 from bblommers/feature/extend_generic_tagger_to_s3

S3 - Extend generic TaggingService to s3
This commit is contained in:
Steve Pulec 2020-04-25 18:41:02 -05:00 committed by GitHub
commit 111d0519f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 147 additions and 140 deletions

View File

@ -145,10 +145,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# Do S3, resource type s3 # Do S3, resource type s3
if not resource_type_filters or "s3" in resource_type_filters: if not resource_type_filters or "s3" in resource_type_filters:
for bucket in self.s3_backend.buckets.values(): for bucket in self.s3_backend.buckets.values():
tags = [] tags = self.s3_backend.tagger.list_tags_for_resource(bucket.arn)["Tags"]
for tag in bucket.tags.tag_set.tags:
tags.append({"Key": tag.key, "Value": tag.value})
if not tags or not tag_filter( if not tags or not tag_filter(
tags tags
): # Skip if no tags, or invalid filter ): # Skip if no tags, or invalid filter
@ -362,8 +359,9 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# Do S3, resource type s3 # Do S3, resource type s3
for bucket in self.s3_backend.buckets.values(): for bucket in self.s3_backend.buckets.values():
for tag in bucket.tags.tag_set.tags: tags = self.s3_backend.tagger.get_tag_dict_for_resource(bucket.arn)
yield tag.key for key, _ in tags.items():
yield key
# EC2 tags # EC2 tags
def get_ec2_keys(res_id): def get_ec2_keys(res_id):
@ -414,9 +412,10 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# Do S3, resource type s3 # Do S3, resource type s3
for bucket in self.s3_backend.buckets.values(): for bucket in self.s3_backend.buckets.values():
for tag in bucket.tags.tag_set.tags: tags = self.s3_backend.tagger.get_tag_dict_for_resource(bucket.arn)
if tag.key == tag_key: for key, value in tags.items():
yield tag.value if key == tag_key:
yield value
# EC2 tags # EC2 tags
def get_ec2_values(res_id): def get_ec2_values(res_id):

View File

@ -23,6 +23,7 @@ from bisect import insort
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds, rfc_1123_datetime from moto.core.utils import iso_8601_datetime_with_milliseconds, rfc_1123_datetime
from moto.cloudwatch.models import metric_providers, MetricDatum from moto.cloudwatch.models import metric_providers, MetricDatum
from moto.utilities.tagging_service import TaggingService
from .exceptions import ( from .exceptions import (
BucketAlreadyExists, BucketAlreadyExists,
MissingBucket, MissingBucket,
@ -35,7 +36,6 @@ from .exceptions import (
MalformedXML, MalformedXML,
InvalidStorageClass, InvalidStorageClass,
InvalidTargetBucketForLogging, InvalidTargetBucketForLogging,
DuplicateTagKeys,
CrossLocationLoggingProhibitted, CrossLocationLoggingProhibitted,
NoSuchPublicAccessBlockConfiguration, NoSuchPublicAccessBlockConfiguration,
InvalidPublicAccessBlockConfiguration, InvalidPublicAccessBlockConfiguration,
@ -95,6 +95,7 @@ class FakeKey(BaseModel):
version_id=0, version_id=0,
max_buffer_size=DEFAULT_KEY_BUFFER_SIZE, max_buffer_size=DEFAULT_KEY_BUFFER_SIZE,
multipart=None, multipart=None,
bucket_name=None,
): ):
self.name = name self.name = name
self.last_modified = datetime.datetime.utcnow() self.last_modified = datetime.datetime.utcnow()
@ -106,8 +107,8 @@ class FakeKey(BaseModel):
self._etag = etag self._etag = etag
self._version_id = version_id self._version_id = version_id
self._is_versioned = is_versioned self._is_versioned = is_versioned
self._tagging = FakeTagging()
self.multipart = multipart self.multipart = multipart
self.bucket_name = bucket_name
self._value_buffer = tempfile.SpooledTemporaryFile(max_size=max_buffer_size) self._value_buffer = tempfile.SpooledTemporaryFile(max_size=max_buffer_size)
self._max_buffer_size = max_buffer_size self._max_buffer_size = max_buffer_size
@ -127,6 +128,13 @@ class FakeKey(BaseModel):
self.lock.release() self.lock.release()
return r return r
@property
def arn(self):
# S3 Objects don't have an ARN, but we do need something unique when creating tags against this resource
return "arn:aws:s3:::{}/{}/{}".format(
self.bucket_name, self.name, self.version_id
)
@value.setter @value.setter
def value(self, new_value): def value(self, new_value):
self._value_buffer.seek(0) self._value_buffer.seek(0)
@ -153,9 +161,6 @@ class FakeKey(BaseModel):
self._metadata = {} self._metadata = {}
self._metadata.update(metadata) self._metadata.update(metadata)
def set_tagging(self, tagging):
self._tagging = tagging
def set_storage_class(self, storage): def set_storage_class(self, storage):
if storage is not None and storage not in STORAGE_CLASS: if storage is not None and storage not in STORAGE_CLASS:
raise InvalidStorageClass(storage=storage) raise InvalidStorageClass(storage=storage)
@ -211,10 +216,6 @@ class FakeKey(BaseModel):
def metadata(self): def metadata(self):
return self._metadata return self._metadata
@property
def tagging(self):
return self._tagging
@property @property
def response_dict(self): def response_dict(self):
res = { res = {
@ -472,26 +473,10 @@ def get_canned_acl(acl):
return FakeAcl(grants=grants) return FakeAcl(grants=grants)
class FakeTagging(BaseModel):
def __init__(self, tag_set=None):
self.tag_set = tag_set or FakeTagSet()
class FakeTagSet(BaseModel):
def __init__(self, tags=None):
self.tags = tags or []
class FakeTag(BaseModel):
def __init__(self, key, value=None):
self.key = key
self.value = value
class LifecycleFilter(BaseModel): class LifecycleFilter(BaseModel):
def __init__(self, prefix=None, tag=None, and_filter=None): def __init__(self, prefix=None, tag=None, and_filter=None):
self.prefix = prefix self.prefix = prefix
self.tag = tag (self.tag_key, self.tag_value) = tag if tag else (None, None)
self.and_filter = and_filter self.and_filter = and_filter
def to_config_dict(self): def to_config_dict(self):
@ -500,11 +485,11 @@ class LifecycleFilter(BaseModel):
"predicate": {"type": "LifecyclePrefixPredicate", "prefix": self.prefix} "predicate": {"type": "LifecyclePrefixPredicate", "prefix": self.prefix}
} }
elif self.tag: elif self.tag_key:
return { return {
"predicate": { "predicate": {
"type": "LifecycleTagPredicate", "type": "LifecycleTagPredicate",
"tag": {"key": self.tag.key, "value": self.tag.value}, "tag": {"key": self.tag_key, "value": self.tag_value},
} }
} }
@ -528,12 +513,9 @@ class LifecycleAndFilter(BaseModel):
if self.prefix is not None: if self.prefix is not None:
data.append({"type": "LifecyclePrefixPredicate", "prefix": self.prefix}) data.append({"type": "LifecyclePrefixPredicate", "prefix": self.prefix})
for tag in self.tags: for key, value in self.tags.items():
data.append( data.append(
{ {"type": "LifecycleTagPredicate", "tag": {"key": key, "value": value},}
"type": "LifecycleTagPredicate",
"tag": {"key": tag.key, "value": tag.value},
}
) )
return data return data
@ -788,7 +770,6 @@ class FakeBucket(BaseModel):
self.policy = None self.policy = None
self.website_configuration = None self.website_configuration = None
self.acl = get_canned_acl("private") self.acl = get_canned_acl("private")
self.tags = FakeTagging()
self.cors = [] self.cors = []
self.logging = {} self.logging = {}
self.notification_configuration = None self.notification_configuration = None
@ -880,7 +861,7 @@ class FakeBucket(BaseModel):
and_filter = None and_filter = None
if rule["Filter"].get("And"): if rule["Filter"].get("And"):
filters += 1 filters += 1
and_tags = [] and_tags = {}
if rule["Filter"]["And"].get("Tag"): if rule["Filter"]["And"].get("Tag"):
if not isinstance(rule["Filter"]["And"]["Tag"], list): if not isinstance(rule["Filter"]["And"]["Tag"], list):
rule["Filter"]["And"]["Tag"] = [ rule["Filter"]["And"]["Tag"] = [
@ -888,7 +869,7 @@ class FakeBucket(BaseModel):
] ]
for t in rule["Filter"]["And"]["Tag"]: for t in rule["Filter"]["And"]["Tag"]:
and_tags.append(FakeTag(t["Key"], t.get("Value", ""))) and_tags[t["Key"]] = t.get("Value", "")
try: try:
and_prefix = ( and_prefix = (
@ -902,7 +883,7 @@ class FakeBucket(BaseModel):
filter_tag = None filter_tag = None
if rule["Filter"].get("Tag"): if rule["Filter"].get("Tag"):
filters += 1 filters += 1
filter_tag = FakeTag( filter_tag = (
rule["Filter"]["Tag"]["Key"], rule["Filter"]["Tag"]["Key"],
rule["Filter"]["Tag"].get("Value", ""), rule["Filter"]["Tag"].get("Value", ""),
) )
@ -989,16 +970,6 @@ class FakeBucket(BaseModel):
def delete_cors(self): def delete_cors(self):
self.cors = [] self.cors = []
def set_tags(self, tagging):
self.tags = tagging
def delete_tags(self):
self.tags = FakeTagging()
@property
def tagging(self):
return self.tags
def set_logging(self, logging_config, bucket_backend): def set_logging(self, logging_config, bucket_backend):
if not logging_config: if not logging_config:
self.logging = {} self.logging = {}
@ -1086,6 +1057,10 @@ class FakeBucket(BaseModel):
def set_acl(self, acl): def set_acl(self, acl):
self.acl = acl self.acl = acl
@property
def arn(self):
return "arn:aws:s3:::{}".format(self.name)
@property @property
def physical_resource_id(self): def physical_resource_id(self):
return self.name return self.name
@ -1111,7 +1086,7 @@ class FakeBucket(BaseModel):
int(time.mktime(self.creation_date.timetuple())) int(time.mktime(self.creation_date.timetuple()))
), # PY2 and 3 compatible ), # PY2 and 3 compatible
"configurationItemMD5Hash": "", "configurationItemMD5Hash": "",
"arn": "arn:aws:s3:::{}".format(self.name), "arn": self.arn,
"resourceType": "AWS::S3::Bucket", "resourceType": "AWS::S3::Bucket",
"resourceId": self.name, "resourceId": self.name,
"resourceName": self.name, "resourceName": self.name,
@ -1120,7 +1095,7 @@ class FakeBucket(BaseModel):
"resourceCreationTime": str(self.creation_date), "resourceCreationTime": str(self.creation_date),
"relatedEvents": [], "relatedEvents": [],
"relationships": [], "relationships": [],
"tags": {tag.key: tag.value for tag in self.tagging.tag_set.tags}, "tags": s3_backend.tagger.get_tag_dict_for_resource(self.arn),
"configuration": { "configuration": {
"name": self.name, "name": self.name,
"owner": {"id": OWNER}, "owner": {"id": OWNER},
@ -1182,6 +1157,8 @@ class S3Backend(BaseBackend):
def __init__(self): def __init__(self):
self.buckets = {} self.buckets = {}
self.account_public_access_block = None self.account_public_access_block = None
self.tagger = TaggingService()
# Register this class as a CloudWatch Metric Provider # Register this class as a CloudWatch Metric Provider
# Must provide a method 'get_cloudwatch_metrics' that will return a list of metrics, based on the data available # Must provide a method 'get_cloudwatch_metrics' that will return a list of metrics, based on the data available
metric_providers["S3"] = self metric_providers["S3"] = self
@ -1383,23 +1360,32 @@ class S3Backend(BaseBackend):
else: else:
return None return None
def set_key_tagging(self, bucket_name, key_name, tagging, version_id=None): def get_key_tags(self, key):
key = self.get_key(bucket_name, key_name, version_id) return self.tagger.list_tags_for_resource(key.arn)
def set_key_tags(self, key, tags, key_name=None):
if key is None: if key is None:
raise MissingKey(key_name) raise MissingKey(key_name)
key.set_tagging(tagging) self.tagger.delete_all_tags_for_resource(key.arn)
self.tagger.tag_resource(
key.arn, [{"Key": key, "Value": value} for key, value in tags.items()],
)
return key return key
def put_bucket_tagging(self, bucket_name, tagging): def get_bucket_tags(self, bucket_name):
tag_keys = [tag.key for tag in tagging.tag_set.tags]
if len(tag_keys) != len(set(tag_keys)):
raise DuplicateTagKeys()
bucket = self.get_bucket(bucket_name) bucket = self.get_bucket(bucket_name)
bucket.set_tags(tagging) return self.tagger.list_tags_for_resource(bucket.arn)
def put_bucket_tags(self, bucket_name, tags):
bucket = self.get_bucket(bucket_name)
self.tagger.delete_all_tags_for_resource(bucket.arn)
self.tagger.tag_resource(
bucket.arn, [{"Key": key, "Value": value} for key, value in tags.items()],
)
def delete_bucket_tagging(self, bucket_name): def delete_bucket_tagging(self, bucket_name):
bucket = self.get_bucket(bucket_name) bucket = self.get_bucket(bucket_name)
bucket.delete_tags() self.tagger.delete_all_tags_for_resource(bucket.arn)
def put_bucket_cors(self, bucket_name, cors_rules): def put_bucket_cors(self, bucket_name, cors_rules):
bucket = self.get_bucket(bucket_name) bucket = self.get_bucket(bucket_name)
@ -1607,6 +1593,7 @@ class S3Backend(BaseBackend):
key = self.get_key(src_bucket_name, src_key_name, version_id=src_version_id) key = self.get_key(src_bucket_name, src_key_name, version_id=src_version_id)
new_key = key.copy(dest_key_name, dest_bucket.is_versioned) new_key = key.copy(dest_key_name, dest_bucket.is_versioned)
self.tagger.copy_tags(key.arn, new_key.arn)
if storage is not None: if storage is not None:
new_key.set_storage_class(storage) new_key.set_storage_class(storage)

View File

@ -24,6 +24,7 @@ from moto.s3bucket_path.utils import (
from .exceptions import ( from .exceptions import (
BucketAlreadyExists, BucketAlreadyExists,
DuplicateTagKeys,
S3ClientError, S3ClientError,
MissingBucket, MissingBucket,
MissingKey, MissingKey,
@ -43,9 +44,6 @@ from .models import (
FakeGrant, FakeGrant,
FakeAcl, FakeAcl,
FakeKey, FakeKey,
FakeTagging,
FakeTagSet,
FakeTag,
) )
from .utils import ( from .utils import (
bucket_name_from_url, bucket_name_from_url,
@ -378,13 +376,13 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
template = self.response_template(S3_OBJECT_ACL_RESPONSE) template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return template.render(obj=bucket) return template.render(obj=bucket)
elif "tagging" in querystring: elif "tagging" in querystring:
bucket = self.backend.get_bucket(bucket_name) tags = self.backend.get_bucket_tags(bucket_name)["Tags"]
# "Special Error" if no tags: # "Special Error" if no tags:
if len(bucket.tagging.tag_set.tags) == 0: if len(tags) == 0:
template = self.response_template(S3_NO_BUCKET_TAGGING) template = self.response_template(S3_NO_BUCKET_TAGGING)
return 404, {}, template.render(bucket_name=bucket_name) return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_TAGGING_RESPONSE) template = self.response_template(S3_OBJECT_TAGGING_RESPONSE)
return template.render(bucket=bucket) return template.render(tags=tags)
elif "logging" in querystring: elif "logging" in querystring:
bucket = self.backend.get_bucket(bucket_name) bucket = self.backend.get_bucket(bucket_name)
if not bucket.logging: if not bucket.logging:
@ -652,7 +650,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return "" return ""
elif "tagging" in querystring: elif "tagging" in querystring:
tagging = self._bucket_tagging_from_xml(body) tagging = self._bucket_tagging_from_xml(body)
self.backend.put_bucket_tagging(bucket_name, tagging) self.backend.put_bucket_tags(bucket_name, tagging)
return "" return ""
elif "website" in querystring: elif "website" in querystring:
self.backend.set_bucket_website_configuration(bucket_name, body) self.backend.set_bucket_website_configuration(bucket_name, body)
@ -1098,8 +1096,9 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
template = self.response_template(S3_OBJECT_ACL_RESPONSE) template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return 200, response_headers, template.render(obj=key) return 200, response_headers, template.render(obj=key)
if "tagging" in query: if "tagging" in query:
tags = self.backend.get_key_tags(key)["Tags"]
template = self.response_template(S3_OBJECT_TAGGING_RESPONSE) template = self.response_template(S3_OBJECT_TAGGING_RESPONSE)
return 200, response_headers, template.render(obj=key) return 200, response_headers, template.render(tags=tags)
response_headers.update(key.metadata) response_headers.update(key.metadata)
response_headers.update(key.response_dict) response_headers.update(key.response_dict)
@ -1171,8 +1170,9 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
version_id = query["versionId"][0] version_id = query["versionId"][0]
else: else:
version_id = None version_id = None
key = self.backend.get_key(bucket_name, key_name, version_id=version_id)
tagging = self._tagging_from_xml(body) tagging = self._tagging_from_xml(body)
self.backend.set_key_tagging(bucket_name, key_name, tagging, version_id) self.backend.set_key_tags(key, tagging, key_name)
return 200, response_headers, "" return 200, response_headers, ""
if "x-amz-copy-source" in request.headers: if "x-amz-copy-source" in request.headers:
@ -1213,7 +1213,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
tdirective = request.headers.get("x-amz-tagging-directive") tdirective = request.headers.get("x-amz-tagging-directive")
if tdirective == "REPLACE": if tdirective == "REPLACE":
tagging = self._tagging_from_headers(request.headers) tagging = self._tagging_from_headers(request.headers)
new_key.set_tagging(tagging) self.backend.set_key_tags(new_key, tagging)
template = self.response_template(S3_OBJECT_COPY_RESPONSE) template = self.response_template(S3_OBJECT_COPY_RESPONSE)
response_headers.update(new_key.response_dict) response_headers.update(new_key.response_dict)
return 200, response_headers, template.render(key=new_key) return 200, response_headers, template.render(key=new_key)
@ -1237,7 +1237,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
new_key.website_redirect_location = request.headers.get( new_key.website_redirect_location = request.headers.get(
"x-amz-website-redirect-location" "x-amz-website-redirect-location"
) )
new_key.set_tagging(tagging) self.backend.set_key_tags(new_key, tagging)
response_headers.update(new_key.response_dict) response_headers.update(new_key.response_dict)
return 200, response_headers, "" return 200, response_headers, ""
@ -1365,55 +1365,45 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return None return None
def _tagging_from_headers(self, headers): def _tagging_from_headers(self, headers):
tags = {}
if headers.get("x-amz-tagging"): if headers.get("x-amz-tagging"):
parsed_header = parse_qs(headers["x-amz-tagging"], keep_blank_values=True) parsed_header = parse_qs(headers["x-amz-tagging"], keep_blank_values=True)
tags = []
for tag in parsed_header.items(): for tag in parsed_header.items():
tags.append(FakeTag(tag[0], tag[1][0])) tags[tag[0]] = tag[1][0]
return tags
tag_set = FakeTagSet(tags)
tagging = FakeTagging(tag_set)
return tagging
else:
return FakeTagging()
def _tagging_from_xml(self, xml): def _tagging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml, force_list={"Tag": True}) parsed_xml = xmltodict.parse(xml, force_list={"Tag": True})
tags = [] tags = {}
for tag in parsed_xml["Tagging"]["TagSet"]["Tag"]: for tag in parsed_xml["Tagging"]["TagSet"]["Tag"]:
tags.append(FakeTag(tag["Key"], tag["Value"])) tags[tag["Key"]] = tag["Value"]
tag_set = FakeTagSet(tags) return tags
tagging = FakeTagging(tag_set)
return tagging
def _bucket_tagging_from_xml(self, xml): def _bucket_tagging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml) parsed_xml = xmltodict.parse(xml)
tags = [] tags = {}
# Optional if no tags are being sent: # Optional if no tags are being sent:
if parsed_xml["Tagging"].get("TagSet"): if parsed_xml["Tagging"].get("TagSet"):
# If there is only 1 tag, then it's not a list: # If there is only 1 tag, then it's not a list:
if not isinstance(parsed_xml["Tagging"]["TagSet"]["Tag"], list): if not isinstance(parsed_xml["Tagging"]["TagSet"]["Tag"], list):
tags.append( tags[parsed_xml["Tagging"]["TagSet"]["Tag"]["Key"]] = parsed_xml[
FakeTag( "Tagging"
parsed_xml["Tagging"]["TagSet"]["Tag"]["Key"], ]["TagSet"]["Tag"]["Value"]
parsed_xml["Tagging"]["TagSet"]["Tag"]["Value"],
)
)
else: else:
for tag in parsed_xml["Tagging"]["TagSet"]["Tag"]: for tag in parsed_xml["Tagging"]["TagSet"]["Tag"]:
tags.append(FakeTag(tag["Key"], tag["Value"])) if tag["Key"] in tags:
raise DuplicateTagKeys()
tags[tag["Key"]] = tag["Value"]
# Verify that "aws:" is not in the tags. If so, then this is a problem: # Verify that "aws:" is not in the tags. If so, then this is a problem:
for tag in tags: for key, _ in tags.items():
if tag.key.startswith("aws:"): if key.startswith("aws:"):
raise NoSystemTags() raise NoSystemTags()
tag_set = FakeTagSet(tags) return tags
tagging = FakeTagging(tag_set)
return tagging
def _cors_from_xml(self, xml): def _cors_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml) parsed_xml = xmltodict.parse(xml)
@ -1733,10 +1723,10 @@ S3_BUCKET_LIFECYCLE_CONFIGURATION = """<?xml version="1.0" encoding="UTF-8"?>
{% if rule.filter.prefix != None %} {% if rule.filter.prefix != None %}
<Prefix>{{ rule.filter.prefix }}</Prefix> <Prefix>{{ rule.filter.prefix }}</Prefix>
{% endif %} {% endif %}
{% if rule.filter.tag %} {% if rule.filter.tag_key %}
<Tag> <Tag>
<Key>{{ rule.filter.tag.key }}</Key> <Key>{{ rule.filter.tag_key }}</Key>
<Value>{{ rule.filter.tag.value }}</Value> <Value>{{ rule.filter.tag_value }}</Value>
</Tag> </Tag>
{% endif %} {% endif %}
{% if rule.filter.and_filter %} {% if rule.filter.and_filter %}
@ -1744,10 +1734,10 @@ S3_BUCKET_LIFECYCLE_CONFIGURATION = """<?xml version="1.0" encoding="UTF-8"?>
{% if rule.filter.and_filter.prefix != None %} {% if rule.filter.and_filter.prefix != None %}
<Prefix>{{ rule.filter.and_filter.prefix }}</Prefix> <Prefix>{{ rule.filter.and_filter.prefix }}</Prefix>
{% endif %} {% endif %}
{% for tag in rule.filter.and_filter.tags %} {% for key, value in rule.filter.and_filter.tags.items() %}
<Tag> <Tag>
<Key>{{ tag.key }}</Key> <Key>{{ key }}</Key>
<Value>{{ tag.value }}</Value> <Value>{{ value }}</Value>
</Tag> </Tag>
{% endfor %} {% endfor %}
</And> </And>
@ -1908,22 +1898,10 @@ S3_OBJECT_TAGGING_RESPONSE = """\
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> <Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<TagSet> <TagSet>
{% for tag in obj.tagging.tag_set.tags %} {% for tag in tags %}
<Tag> <Tag>
<Key>{{ tag.key }}</Key> <Key>{{ tag.Key }}</Key>
<Value>{{ tag.value }}</Value> <Value>{{ tag.Value }}</Value>
</Tag>
{% endfor %}
</TagSet>
</Tagging>"""
S3_BUCKET_TAGGING_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<Tagging>
<TagSet>
{% for tag in bucket.tagging.tag_set.tags %}
<Tag>
<Key>{{ tag.key }}</Key>
<Value>{{ tag.value }}</Value>
</Tag> </Tag>
{% endfor %} {% endfor %}
</TagSet> </TagSet>

View File

@ -5,14 +5,22 @@ class TaggingService:
self.valueName = valueName self.valueName = valueName
self.tags = {} self.tags = {}
def get_tag_dict_for_resource(self, arn):
result = {}
if self.has_tags(arn):
for k, v in self.tags[arn].items():
result[k] = v
return result
def list_tags_for_resource(self, arn): def list_tags_for_resource(self, arn):
result = [] result = []
if arn in self.tags: if self.has_tags(arn):
for k, v in self.tags[arn].items(): for k, v in self.tags[arn].items():
result.append({self.keyName: k, self.valueName: v}) result.append({self.keyName: k, self.valueName: v})
return {self.tagName: result} return {self.tagName: result}
def delete_all_tags_for_resource(self, arn): def delete_all_tags_for_resource(self, arn):
if self.has_tags(arn):
del self.tags[arn] del self.tags[arn]
def has_tags(self, arn): def has_tags(self, arn):
@ -27,6 +35,12 @@ class TaggingService:
else: else:
self.tags[arn][t[self.keyName]] = None self.tags[arn][t[self.keyName]] = None
def copy_tags(self, from_arn, to_arn):
if self.has_tags(from_arn):
self.tag_resource(
to_arn, self.list_tags_for_resource(from_arn)[self.tagName]
)
def untag_resource_using_names(self, arn, tag_names): def untag_resource_using_names(self, arn, tag_names):
for name in tag_names: for name in tag_names:
if name in self.tags.get(arn, {}): if name in self.tags.get(arn, {}):

View File

@ -11,6 +11,8 @@ from moto import mock_s3
from moto.config import mock_config from moto.config import mock_config
from moto.core import ACCOUNT_ID from moto.core import ACCOUNT_ID
import sure # noqa
@mock_config @mock_config
def test_put_configuration_recorder(): def test_put_configuration_recorder():

View File

@ -3256,7 +3256,8 @@ def test_boto3_put_object_tagging_on_earliest_version():
# Older version has tags while the most recent does not # Older version has tags while the most recent does not
resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id) resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal( sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}] [{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}]
) )
@ -3334,7 +3335,8 @@ def test_boto3_put_object_tagging_on_both_version():
resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id) resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal( sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}] [{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}]
) )
@ -3342,7 +3344,8 @@ def test_boto3_put_object_tagging_on_both_version():
Bucket=bucket_name, Key=key, VersionId=second_object.id Bucket=bucket_name, Key=key, VersionId=second_object.id
) )
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal( sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "baz"}, {"Key": "item2", "Value": "bin"}] [{"Key": "item1", "Value": "baz"}, {"Key": "item2", "Value": "bin"}]
) )
@ -4293,24 +4296,17 @@ def test_s3_config_dict():
FakeAcl, FakeAcl,
FakeGrant, FakeGrant,
FakeGrantee, FakeGrantee,
FakeTag,
FakeTagging,
FakeTagSet,
OWNER, OWNER,
) )
# Without any buckets: # Without any buckets:
assert not s3_config_query.get_config_resource("some_bucket") assert not s3_config_query.get_config_resource("some_bucket")
tags = FakeTagging( tags = {"someTag": "someValue", "someOtherTag": "someOtherValue"}
FakeTagSet(
[FakeTag("someTag", "someValue"), FakeTag("someOtherTag", "someOtherValue")]
)
)
# With 1 bucket in us-west-2: # With 1 bucket in us-west-2:
s3_config_query.backends["global"].create_bucket("bucket1", "us-west-2") s3_config_query.backends["global"].create_bucket("bucket1", "us-west-2")
s3_config_query.backends["global"].put_bucket_tagging("bucket1", tags) s3_config_query.backends["global"].put_bucket_tags("bucket1", tags)
# With a log bucket: # With a log bucket:
s3_config_query.backends["global"].create_bucket("logbucket", "us-west-2") s3_config_query.backends["global"].create_bucket("logbucket", "us-west-2")

View File

@ -77,3 +77,34 @@ def test_extract_tag_names():
expected = ["key1", "key2"] expected = ["key1", "key2"]
expected.should.be.equal(actual) expected.should.be.equal(actual)
def test_copy_non_existing_arn():
svc = TaggingService()
tags = [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
svc.tag_resource("new_arn", tags)
#
svc.copy_tags("non_existing_arn", "new_arn")
# Copying from a non-existing ARN should a NOOP
# Assert the old tags still exist
actual = sorted(
svc.list_tags_for_resource("new_arn")["Tags"], key=lambda t: t["Key"]
)
actual.should.equal(tags)
def test_copy_existing_arn():
svc = TaggingService()
tags_old_arn = [{"Key": "key1", "Value": "value1"}]
tags_new_arn = [{"Key": "key2", "Value": "value2"}]
svc.tag_resource("old_arn", tags_old_arn)
svc.tag_resource("new_arn", tags_new_arn)
#
svc.copy_tags("old_arn", "new_arn")
# Assert the old tags still exist
actual = sorted(
svc.list_tags_for_resource("new_arn")["Tags"], key=lambda t: t["Key"]
)
actual.should.equal(
[{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
)