diff --git a/moto/s3/models.py b/moto/s3/models.py
index aede52d26..b5224b64a 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -95,6 +95,7 @@ class FakeKey(BaseModel):
version_id=0,
max_buffer_size=DEFAULT_KEY_BUFFER_SIZE,
multipart=None,
+ bucket_name=None,
):
self.name = name
self.last_modified = datetime.datetime.utcnow()
@@ -106,8 +107,8 @@ class FakeKey(BaseModel):
self._etag = etag
self._version_id = version_id
self._is_versioned = is_versioned
- self._tagging = FakeTagging()
self.multipart = multipart
+ self.bucket_name = bucket_name
self._value_buffer = tempfile.SpooledTemporaryFile(max_size=max_buffer_size)
self._max_buffer_size = max_buffer_size
@@ -127,6 +128,13 @@ class FakeKey(BaseModel):
self.lock.release()
return r
+ @property
+ def arn(self):
+ # S3 Objects don't have an ARN, but we do need something unique when creating tags against this resource
+ return "arn:aws:s3:::{}/{}/{}".format(
+ self.bucket_name, self.name, self.version_id
+ )
+
@value.setter
def value(self, new_value):
self._value_buffer.seek(0)
@@ -153,9 +161,6 @@ class FakeKey(BaseModel):
self._metadata = {}
self._metadata.update(metadata)
- def set_tagging(self, tagging):
- self._tagging = tagging
-
def set_storage_class(self, storage):
if storage is not None and storage not in STORAGE_CLASS:
raise InvalidStorageClass(storage=storage)
@@ -211,10 +216,6 @@ class FakeKey(BaseModel):
def metadata(self):
return self._metadata
- @property
- def tagging(self):
- return self._tagging
-
@property
def response_dict(self):
res = {
@@ -1355,11 +1356,17 @@ class S3Backend(BaseBackend):
else:
return None
- def set_key_tagging(self, bucket_name, key_name, tagging, version_id=None):
- key = self.get_key(bucket_name, key_name, version_id)
+ def get_key_tags(self, key):
+ return self.tagger.list_tags_for_resource(key.arn)
+
+ def set_key_tags(self, key, tagging, key_name=None):
if key is None:
raise MissingKey(key_name)
- key.set_tagging(tagging)
+ self.tagger.delete_all_tags_for_resource(key.arn)
+ self.tagger.tag_resource(
+ key.arn,
+ [{"Key": tag.key, "Value": tag.value} for tag in tagging.tag_set.tags],
+ )
return key
def get_bucket_tags(self, bucket_name):
@@ -1587,6 +1594,7 @@ class S3Backend(BaseBackend):
key = self.get_key(src_bucket_name, src_key_name, version_id=src_version_id)
new_key = key.copy(dest_key_name, dest_bucket.is_versioned)
+ self.tagger.copy_tags(key.arn, new_key.arn)
if storage is not None:
new_key.set_storage_class(storage)
diff --git a/moto/s3/responses.py b/moto/s3/responses.py
index f3a5eeaac..4e3b9a67b 100644
--- a/moto/s3/responses.py
+++ b/moto/s3/responses.py
@@ -383,7 +383,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
if len(tags) == 0:
template = self.response_template(S3_NO_BUCKET_TAGGING)
return 404, {}, template.render(bucket_name=bucket_name)
- template = self.response_template(S3_BUCKET_TAGGING_RESPONSE)
+ template = self.response_template(S3_OBJECT_TAGGING_RESPONSE)
return template.render(tags=tags)
elif "logging" in querystring:
bucket = self.backend.get_bucket(bucket_name)
@@ -1091,8 +1091,9 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return 200, response_headers, template.render(obj=key)
if "tagging" in query:
+ tags = self.backend.get_key_tags(key)["Tags"]
template = self.response_template(S3_OBJECT_TAGGING_RESPONSE)
- return 200, response_headers, template.render(obj=key)
+ return 200, response_headers, template.render(tags=tags)
response_headers.update(key.metadata)
response_headers.update(key.response_dict)
@@ -1164,8 +1165,9 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
version_id = query["versionId"][0]
else:
version_id = None
+ key = self.backend.get_key(bucket_name, key_name, version_id=version_id)
tagging = self._tagging_from_xml(body)
- self.backend.set_key_tagging(bucket_name, key_name, tagging, version_id)
+ self.backend.set_key_tags(key, tagging, key_name)
return 200, response_headers, ""
if "x-amz-copy-source" in request.headers:
@@ -1206,7 +1208,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
tdirective = request.headers.get("x-amz-tagging-directive")
if tdirective == "REPLACE":
tagging = self._tagging_from_headers(request.headers)
- new_key.set_tagging(tagging)
+ self.backend.set_key_tags(new_key, tagging)
template = self.response_template(S3_OBJECT_COPY_RESPONSE)
response_headers.update(new_key.response_dict)
return 200, response_headers, template.render(key=new_key)
@@ -1230,7 +1232,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
new_key.website_redirect_location = request.headers.get(
"x-amz-website-redirect-location"
)
- new_key.set_tagging(tagging)
+ self.backend.set_key_tags(new_key, tagging)
template = self.response_template(S3_OBJECT_RESPONSE)
response_headers.update(new_key.response_dict)
@@ -1916,23 +1918,11 @@ S3_OBJECT_ACL_RESPONSE = """
S3_OBJECT_TAGGING_RESPONSE = """\
-
- {% for tag in obj.tagging.tag_set.tags %}
-
- {{ tag.key }}
- {{ tag.value }}
-
- {% endfor %}
-
-"""
-
-S3_BUCKET_TAGGING_RESPONSE = """
-
{% for tag in tags %}
- {{ tag.key }}
- {{ tag.value }}
+ {{ tag.Key }}
+ {{ tag.Value }}
{% endfor %}
diff --git a/moto/utilities/tagging_service.py b/moto/utilities/tagging_service.py
index 8c3228552..2d6ac99c9 100644
--- a/moto/utilities/tagging_service.py
+++ b/moto/utilities/tagging_service.py
@@ -35,6 +35,12 @@ class TaggingService:
else:
self.tags[arn][t[self.keyName]] = None
+ def copy_tags(self, from_arn, to_arn):
+ if self.has_tags(from_arn):
+ self.tag_resource(
+ to_arn, self.list_tags_for_resource(from_arn)[self.tagName]
+ )
+
def untag_resource_using_names(self, arn, tag_names):
for name in tag_names:
if name in self.tags.get(arn, {}):
diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py
index 303ed523d..4ddc160a8 100644
--- a/tests/test_s3/test_s3.py
+++ b/tests/test_s3/test_s3.py
@@ -3255,7 +3255,8 @@ def test_boto3_put_object_tagging_on_earliest_version():
# Older version has tags while the most recent does not
resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
- resp["TagSet"].should.equal(
+ sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
+ sorted_tagset.should.equal(
[{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}]
)
@@ -3333,7 +3334,8 @@ def test_boto3_put_object_tagging_on_both_version():
resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
- resp["TagSet"].should.equal(
+ sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
+ sorted_tagset.should.equal(
[{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}]
)
@@ -3341,7 +3343,8 @@ def test_boto3_put_object_tagging_on_both_version():
Bucket=bucket_name, Key=key, VersionId=second_object.id
)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
- resp["TagSet"].should.equal(
+ sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
+ sorted_tagset.should.equal(
[{"Key": "item1", "Value": "baz"}, {"Key": "item2", "Value": "bin"}]
)
diff --git a/tests/test_utilities/test_tagging_service.py b/tests/test_utilities/test_tagging_service.py
index 249e903fe..1eac276a1 100644
--- a/tests/test_utilities/test_tagging_service.py
+++ b/tests/test_utilities/test_tagging_service.py
@@ -77,3 +77,34 @@ def test_extract_tag_names():
expected = ["key1", "key2"]
expected.should.be.equal(actual)
+
+
+def test_copy_non_existing_arn():
+ svc = TaggingService()
+ tags = [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
+ svc.tag_resource("new_arn", tags)
+ #
+ svc.copy_tags("non_existing_arn", "new_arn")
+ # Copying from a non-existing ARN should a NOOP
+ # Assert the old tags still exist
+ actual = sorted(
+ svc.list_tags_for_resource("new_arn")["Tags"], key=lambda t: t["Key"]
+ )
+ actual.should.equal(tags)
+
+
+def test_copy_existing_arn():
+ svc = TaggingService()
+ tags_old_arn = [{"Key": "key1", "Value": "value1"}]
+ tags_new_arn = [{"Key": "key2", "Value": "value2"}]
+ svc.tag_resource("old_arn", tags_old_arn)
+ svc.tag_resource("new_arn", tags_new_arn)
+ #
+ svc.copy_tags("old_arn", "new_arn")
+ # Assert the old tags still exist
+ actual = sorted(
+ svc.list_tags_for_resource("new_arn")["Tags"], key=lambda t: t["Key"]
+ )
+ actual.should.equal(
+ [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
+ )