2023-05-01 09:14:31 +00:00
|
|
|
import datetime
|
|
|
|
|
2022-02-24 21:42:38 +00:00
|
|
|
import boto3
|
|
|
|
from botocore.client import ClientError
|
|
|
|
|
2023-08-07 16:48:48 +00:00
|
|
|
from moto import mock_s3, mock_kms
|
2022-02-24 21:42:38 +00:00
|
|
|
from moto.s3.responses import DEFAULT_REGION_NAME
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    "key_name",
    [
        "the-key",
        "the-unicode-💩-key",
        "key-with?question-mark",
        "key-with%2Fembedded%2Furl%2Fencoding",
    ],
)
@mock_s3
def test_copy_key_boto3(key_name):
    """Copying a key (including unicode/special-char names) duplicates its body."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    s3.create_bucket(Bucket="foobar")

    s3.Object("foobar", key_name).put(Body=b"some value")

    s3.Object("foobar", "new-key").copy_from(CopySource=f"foobar/{key_name}")

    # Both the original key and the copy must hold identical bytes.
    for obj_key in (key_name, "new-key"):
        resp = s3_client.get_object(Bucket="foobar", Key=obj_key)
        assert resp["Body"].read() == b"some value"
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
2023-04-22 20:45:00 +00:00
|
|
|
@mock_s3
def test_copy_key_boto3_with_sha256_checksum():
    """Requesting a SHA256 checksum on copy yields the expected digest."""
    # Setup
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    key_name = "key"
    new_key = "new_key"
    bucket = "foobar"
    expected_hash = "qz0H8xacy9DtbEtF3iFRn5+TjHLSQSSZiquUnOg7tRs="

    s3.create_bucket(Bucket=bucket)
    s3.Object("foobar", key_name).put(Body=b"some value")

    # Execute: managed copy with checksum recalculation requested.
    s3.Object(bucket, new_key).copy(
        CopySource={"Bucket": bucket, "Key": key_name},
        ExtraArgs={"ChecksumAlgorithm": "SHA256"},
    )

    # Verify
    attrs = s3_client.get_object_attributes(
        Bucket=bucket, Key=new_key, ObjectAttributes=["Checksum"]
    )

    assert "Checksum" in attrs
    assert "ChecksumSHA256" in attrs["Checksum"]
    assert attrs["Checksum"]["ChecksumSHA256"] == expected_hash

    # Verify in place
    copy_in_place = s3_client.copy_object(
        Bucket=bucket,
        CopySource=f"{bucket}/{new_key}",
        Key=new_key,
        ChecksumAlgorithm="SHA256",
        MetadataDirective="REPLACE",
    )

    assert "ChecksumSHA256" in copy_in_place["CopyObjectResult"]
    assert copy_in_place["CopyObjectResult"]["ChecksumSHA256"] == expected_hash
|
|
|
|
|
2023-04-22 20:45:00 +00:00
|
|
|
|
2022-02-24 21:42:38 +00:00
|
|
|
@mock_s3
def test_copy_key_with_version_boto3():
    """Copying from a specific VersionId must copy that version's contents."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    s3.create_bucket(Bucket="foobar")
    s3_client.put_bucket_versioning(
        Bucket="foobar", VersioningConfiguration={"Status": "Enabled"}
    )

    # Two PUTs on the same key create two versions.
    source = s3.Object("foobar", "the-key")
    source.put(Body=b"some value")
    source.put(Body=b"another value")

    versions = s3_client.list_object_versions(Bucket="foobar", Prefix="the-key")[
        "Versions"
    ]
    old_version = next(v for v in versions if not v["IsLatest"])

    destination = s3.Object("foobar", "new-key")
    destination.copy_from(
        CopySource=f"foobar/the-key?versionId={old_version['VersionId']}"
    )

    # The source still serves the latest version; the copy has the old bytes.
    latest = s3_client.get_object(Bucket="foobar", Key="the-key")
    assert latest["Body"].read() == b"another value"
    copied = s3_client.get_object(Bucket="foobar", Key="new-key")
    assert copied["Body"].read() == b"some value"
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_with_bucketkeyenabled_returns_the_value():
    """BucketKeyEnabled passed on copy is reflected on the target key only."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "test-copy-object-with-bucketkeyenabled"
    s3.create_bucket(Bucket=bucket_name)

    s3.Object(bucket_name, "the-key").put(Body=b"some value")

    s3.Object(bucket_name, "new-key").copy_from(
        CopySource=f"{bucket_name}/the-key",
        BucketKeyEnabled=True,
        ServerSideEncryption="aws:kms",
    )

    # The source object was stored unencrypted, so none of the SSE headers show up.
    src_headers = s3_client.get_object(Bucket=bucket_name, Key="the-key")[
        "ResponseMetadata"
    ]["HTTPHeaders"]
    for header in (
        "x-amz-server-side-encryption",
        "x-amz-server-side-encryption-aws-kms-key-id",
        "x-amz-server-side-encryption-bucket-key-enabled",
    ):
        assert header not in src_headers

    target_headers = s3_client.get_object(Bucket=bucket_name, Key="new-key")[
        "ResponseMetadata"
    ]["HTTPHeaders"]
    assert "x-amz-server-side-encryption" in target_headers
    # AWS will also return the KMS default key id - not yet implemented
    # assert "x-amz-server-side-encryption-aws-kms-key-id" in target_headers
    # This field is only returned if encryption is set to 'aws:kms'
    assert "x-amz-server-side-encryption-bucket-key-enabled" in target_headers
    enabled = target_headers["x-amz-server-side-encryption-bucket-key-enabled"]
    assert str(enabled).lower() == "true"
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_key_with_metadata():
    """A default copy carries metadata, content type, and ETag to the new key."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    s3.create_bucket(Bucket="foobar")

    metadata = {"md": "Metadatastring"}
    content_type = "application/json"
    original = s3.Object("foobar", "the-key").put(
        Body=b"{}", Metadata=metadata, ContentType=content_type
    )

    s3_client.copy_object(Bucket="foobar", CopySource="foobar/the-key", Key="new-key")

    copied = s3_client.get_object(Bucket="foobar", Key="new-key")
    assert copied["Metadata"] == metadata
    assert copied["ContentType"] == content_type
    assert copied["ETag"] == original["ETag"]
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_key_replace_metadata():
    """MetadataDirective=REPLACE swaps metadata while preserving the body/ETag."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    s3.create_bucket(Bucket="foobar")

    original = s3.Object("foobar", "the-key").put(
        Body=b"some value", Metadata={"md": "Metadatastring"}
    )

    s3_client.copy_object(
        Bucket="foobar",
        CopySource="foobar/the-key",
        Key="new-key",
        Metadata={"momd": "Mometadatastring"},
        MetadataDirective="REPLACE",
    )

    copied = s3_client.get_object(Bucket="foobar", Key="new-key")
    assert copied["Metadata"] == {"momd": "Mometadatastring"}
    # The body is unchanged, so the ETag must match the original upload's.
    assert copied["ETag"] == original["ETag"]
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_key_without_changes_should_error():
    """An in-place copy with no attribute changes at all is rejected by S3."""
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "my_bucket"
    key_name = "my_key"

    s3.create_bucket(Bucket=bucket_name)
    s3.Object(bucket_name, key_name).put(Body=b"some value")

    with pytest.raises(ClientError) as exc:
        s3_client.copy_object(
            Bucket=bucket_name,
            CopySource=f"{bucket_name}/{key_name}",
            Key=key_name,
        )
    assert exc.value.response["Error"]["Message"] == (
        "This copy request is illegal because it is trying to copy an "
        "object to itself without changing the object's metadata, storage "
        "class, website redirect location or encryption attributes."
    )
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_key_without_changes_should_not_error():
    """An in-place copy succeeds when it replaces the object's metadata."""
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "my_bucket"
    key_name = "my_key"

    s3.create_bucket(Bucket=bucket_name)
    s3.Object(bucket_name, key_name).put(Body=b"some value")

    s3_client.copy_object(
        Bucket=bucket_name,
        CopySource=f"{bucket_name}/{key_name}",
        Key=key_name,
        Metadata={"some-key": "some-value"},
        MetadataDirective="REPLACE",
    )

    updated = s3_client.get_object(Bucket=bucket_name, Key=key_name)
    assert updated["Metadata"] == {"some-key": "some-value"}
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_key_reduced_redundancy():
    """StorageClass given on copy applies to the new key, not the source."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket = s3.Bucket("test_bucket")
    bucket.create()

    bucket.put_object(Key="the-key", Body=b"somedata")

    s3_client.copy_object(
        Bucket="test_bucket",
        CopySource="test_bucket/the-key",
        Key="new-key",
        StorageClass="REDUCED_REDUNDANCY",
    )

    storage_by_key = {obj.key: obj.storage_class for obj in bucket.objects.all()}
    assert storage_by_key["new-key"] == "REDUCED_REDUNDANCY"
    assert storage_by_key["the-key"] == "STANDARD"
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_non_existing_file():
    """Copying a missing source key raises NoSuchKey with the key in the error."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    src = "srcbucket"
    target = "target"
    for bucket_name in (src, target):
        s3.create_bucket(Bucket=bucket_name)

    s3_client = boto3.client("s3")
    with pytest.raises(ClientError) as exc:
        s3_client.copy_object(
            Bucket=target, CopySource={"Bucket": src, "Key": "foofoofoo"}, Key="newkey"
        )
    err = exc.value.response["Error"]
    assert err["Code"] == "NoSuchKey"
    assert err["Message"] == "The specified key does not exist."
    assert err["Key"] == "foofoofoo"
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_with_versioning():
    """Copies into a versioned bucket create new versions; a source VersionId is honoured."""
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)

    s3_client.create_bucket(
        Bucket="blah", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
    )
    s3_client.put_bucket_versioning(
        Bucket="blah", VersioningConfiguration={"Status": "Enabled"}
    )

    s3_client.put_object(Bucket="blah", Key="test1", Body=b"test1")
    s3_client.put_object(Bucket="blah", Key="test2", Body=b"test2")

    _ = s3_client.get_object(Bucket="blah", Key="test1")["VersionId"]
    obj2_version = s3_client.get_object(Bucket="blah", Key="test2")["VersionId"]

    # Overwriting test2 via copy must mint a new version id.
    s3_client.copy_object(
        CopySource={"Bucket": "blah", "Key": "test1"}, Bucket="blah", Key="test2"
    )
    obj2_version_new = s3_client.get_object(Bucket="blah", Key="test2")["VersionId"]
    # Version should be different to previous version
    assert obj2_version_new != obj2_version

    # Copying from an explicit old version works and yields yet another id.
    s3_client.copy_object(
        CopySource={"Bucket": "blah", "Key": "test2", "VersionId": obj2_version},
        Bucket="blah",
        Key="test3",
    )
    obj3_version_new = s3_client.get_object(Bucket="blah", Key="test3")["VersionId"]
    assert obj3_version_new != obj2_version_new

    # Copy file that doesn't exist
    with pytest.raises(ClientError) as exc:
        s3_client.copy_object(
            CopySource={"Bucket": "blah", "Key": "test4", "VersionId": obj2_version},
            Bucket="blah",
            Key="test5",
        )
    assert exc.value.response["Error"]["Code"] == "NoSuchKey"

    # Multipart copy from a pinned version must read that version's bytes.
    upload_id = s3_client.create_multipart_upload(Bucket="blah", Key="test4")[
        "UploadId"
    ]
    part = s3_client.upload_part_copy(
        Bucket="blah",
        Key="test4",
        CopySource={"Bucket": "blah", "Key": "test3", "VersionId": obj3_version_new},
        UploadId=upload_id,
        PartNumber=1,
    )
    s3_client.complete_multipart_upload(
        Bucket="blah",
        Key="test4",
        UploadId=upload_id,
        MultipartUpload={
            "Parts": [{"ETag": part["CopyPartResult"]["ETag"], "PartNumber": 1}]
        },
    )

    final = s3_client.get_object(Bucket="blah", Key="test4")
    assert final["Body"].read() == b"test2"
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_from_unversioned_to_versioned_bucket():
    """A copy into a versioned destination bucket must return a VersionId."""
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)

    s3_client.create_bucket(
        Bucket="src", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
    )
    s3_client.create_bucket(
        Bucket="dest", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
    )
    # Only the destination bucket has versioning enabled.
    s3_client.put_bucket_versioning(
        Bucket="dest", VersioningConfiguration={"Status": "Enabled"}
    )

    s3_client.put_object(Bucket="src", Key="test", Body=b"content")

    copy_resp = s3_client.copy_object(
        CopySource={"Bucket": "src", "Key": "test"}, Bucket="dest", Key="test"
    )

    # VersionId should be present in the response
    assert copy_resp.get("VersionId") is not None
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_with_replacement_tagging():
    """TaggingDirective REPLACE swaps tags; COPY keeps the source tags."""
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    s3_client.create_bucket(Bucket="mybucket")
    s3_client.put_object(
        Bucket="mybucket", Key="original", Body=b"test", Tagging="tag=old"
    )

    # using system tags will fail
    with pytest.raises(ClientError) as err:
        s3_client.copy_object(
            CopySource={"Bucket": "mybucket", "Key": "original"},
            Bucket="mybucket",
            Key="copy1",
            TaggingDirective="REPLACE",
            Tagging="aws:tag=invalid_key",
        )
    assert err.value.response["Error"]["Code"] == "InvalidTag"

    # REPLACE with a user tag succeeds.
    s3_client.copy_object(
        CopySource={"Bucket": "mybucket", "Key": "original"},
        Bucket="mybucket",
        Key="copy1",
        TaggingDirective="REPLACE",
        Tagging="tag=new",
    )
    # COPY carries the source's tags over unchanged.
    s3_client.copy_object(
        CopySource={"Bucket": "mybucket", "Key": "original"},
        Bucket="mybucket",
        Key="copy2",
        TaggingDirective="COPY",
    )

    tags1 = s3_client.get_object_tagging(Bucket="mybucket", Key="copy1")["TagSet"]
    assert tags1 == [{"Key": "tag", "Value": "new"}]
    tags2 = s3_client.get_object_tagging(Bucket="mybucket", Key="copy2")["TagSet"]
    assert tags2 == [{"Key": "tag", "Value": "old"}]
