diff --git a/moto/s3/models.py b/moto/s3/models.py index 39c4b1edd..6210aecd6 100644 --- a/moto/s3/models.py +++ b/moto/s3/models.py @@ -1115,8 +1115,8 @@ class S3Backend(BaseBackend): else: return None - def set_key_tagging(self, bucket_name, key_name, tagging): - key = self.get_key(bucket_name, key_name) + def set_key_tagging(self, bucket_name, key_name, tagging, version_id=None): + key = self.get_key(bucket_name, key_name, version_id) if key is None: raise MissingKey(key_name) key.set_tagging(tagging) diff --git a/moto/s3/responses.py b/moto/s3/responses.py index ae6662579..e0e65850c 100644 --- a/moto/s3/responses.py +++ b/moto/s3/responses.py @@ -905,8 +905,12 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin): return 200, response_headers, "" if 'tagging' in query: + if 'versionId' in query: + version_id = query['versionId'][0] + else: + version_id = None tagging = self._tagging_from_xml(body) - self.backend.set_key_tagging(bucket_name, key_name, tagging) + self.backend.set_key_tagging(bucket_name, key_name, tagging, version_id) return 200, response_headers, "" if 'x-amz-copy-source' in request.headers: diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py index 0d8f3385d..8e22bfdad 100644 --- a/tests/test_s3/test_s3.py +++ b/tests/test_s3/test_s3.py @@ -2599,6 +2599,161 @@ def test_boto3_put_object_tagging(): resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200) +@mock_s3 +def test_boto3_put_object_tagging_on_earliest_version(): + s3 = boto3.client('s3', region_name='us-east-1') + bucket_name = 'mybucket' + key = 'key-with-tags' + s3.create_bucket(Bucket=bucket_name) + s3_resource = boto3.resource('s3') + bucket_versioning = s3_resource.BucketVersioning(bucket_name) + bucket_versioning.enable() + bucket_versioning.status.should.equal('Enabled') + + with assert_raises(ClientError) as err: + s3.put_object_tagging( + Bucket=bucket_name, + Key=key, + Tagging={'TagSet': [ + {'Key': 'item1', 'Value': 'foo'}, + {'Key': 'item2', 'Value': 'bar'}, + ]} + ) + + e = err.exception + e.response['Error'].should.equal({ + 'Code': 'NoSuchKey', + 'Message': 'The specified key does not exist.', + 'RequestID': '7a62c49f-347e-4fc4-9331-6e8eEXAMPLE', + }) + + s3.put_object( + Bucket=bucket_name, + Key=key, + Body='test' + ) + s3.put_object( + Bucket=bucket_name, + Key=key, + Body='test_updated' + ) + + object_versions = list(s3_resource.Bucket(bucket_name).object_versions.all()) + first_object = object_versions[0] + second_object = object_versions[1] + + resp = s3.put_object_tagging( + Bucket=bucket_name, + Key=key, + Tagging={'TagSet': [ + {'Key': 'item1', 'Value': 'foo'}, + {'Key': 'item2', 'Value': 'bar'}, + ]}, + VersionId=first_object.id + ) + + resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200) + + # Older version has tags while the most recent does not + resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id) + resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200) + resp['TagSet'].should.equal( + [ + {'Key': 'item1', 'Value': 'foo'}, + {'Key': 'item2', 'Value': 'bar'} + ] + ) + + resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=second_object.id) + resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200) + resp['TagSet'].should.equal([]) + + +@mock_s3 +def test_boto3_put_object_tagging_on_both_version(): + s3 = boto3.client('s3', region_name='us-east-1') + bucket_name = 'mybucket' + key = 'key-with-tags' + s3.create_bucket(Bucket=bucket_name) + s3_resource = boto3.resource('s3') + bucket_versioning = s3_resource.BucketVersioning(bucket_name) + bucket_versioning.enable() + bucket_versioning.status.should.equal('Enabled') + + with assert_raises(ClientError) as err: + s3.put_object_tagging( + Bucket=bucket_name, + Key=key, + Tagging={'TagSet': [ + {'Key': 'item1', 'Value': 'foo'}, + {'Key': 'item2', 'Value': 'bar'}, + ]} + ) + + e = err.exception + e.response['Error'].should.equal({ + 'Code': 'NoSuchKey', + 'Message': 'The specified key does not exist.', + 'RequestID': '7a62c49f-347e-4fc4-9331-6e8eEXAMPLE', + }) + + s3.put_object( + Bucket=bucket_name, + Key=key, + Body='test' + ) + s3.put_object( + Bucket=bucket_name, + Key=key, + Body='test_updated' + ) + + object_versions = list(s3_resource.Bucket(bucket_name).object_versions.all()) + first_object = object_versions[0] + second_object = object_versions[1] + + resp = s3.put_object_tagging( + Bucket=bucket_name, + Key=key, + Tagging={'TagSet': [ + {'Key': 'item1', 'Value': 'foo'}, + {'Key': 'item2', 'Value': 'bar'}, + ]}, + VersionId=first_object.id + ) + resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200) + + resp = s3.put_object_tagging( + Bucket=bucket_name, + Key=key, + Tagging={'TagSet': [ + {'Key': 'item1', 'Value': 'baz'}, + {'Key': 'item2', 'Value': 'bin'}, + ]}, + VersionId=second_object.id + ) + resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200) + + # Older version has tags while the most recent does not + resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id) + resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200) + resp['TagSet'].should.equal( + [ + {'Key': 'item1', 'Value': 'foo'}, + {'Key': 'item2', 'Value': 'bar'} + ] + ) + + resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=second_object.id) + resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200) + resp['TagSet'].should.equal( + [ + {'Key': 'item1', 'Value': 'baz'}, + {'Key': 'item2', 'Value': 'bin'} + ] + ) + + @mock_s3 def test_boto3_put_object_tagging_with_single_tag(): s3 = boto3.client('s3', region_name='us-east-1')