2c0636ae67
* S3 update CopyObject logic * add CopyObject tests * re-add missing type annotation * add ACL test * lint/format * fix typing * fix test * fix test test_copy_object_does_not_copy_storage_class
685 lines
23 KiB
Python
685 lines
23 KiB
Python
import datetime
|
|
|
|
import boto3
|
|
from botocore.client import ClientError
|
|
|
|
from moto.s3.responses import DEFAULT_REGION_NAME
|
|
import pytest
|
|
|
|
import sure # noqa # pylint: disable=unused-import
|
|
|
|
from moto import mock_s3, mock_kms
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"key_name",
|
|
[
|
|
"the-key",
|
|
"the-unicode-💩-key",
|
|
"key-with?question-mark",
|
|
"key-with%2Fembedded%2Furl%2Fencoding",
|
|
],
|
|
)
|
|
@mock_s3
|
|
def test_copy_key_boto3(key_name):
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
s3.create_bucket(Bucket="foobar")
|
|
|
|
key = s3.Object("foobar", key_name)
|
|
key.put(Body=b"some value")
|
|
|
|
key2 = s3.Object("foobar", "new-key")
|
|
key2.copy_from(CopySource=f"foobar/{key_name}")
|
|
|
|
resp = client.get_object(Bucket="foobar", Key=key_name)
|
|
resp["Body"].read().should.equal(b"some value")
|
|
resp = client.get_object(Bucket="foobar", Key="new-key")
|
|
resp["Body"].read().should.equal(b"some value")
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_key_boto3_with_sha256_checksum():
|
|
# Setup
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
key_name = "key"
|
|
new_key = "new_key"
|
|
bucket = "foobar"
|
|
expected_hash = "YWIzZDA3ZjMxNjljY2JkMGVkNmM0YjQ1ZGUyMTUxOWY5ZjkzOGM3MmQyNDEyNDk5OGFhYjk0OWNlODNiYjUxYg=="
|
|
|
|
s3.create_bucket(Bucket=bucket)
|
|
key = s3.Object("foobar", key_name)
|
|
key.put(Body=b"some value")
|
|
|
|
# Execute
|
|
key2 = s3.Object(bucket, new_key)
|
|
key2.copy(
|
|
CopySource={"Bucket": bucket, "Key": key_name},
|
|
ExtraArgs={"ChecksumAlgorithm": "SHA256"},
|
|
)
|
|
|
|
# Verify
|
|
resp = client.get_object_attributes(
|
|
Bucket=bucket, Key=new_key, ObjectAttributes=["Checksum"]
|
|
)
|
|
|
|
assert "Checksum" in resp
|
|
assert "ChecksumSHA256" in resp["Checksum"]
|
|
assert resp["Checksum"]["ChecksumSHA256"] == expected_hash
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_key_with_version_boto3():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
s3.create_bucket(Bucket="foobar")
|
|
client.put_bucket_versioning(
|
|
Bucket="foobar", VersioningConfiguration={"Status": "Enabled"}
|
|
)
|
|
|
|
key = s3.Object("foobar", "the-key")
|
|
key.put(Body=b"some value")
|
|
key.put(Body=b"another value")
|
|
|
|
all_versions = client.list_object_versions(Bucket="foobar", Prefix="the-key")[
|
|
"Versions"
|
|
]
|
|
old_version = [v for v in all_versions if not v["IsLatest"]][0]
|
|
|
|
key2 = s3.Object("foobar", "new-key")
|
|
key2.copy_from(CopySource=f"foobar/the-key?versionId={old_version['VersionId']}")
|
|
|
|
resp = client.get_object(Bucket="foobar", Key="the-key")
|
|
resp["Body"].read().should.equal(b"another value")
|
|
resp = client.get_object(Bucket="foobar", Key="new-key")
|
|
resp["Body"].read().should.equal(b"some value")
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_with_bucketkeyenabled_returns_the_value():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "test-copy-object-with-bucketkeyenabled"
|
|
s3.create_bucket(Bucket=bucket_name)
|
|
|
|
key = s3.Object(bucket_name, "the-key")
|
|
key.put(Body=b"some value")
|
|
|
|
key2 = s3.Object(bucket_name, "new-key")
|
|
key2.copy_from(
|
|
CopySource=f"{bucket_name}/the-key",
|
|
BucketKeyEnabled=True,
|
|
ServerSideEncryption="aws:kms",
|
|
)
|
|
|
|
resp = client.get_object(Bucket=bucket_name, Key="the-key")
|
|
src_headers = resp["ResponseMetadata"]["HTTPHeaders"]
|
|
src_headers.shouldnt.have.key("x-amz-server-side-encryption")
|
|
src_headers.shouldnt.have.key("x-amz-server-side-encryption-aws-kms-key-id")
|
|
src_headers.shouldnt.have.key("x-amz-server-side-encryption-bucket-key-enabled")
|
|
|
|
resp = client.get_object(Bucket=bucket_name, Key="new-key")
|
|
target_headers = resp["ResponseMetadata"]["HTTPHeaders"]
|
|
target_headers.should.have.key("x-amz-server-side-encryption")
|
|
# AWS will also return the KMS default key id - not yet implemented
|
|
# target_headers.should.have.key("x-amz-server-side-encryption-aws-kms-key-id")
|
|
# This field is only returned if encryption is set to 'aws:kms'
|
|
target_headers.should.have.key("x-amz-server-side-encryption-bucket-key-enabled")
|
|
str(
|
|
target_headers["x-amz-server-side-encryption-bucket-key-enabled"]
|
|
).lower().should.equal("true")
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_key_with_metadata():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
s3.create_bucket(Bucket="foobar")
|
|
|
|
key = s3.Object("foobar", "the-key")
|
|
metadata = {"md": "Metadatastring"}
|
|
content_type = "application/json"
|
|
initial = key.put(Body=b"{}", Metadata=metadata, ContentType=content_type)
|
|
|
|
client.copy_object(Bucket="foobar", CopySource="foobar/the-key", Key="new-key")
|
|
|
|
resp = client.get_object(Bucket="foobar", Key="new-key")
|
|
resp["Metadata"].should.equal(metadata)
|
|
resp["ContentType"].should.equal(content_type)
|
|
resp["ETag"].should.equal(initial["ETag"])
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_key_replace_metadata():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
s3.create_bucket(Bucket="foobar")
|
|
|
|
key = s3.Object("foobar", "the-key")
|
|
initial = key.put(Body=b"some value", Metadata={"md": "Metadatastring"})
|
|
|
|
client.copy_object(
|
|
Bucket="foobar",
|
|
CopySource="foobar/the-key",
|
|
Key="new-key",
|
|
Metadata={"momd": "Mometadatastring"},
|
|
MetadataDirective="REPLACE",
|
|
)
|
|
|
|
resp = client.get_object(Bucket="foobar", Key="new-key")
|
|
resp["Metadata"].should.equal({"momd": "Mometadatastring"})
|
|
resp["ETag"].should.equal(initial["ETag"])
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_key_without_changes_should_error():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "my_bucket"
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
key_name = "my_key"
|
|
key = s3.Object(bucket_name, key_name)
|
|
|
|
s3.create_bucket(Bucket=bucket_name)
|
|
key.put(Body=b"some value")
|
|
|
|
with pytest.raises(ClientError) as e:
|
|
client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{key_name}",
|
|
Key=key_name,
|
|
)
|
|
e.value.response["Error"]["Message"].should.equal(
|
|
"This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes."
|
|
)
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_key_without_changes_should_not_error():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "my_bucket"
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
key_name = "my_key"
|
|
key = s3.Object(bucket_name, key_name)
|
|
|
|
s3.create_bucket(Bucket=bucket_name)
|
|
key.put(Body=b"some value")
|
|
|
|
client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{key_name}",
|
|
Key=key_name,
|
|
Metadata={"some-key": "some-value"},
|
|
MetadataDirective="REPLACE",
|
|
)
|
|
|
|
new_object = client.get_object(Bucket=bucket_name, Key=key_name)
|
|
|
|
assert new_object["Metadata"] == {"some-key": "some-value"}
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_key_reduced_redundancy():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket = s3.Bucket("test_bucket")
|
|
bucket.create()
|
|
|
|
bucket.put_object(Key="the-key", Body=b"somedata")
|
|
|
|
client.copy_object(
|
|
Bucket="test_bucket",
|
|
CopySource="test_bucket/the-key",
|
|
Key="new-key",
|
|
StorageClass="REDUCED_REDUNDANCY",
|
|
)
|
|
|
|
keys = dict([(k.key, k) for k in bucket.objects.all()])
|
|
keys["new-key"].storage_class.should.equal("REDUCED_REDUNDANCY")
|
|
keys["the-key"].storage_class.should.equal("STANDARD")
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_non_existing_file():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
src = "srcbucket"
|
|
target = "target"
|
|
s3.create_bucket(Bucket=src)
|
|
s3.create_bucket(Bucket=target)
|
|
|
|
s3_client = boto3.client("s3")
|
|
with pytest.raises(ClientError) as exc:
|
|
s3_client.copy_object(
|
|
Bucket=target, CopySource={"Bucket": src, "Key": "foofoofoo"}, Key="newkey"
|
|
)
|
|
err = exc.value.response["Error"]
|
|
err["Code"].should.equal("NoSuchKey")
|
|
err["Message"].should.equal("The specified key does not exist.")
|
|
err["Key"].should.equal("foofoofoo")
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_with_versioning():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
|
|
client.create_bucket(
|
|
Bucket="blah", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
|
|
)
|
|
client.put_bucket_versioning(
|
|
Bucket="blah", VersioningConfiguration={"Status": "Enabled"}
|
|
)
|
|
|
|
client.put_object(Bucket="blah", Key="test1", Body=b"test1")
|
|
client.put_object(Bucket="blah", Key="test2", Body=b"test2")
|
|
|
|
client.get_object(Bucket="blah", Key="test1")["VersionId"]
|
|
obj2_version = client.get_object(Bucket="blah", Key="test2")["VersionId"]
|
|
|
|
client.copy_object(
|
|
CopySource={"Bucket": "blah", "Key": "test1"}, Bucket="blah", Key="test2"
|
|
)
|
|
obj2_version_new = client.get_object(Bucket="blah", Key="test2")["VersionId"]
|
|
|
|
# Version should be different to previous version
|
|
obj2_version_new.should_not.equal(obj2_version)
|
|
|
|
client.copy_object(
|
|
CopySource={"Bucket": "blah", "Key": "test2", "VersionId": obj2_version},
|
|
Bucket="blah",
|
|
Key="test3",
|
|
)
|
|
obj3_version_new = client.get_object(Bucket="blah", Key="test3")["VersionId"]
|
|
obj3_version_new.should_not.equal(obj2_version_new)
|
|
|
|
# Copy file that doesn't exist
|
|
with pytest.raises(ClientError) as e:
|
|
client.copy_object(
|
|
CopySource={"Bucket": "blah", "Key": "test4", "VersionId": obj2_version},
|
|
Bucket="blah",
|
|
Key="test5",
|
|
)
|
|
e.value.response["Error"]["Code"].should.equal("NoSuchKey")
|
|
|
|
response = client.create_multipart_upload(Bucket="blah", Key="test4")
|
|
upload_id = response["UploadId"]
|
|
response = client.upload_part_copy(
|
|
Bucket="blah",
|
|
Key="test4",
|
|
CopySource={"Bucket": "blah", "Key": "test3", "VersionId": obj3_version_new},
|
|
UploadId=upload_id,
|
|
PartNumber=1,
|
|
)
|
|
etag = response["CopyPartResult"]["ETag"]
|
|
client.complete_multipart_upload(
|
|
Bucket="blah",
|
|
Key="test4",
|
|
UploadId=upload_id,
|
|
MultipartUpload={"Parts": [{"ETag": etag, "PartNumber": 1}]},
|
|
)
|
|
|
|
response = client.get_object(Bucket="blah", Key="test4")
|
|
data = response["Body"].read()
|
|
data.should.equal(b"test2")
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_from_unversioned_to_versioned_bucket():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
|
|
client.create_bucket(
|
|
Bucket="src", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
|
|
)
|
|
client.create_bucket(
|
|
Bucket="dest", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
|
|
)
|
|
client.put_bucket_versioning(
|
|
Bucket="dest", VersioningConfiguration={"Status": "Enabled"}
|
|
)
|
|
|
|
client.put_object(Bucket="src", Key="test", Body=b"content")
|
|
|
|
obj2_version_new = client.copy_object(
|
|
CopySource={"Bucket": "src", "Key": "test"}, Bucket="dest", Key="test"
|
|
).get("VersionId")
|
|
|
|
# VersionId should be present in the response
|
|
obj2_version_new.should_not.equal(None)
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_with_replacement_tagging():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
client.create_bucket(Bucket="mybucket")
|
|
client.put_object(
|
|
Bucket="mybucket", Key="original", Body=b"test", Tagging="tag=old"
|
|
)
|
|
|
|
# using system tags will fail
|
|
with pytest.raises(ClientError) as err:
|
|
client.copy_object(
|
|
CopySource={"Bucket": "mybucket", "Key": "original"},
|
|
Bucket="mybucket",
|
|
Key="copy1",
|
|
TaggingDirective="REPLACE",
|
|
Tagging="aws:tag=invalid_key",
|
|
)
|
|
|
|
e = err.value
|
|
e.response["Error"]["Code"].should.equal("InvalidTag")
|
|
|
|
client.copy_object(
|
|
CopySource={"Bucket": "mybucket", "Key": "original"},
|
|
Bucket="mybucket",
|
|
Key="copy1",
|
|
TaggingDirective="REPLACE",
|
|
Tagging="tag=new",
|
|
)
|
|
client.copy_object(
|
|
CopySource={"Bucket": "mybucket", "Key": "original"},
|
|
Bucket="mybucket",
|
|
Key="copy2",
|
|
TaggingDirective="COPY",
|
|
)
|
|
|
|
tags1 = client.get_object_tagging(Bucket="mybucket", Key="copy1")["TagSet"]
|
|
tags1.should.equal([{"Key": "tag", "Value": "new"}])
|
|
tags2 = client.get_object_tagging(Bucket="mybucket", Key="copy2")["TagSet"]
|
|
tags2.should.equal([{"Key": "tag", "Value": "old"}])
|
|
|
|
|
|
@mock_s3
|
|
@mock_kms
|
|
def test_copy_object_with_kms_encryption():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
kms_client = boto3.client("kms", region_name=DEFAULT_REGION_NAME)
|
|
kms_key = kms_client.create_key()["KeyMetadata"]["KeyId"]
|
|
|
|
client.create_bucket(
|
|
Bucket="blah", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
|
|
)
|
|
|
|
client.put_object(Bucket="blah", Key="test1", Body=b"test1")
|
|
|
|
client.copy_object(
|
|
CopySource={"Bucket": "blah", "Key": "test1"},
|
|
Bucket="blah",
|
|
Key="test2",
|
|
SSEKMSKeyId=kms_key,
|
|
ServerSideEncryption="aws:kms",
|
|
)
|
|
result = client.head_object(Bucket="blah", Key="test2")
|
|
assert result["SSEKMSKeyId"] == kms_key
|
|
assert result["ServerSideEncryption"] == "aws:kms"
|
|
|
|
|
|
@mock_s3
|
|
@mock_kms
|
|
def test_copy_object_in_place_with_encryption():
|
|
kms_client = boto3.client("kms", region_name=DEFAULT_REGION_NAME)
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
kms_key = kms_client.create_key()["KeyMetadata"]["KeyId"]
|
|
bucket = s3.Bucket("test_bucket")
|
|
bucket.create()
|
|
key = "source-key"
|
|
resp = client.put_object(
|
|
Bucket="test_bucket",
|
|
Key=key,
|
|
Body=b"somedata",
|
|
ServerSideEncryption="aws:kms",
|
|
BucketKeyEnabled=True,
|
|
SSEKMSKeyId=kms_key,
|
|
)
|
|
assert resp["BucketKeyEnabled"] is True
|
|
|
|
# assert that you can copy in place with the same Encryption settings
|
|
client.copy_object(
|
|
Bucket="test_bucket",
|
|
CopySource=f"test_bucket/{key}",
|
|
Key=key,
|
|
ServerSideEncryption="aws:kms",
|
|
BucketKeyEnabled=True,
|
|
SSEKMSKeyId=kms_key,
|
|
)
|
|
|
|
# assert that the BucketKeyEnabled setting is not kept in the destination key
|
|
resp = client.copy_object(
|
|
Bucket="test_bucket",
|
|
CopySource=f"test_bucket/{key}",
|
|
Key=key,
|
|
ServerSideEncryption="aws:kms",
|
|
SSEKMSKeyId=kms_key,
|
|
)
|
|
assert "BucketKeyEnabled" not in resp
|
|
|
|
# this is an edge case, if the source object SSE was not AES256, AWS allows you to not specify any fields
|
|
# as it will use AES256 by default and is different from the source key
|
|
resp = client.copy_object(
|
|
Bucket="test_bucket",
|
|
CopySource=f"test_bucket/{key}",
|
|
Key=key,
|
|
)
|
|
assert resp["ServerSideEncryption"] == "AES256"
|
|
|
|
# check that it allows copying in the place with the same ServerSideEncryption setting as the source
|
|
resp = client.copy_object(
|
|
Bucket="test_bucket",
|
|
CopySource=f"test_bucket/{key}",
|
|
Key=key,
|
|
ServerSideEncryption="AES256",
|
|
)
|
|
assert resp["ServerSideEncryption"] == "AES256"
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_in_place_with_storage_class():
|
|
# this test will validate that setting StorageClass (even the same as source) allows a copy in place
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "test-bucket"
|
|
bucket = s3.Bucket(bucket_name)
|
|
bucket.create()
|
|
key = "source-key"
|
|
bucket.put_object(Key=key, Body=b"somedata", StorageClass="STANDARD")
|
|
client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{key}",
|
|
Key=key,
|
|
StorageClass="STANDARD",
|
|
)
|
|
# verify that the copy worked
|
|
resp = client.get_object_attributes(
|
|
Bucket=bucket_name, Key=key, ObjectAttributes=["StorageClass"]
|
|
)
|
|
assert resp["StorageClass"] == "STANDARD"
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_does_not_copy_storage_class():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket = s3.Bucket("test_bucket")
|
|
bucket.create()
|
|
source_key = "source-key"
|
|
dest_key = "dest-key"
|
|
bucket.put_object(Key=source_key, Body=b"somedata", StorageClass="STANDARD_IA")
|
|
client.copy_object(
|
|
Bucket="test_bucket",
|
|
CopySource=f"test_bucket/{source_key}",
|
|
Key=dest_key,
|
|
)
|
|
|
|
# Verify that the destination key does not have STANDARD_IA as StorageClass
|
|
keys = dict([(k.key, k) for k in bucket.objects.all()])
|
|
keys[source_key].storage_class.should.equal("STANDARD_IA")
|
|
keys[dest_key].storage_class.should.equal("STANDARD")
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_does_not_copy_acl():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "testbucket"
|
|
bucket = s3.Bucket(bucket_name)
|
|
bucket.create()
|
|
source_key = "source-key"
|
|
dest_key = "dest-key"
|
|
control_key = "control-key"
|
|
# do not set ACL for the control key to get default ACL
|
|
bucket.put_object(Key=control_key, Body=b"somedata")
|
|
# set ACL for the source key to check if it will get copied
|
|
bucket.put_object(Key=source_key, Body=b"somedata", ACL="public-read")
|
|
# copy object without specifying ACL, so it should get default ACL
|
|
client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{source_key}",
|
|
Key=dest_key,
|
|
)
|
|
|
|
# Get the ACL from the all the keys
|
|
source_acl = client.get_object_acl(Bucket=bucket_name, Key=source_key)
|
|
dest_acl = client.get_object_acl(Bucket=bucket_name, Key=dest_key)
|
|
default_acl = client.get_object_acl(Bucket=bucket_name, Key=control_key)
|
|
# assert that the source key ACL are different from the destination key ACL
|
|
assert source_acl["Grants"] != dest_acl["Grants"]
|
|
# assert that the copied key got the default ACL like the control key
|
|
assert default_acl["Grants"] == dest_acl["Grants"]
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_in_place_with_metadata():
|
|
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "testbucket"
|
|
bucket = s3.Bucket(bucket_name)
|
|
bucket.create()
|
|
key_name = "source-key"
|
|
bucket.put_object(Key=key_name, Body=b"somedata")
|
|
|
|
# test that giving metadata is not enough, and should provide MetadataDirective=REPLACE on top
|
|
with pytest.raises(ClientError) as e:
|
|
client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{key_name}",
|
|
Key=key_name,
|
|
Metadata={"key": "value"},
|
|
)
|
|
e.value.response["Error"]["Message"].should.equal(
|
|
"This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes."
|
|
)
|
|
|
|
# you can only provide MetadataDirective=REPLACE and it will copy without any metadata
|
|
client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{key_name}",
|
|
Key=key_name,
|
|
MetadataDirective="REPLACE",
|
|
)
|
|
|
|
result = client.head_object(Bucket=bucket_name, Key=key_name)
|
|
assert result["Metadata"] == {}
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_objet_legal_hold():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "testbucket"
|
|
source_key = "source-key"
|
|
dest_key = "dest-key"
|
|
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
|
|
client.put_object(
|
|
Bucket=bucket_name,
|
|
Key=source_key,
|
|
Body=b"somedata",
|
|
ObjectLockLegalHoldStatus="ON",
|
|
)
|
|
|
|
head_object = client.head_object(Bucket=bucket_name, Key=source_key)
|
|
assert head_object["ObjectLockLegalHoldStatus"] == "ON"
|
|
assert "VersionId" in head_object
|
|
version_id = head_object["VersionId"]
|
|
|
|
resp = client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{source_key}",
|
|
Key=dest_key,
|
|
)
|
|
assert resp["CopySourceVersionId"] == version_id
|
|
assert resp["VersionId"] != version_id
|
|
|
|
# the destination key did not keep the legal hold from the source key
|
|
head_object = client.head_object(Bucket=bucket_name, Key=dest_key)
|
|
assert "ObjectLockLegalHoldStatus" not in head_object
|
|
|
|
|
|
@mock_s3
|
|
def test_s3_copy_object_lock():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "testbucket"
|
|
source_key = "source-key"
|
|
dest_key = "dest-key"
|
|
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
|
|
# manipulate a bit the datetime object for an easier comparison
|
|
retain_until = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
|
|
minutes=1
|
|
)
|
|
retain_until = retain_until.replace(microsecond=0)
|
|
|
|
client.put_object(
|
|
Bucket=bucket_name,
|
|
Key=source_key,
|
|
Body="test",
|
|
ObjectLockMode="GOVERNANCE",
|
|
ObjectLockRetainUntilDate=retain_until,
|
|
)
|
|
|
|
head_object = client.head_object(Bucket=bucket_name, Key=source_key)
|
|
|
|
assert head_object["ObjectLockMode"] == "GOVERNANCE"
|
|
assert head_object["ObjectLockRetainUntilDate"] == retain_until
|
|
assert "VersionId" in head_object
|
|
version_id = head_object["VersionId"]
|
|
|
|
resp = client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{source_key}",
|
|
Key=dest_key,
|
|
)
|
|
assert resp["CopySourceVersionId"] == version_id
|
|
assert resp["VersionId"] != version_id
|
|
|
|
# the destination key did not keep the lock mode nor the lock until from the source key
|
|
head_object = client.head_object(Bucket=bucket_name, Key=dest_key)
|
|
assert "ObjectLockMode" not in head_object
|
|
assert "ObjectLockRetainUntilDate" not in head_object
|
|
|
|
|
|
@mock_s3
|
|
def test_copy_object_in_place_website_redirect_location():
|
|
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
|
|
bucket_name = "testbucket"
|
|
key = "source-key"
|
|
client.create_bucket(Bucket=bucket_name)
|
|
# this test will validate that setting WebsiteRedirectLocation (even the same as source) allows a copy in place
|
|
|
|
client.put_object(
|
|
Bucket=bucket_name,
|
|
Key=key,
|
|
Body="test",
|
|
WebsiteRedirectLocation="/test/direct",
|
|
)
|
|
|
|
head_object = client.head_object(Bucket=bucket_name, Key=key)
|
|
assert head_object["WebsiteRedirectLocation"] == "/test/direct"
|
|
|
|
# copy the object with the same WebsiteRedirectLocation as the source object
|
|
client.copy_object(
|
|
Bucket=bucket_name,
|
|
CopySource=f"{bucket_name}/{key}",
|
|
Key=key,
|
|
WebsiteRedirectLocation="/test/direct",
|
|
)
|
|
|
|
head_object = client.head_object(Bucket=bucket_name, Key=key)
|
|
assert head_object["WebsiteRedirectLocation"] == "/test/direct"
|