|
2022-02-24 21:42:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
@mock_kms
def test_copy_object_with_kms_encryption():
    """KMS settings supplied on copy are reflected on the destination object."""
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    kms_client = boto3.client("kms", region_name=DEFAULT_REGION_NAME)
    kms_key = kms_client.create_key()["KeyMetadata"]["KeyId"]

    s3_client.create_bucket(
        Bucket="blah", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
    )
    s3_client.put_object(Bucket="blah", Key="test1", Body=b"test1")

    s3_client.copy_object(
        CopySource={"Bucket": "blah", "Key": "test1"},
        Bucket="blah",
        Key="test2",
        SSEKMSKeyId=kms_key,
        ServerSideEncryption="aws:kms",
    )

    head = s3_client.head_object(Bucket="blah", Key="test2")
    assert head["SSEKMSKeyId"] == kms_key
    assert head["ServerSideEncryption"] == "aws:kms"
|
2023-05-01 09:14:31 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
@mock_kms
def test_copy_object_in_place_with_encryption():
    """In-place copies are allowed when encryption settings are supplied."""
    kms_client = boto3.client("kms", region_name=DEFAULT_REGION_NAME)
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    kms_key = kms_client.create_key()["KeyMetadata"]["KeyId"]
    bucket = s3.Bucket("test_bucket")
    bucket.create()
    key = "source-key"

    put_resp = s3_client.put_object(
        Bucket="test_bucket",
        Key=key,
        Body=b"somedata",
        ServerSideEncryption="aws:kms",
        BucketKeyEnabled=True,
        SSEKMSKeyId=kms_key,
    )
    assert put_resp["BucketKeyEnabled"] is True

    # assert that you can copy in place with the same Encryption settings
    s3_client.copy_object(
        Bucket="test_bucket",
        CopySource=f"test_bucket/{key}",
        Key=key,
        ServerSideEncryption="aws:kms",
        BucketKeyEnabled=True,
        SSEKMSKeyId=kms_key,
    )

    # assert that the BucketKeyEnabled setting is not kept in the destination key
    copy_resp = s3_client.copy_object(
        Bucket="test_bucket",
        CopySource=f"test_bucket/{key}",
        Key=key,
        ServerSideEncryption="aws:kms",
        SSEKMSKeyId=kms_key,
    )
    assert "BucketKeyEnabled" not in copy_resp

    # This is an edge case, if the source object SSE was not AES256,
    # AWS allows you to not specify any fields as it will use AES256 by
    # default and is different from the source key.
    copy_resp = s3_client.copy_object(
        Bucket="test_bucket",
        CopySource=f"test_bucket/{key}",
        Key=key,
    )
    assert copy_resp["ServerSideEncryption"] == "AES256"

    # Check that it allows copying in the place with the same
    # ServerSideEncryption setting as the source.
    copy_resp = s3_client.copy_object(
        Bucket="test_bucket",
        CopySource=f"test_bucket/{key}",
        Key=key,
        ServerSideEncryption="AES256",
    )
    assert copy_resp["ServerSideEncryption"] == "AES256"
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_in_place_with_storage_class():
    """Validate setting StorageClass allows a copy in place.

    This should be true even if destination object is the same as source.
    """
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "test-bucket"
    bucket = s3.Bucket(bucket_name)
    bucket.create()
    key = "source-key"
    bucket.put_object(Key=key, Body=b"somedata", StorageClass="STANDARD")

    s3_client.copy_object(
        Bucket=bucket_name,
        CopySource=f"{bucket_name}/{key}",
        Key=key,
        StorageClass="STANDARD",
    )

    # verify that the copy worked
    attrs = s3_client.get_object_attributes(
        Bucket=bucket_name, Key=key, ObjectAttributes=["StorageClass"]
    )
    assert attrs["StorageClass"] == "STANDARD"
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_does_not_copy_storage_class():
    """The source's StorageClass is not inherited by the copied key."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket = s3.Bucket("test_bucket")
    bucket.create()
    source_key = "source-key"
    dest_key = "dest-key"
    bucket.put_object(Key=source_key, Body=b"somedata", StorageClass="STANDARD_IA")

    s3_client.copy_object(
        Bucket="test_bucket",
        CopySource=f"test_bucket/{source_key}",
        Key=dest_key,
    )

    # Verify that the destination key does not have STANDARD_IA as StorageClass
    storage_by_key = {obj.key: obj.storage_class for obj in bucket.objects.all()}
    assert storage_by_key[source_key] == "STANDARD_IA"
    assert storage_by_key[dest_key] == "STANDARD"
|
2023-05-01 09:14:31 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_does_not_copy_acl():
    """A copy without an explicit ACL gets the default ACL, not the source's."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "testbucket"
    bucket = s3.Bucket(bucket_name)
    bucket.create()
    source_key = "source-key"
    dest_key = "dest-key"
    control_key = "control-key"

    # do not set ACL for the control key to get default ACL
    bucket.put_object(Key=control_key, Body=b"somedata")
    # set ACL for the source key to check if it will get copied
    bucket.put_object(Key=source_key, Body=b"somedata", ACL="public-read")
    # copy object without specifying ACL, so it should get default ACL
    s3_client.copy_object(
        Bucket=bucket_name,
        CopySource=f"{bucket_name}/{source_key}",
        Key=dest_key,
    )

    # Get the ACL from all the keys
    grants = {
        label: s3_client.get_object_acl(Bucket=bucket_name, Key=obj_key)["Grants"]
        for label, obj_key in (
            ("source", source_key),
            ("dest", dest_key),
            ("default", control_key),
        )
    }
    # the source key ACL differs from the destination key ACL
    assert grants["source"] != grants["dest"]
    # the copied key got the default ACL like the control key
    assert grants["default"] == grants["dest"]
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_in_place_with_metadata():
    """Metadata alone does not allow an in-place copy; the REPLACE directive does."""
    s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "testbucket"
    bucket = s3.Bucket(bucket_name)
    bucket.create()
    key_name = "source-key"
    bucket.put_object(Key=key_name, Body=b"somedata")

    # test that giving metadata is not enough and should provide
    # MetadataDirective=REPLACE on top.
    with pytest.raises(ClientError) as exc:
        s3_client.copy_object(
            Bucket=bucket_name,
            CopySource=f"{bucket_name}/{key_name}",
            Key=key_name,
            Metadata={"key": "value"},
        )
    assert exc.value.response["Error"]["Message"] == (
        "This copy request is illegal because it is trying to copy an "
        "object to itself without changing the object's metadata, "
        "storage class, website redirect location or encryption attributes."
    )

    # you can only provide MetadataDirective=REPLACE and it will copy without any metadata
    s3_client.copy_object(
        Bucket=bucket_name,
        CopySource=f"{bucket_name}/{key_name}",
        Key=key_name,
        MetadataDirective="REPLACE",
    )

    head = s3_client.head_object(Bucket=bucket_name, Key=key_name)
    assert head["Metadata"] == {}
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_objet_legal_hold():
    """A legal hold on the source is not propagated to the copied key."""
    # NOTE(review): "objet" in the test name looks like a typo for "object";
    # kept as-is since renaming would change the public test identifier.
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "testbucket"
    source_key = "source-key"
    dest_key = "dest-key"
    s3_client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
    s3_client.put_object(
        Bucket=bucket_name,
        Key=source_key,
        Body=b"somedata",
        ObjectLockLegalHoldStatus="ON",
    )

    source_head = s3_client.head_object(Bucket=bucket_name, Key=source_key)
    assert source_head["ObjectLockLegalHoldStatus"] == "ON"
    assert "VersionId" in source_head
    version_id = source_head["VersionId"]

    copy_resp = s3_client.copy_object(
        Bucket=bucket_name,
        CopySource=f"{bucket_name}/{source_key}",
        Key=dest_key,
    )
    assert copy_resp["CopySourceVersionId"] == version_id
    assert copy_resp["VersionId"] != version_id

    # the destination key did not keep the legal hold from the source key
    dest_head = s3_client.head_object(Bucket=bucket_name, Key=dest_key)
    assert "ObjectLockLegalHoldStatus" not in dest_head
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_s3_copy_object_lock():
    """Object-lock mode/retention on the source is not propagated by a copy."""
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "testbucket"
    source_key = "source-key"
    dest_key = "dest-key"
    s3_client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)

    # manipulate a bit the datetime object for an easier comparison
    retain_until = (
        datetime.datetime.now(tz=datetime.timezone.utc)
        + datetime.timedelta(minutes=1)
    ).replace(microsecond=0)

    s3_client.put_object(
        Bucket=bucket_name,
        Key=source_key,
        Body="test",
        ObjectLockMode="GOVERNANCE",
        ObjectLockRetainUntilDate=retain_until,
    )

    source_head = s3_client.head_object(Bucket=bucket_name, Key=source_key)
    assert source_head["ObjectLockMode"] == "GOVERNANCE"
    assert source_head["ObjectLockRetainUntilDate"] == retain_until
    assert "VersionId" in source_head
    version_id = source_head["VersionId"]

    copy_resp = s3_client.copy_object(
        Bucket=bucket_name,
        CopySource=f"{bucket_name}/{source_key}",
        Key=dest_key,
    )
    assert copy_resp["CopySourceVersionId"] == version_id
    assert copy_resp["VersionId"] != version_id

    # the destination key did not keep the lock mode nor the lock until from the source key
    dest_head = s3_client.head_object(Bucket=bucket_name, Key=dest_key)
    assert "ObjectLockMode" not in dest_head
    assert "ObjectLockRetainUntilDate" not in dest_head
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_in_place_website_redirect_location():
    """Setting WebsiteRedirectLocation (even unchanged) permits an in-place copy."""
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "testbucket"
    key = "source-key"
    s3_client.create_bucket(Bucket=bucket_name)
    # This test will validate that setting WebsiteRedirectLocation
    # (even the same as source) allows a copy in place.

    s3_client.put_object(
        Bucket=bucket_name,
        Key=key,
        Body="test",
        WebsiteRedirectLocation="/test/direct",
    )

    before = s3_client.head_object(Bucket=bucket_name, Key=key)
    assert before["WebsiteRedirectLocation"] == "/test/direct"

    # copy the object with the same WebsiteRedirectLocation as the source object
    s3_client.copy_object(
        Bucket=bucket_name,
        CopySource=f"{bucket_name}/{key}",
        Key=key,
        WebsiteRedirectLocation="/test/direct",
    )

    after = s3_client.head_object(Bucket=bucket_name, Key=key)
    assert after["WebsiteRedirectLocation"] == "/test/direct"
|
2023-05-10 10:03:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
def test_copy_object_in_place_with_bucket_encryption():
    """Bucket-level default encryption permits an unmodified in-place copy."""
    # If a bucket has encryption configured, it will allow copy in place per default
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket_name = "test-bucket"
    s3_client.create_bucket(Bucket=bucket_name)
    key = "source-key"

    enc_resp = s3_client.put_bucket_encryption(
        Bucket=bucket_name,
        ServerSideEncryptionConfiguration={
            "Rules": [
                {
                    "ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"},
                    "BucketKeyEnabled": False,
                },
            ]
        },
    )
    assert enc_resp["ResponseMetadata"]["HTTPStatusCode"] == 200

    put_resp = s3_client.put_object(
        Body=b"",
        Bucket=bucket_name,
        Key=key,
    )
    assert put_resp["ServerSideEncryption"] == "AES256"

    copy_resp = s3_client.copy_object(
        Bucket=bucket_name,
        CopySource={"Bucket": bucket_name, "Key": key},
        Key=key,
    )
    assert copy_resp["ServerSideEncryption"] == "AES256"
|
2023-05-12 12:00:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_s3
@pytest.mark.parametrize(
    "algorithm",
    ["CRC32", "SHA1", "SHA256"],
)
def test_copy_key_boto3_with_both_sha256_checksum(algorithm):
    """Validate that moto's S3 checksum calculations are correct.

    An object is first uploaded with a checksum computed by boto, by
    passing the parametrized ``ChecksumAlgorithm`` to ``put_object``;
    the resulting checksum is read back from that response.

    The object is then copied while asking moto to recalculate the
    checksum for the destination key, and both checksums must match.
    """
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket = "foobar"
    source_key = "source-key"
    dest_key = "dest-key"
    checksum_field = f"Checksum{algorithm}"

    s3_client.create_bucket(Bucket=bucket)

    put_resp = s3_client.put_object(
        Bucket=bucket,
        Key=source_key,
        Body=b"checksum-test",
        ChecksumAlgorithm=algorithm,
    )
    assert checksum_field in put_resp
    expected_checksum = put_resp[checksum_field]

    copy_resp = s3_client.copy_object(
        Bucket=bucket,
        CopySource=f"{bucket}/{source_key}",
        Key=dest_key,
        ChecksumAlgorithm=algorithm,
    )

    # moto's recalculated checksum must equal the one boto computed.
    copy_result = copy_resp["CopyObjectResult"]
    assert checksum_field in copy_result
    assert copy_result[checksum_field] == expected_checksum


@mock_s3
@pytest.mark.parametrize(
    "algorithm, checksum",
    [
        ("CRC32", "lVk/nw=="),
        ("SHA1", "jbXkHAsXUrubtL3dqDQ4w+7WXc0="),
        ("SHA256", "1YQo81vx2VFUl0q5ccWISq8AkSBQQ0WO80S82TmfdIQ="),
    ],
)
def test_copy_object_calculates_checksum(algorithm, checksum):
    # An object uploaded WITHOUT a checksum gets one computed on the fly
    # when it is copied with an explicit ChecksumAlgorithm.
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket = "foobar"
    source_key = "source-key"
    dest_key = "dest-key"
    checksum_field = f"Checksum{algorithm}"

    s3_client.create_bucket(Bucket=bucket)

    # No ChecksumAlgorithm on upload, so no checksum in the response.
    put_resp = s3_client.put_object(
        Bucket=bucket,
        Key=source_key,
        Body=b"test-checksum",
    )
    assert checksum_field not in put_resp

    copy_resp = s3_client.copy_object(
        Bucket=bucket,
        CopySource=f"{bucket}/{source_key}",
        Key=dest_key,
        ChecksumAlgorithm=algorithm,
    )

    # The copy result carries the freshly calculated checksum.
    copy_result = copy_resp["CopyObjectResult"]
    assert checksum_field in copy_result
    assert copy_result[checksum_field] == checksum


@mock_s3
def test_copy_object_keeps_checksum():
    # A copy that does not request a ChecksumAlgorithm keeps the checksum
    # stored with the source object.
    s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    bucket = "foobar"
    source_key = "source-key"
    dest_key = "dest-key"
    expected_checksum = "1YQo81vx2VFUl0q5ccWISq8AkSBQQ0WO80S82TmfdIQ="

    s3_client.create_bucket(Bucket=bucket)

    # Upload the source object with an explicit SHA256 checksum.
    put_resp = s3_client.put_object(
        Bucket=bucket,
        Key=source_key,
        Body=b"test-checksum",
        ChecksumAlgorithm="SHA256",
    )
    assert "ChecksumSHA256" in put_resp
    assert put_resp["ChecksumSHA256"] == expected_checksum

    # Copy without specifying any checksum algorithm.
    copy_resp = s3_client.copy_object(
        Bucket=bucket,
        CopySource=f"{bucket}/{source_key}",
        Key=dest_key,
    )

    # The source object's checksum travels with the copy.
    copy_result = copy_resp["CopyObjectResult"]
    assert "ChecksumSHA256" in copy_result
    assert copy_result["ChecksumSHA256"] == expected_checksum
|