fix: S3 CopyObjects with embedded percent encoding (#4514)

This commit is contained in:
Brandon Bodnar 2021-11-01 18:17:06 -04:00 committed by GitHub
parent 3b9e6261f9
commit 6264fb292c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 28 deletions

View File

@ -1638,8 +1638,6 @@ class S3Backend(BaseBackend):
key.lock_until = retention[1] key.lock_until = retention[1]
def append_to_key(self, bucket_name, key_name, value): def append_to_key(self, bucket_name, key_name, value):
key_name = clean_key_name(key_name)
key = self.get_object(bucket_name, key_name) key = self.get_object(bucket_name, key_name)
key.append_to_value(value) key.append_to_value(value)
return key return key
@ -2014,7 +2012,6 @@ class S3Backend(BaseBackend):
acl=None, acl=None,
src_version_id=None, src_version_id=None,
): ):
src_key_name = clean_key_name(src_key_name)
dest_key_name = clean_key_name(dest_key_name) dest_key_name = clean_key_name(dest_key_name)
dest_bucket = self.get_bucket(dest_bucket_name) dest_bucket = self.get_bucket(dest_bucket_name)
key = self.get_object(src_bucket_name, src_key_name, version_id=src_version_id) key = self.get_object(src_bucket_name, src_key_name, version_id=src_version_id)

View File

@ -62,7 +62,6 @@ from .models import (
) )
from .utils import ( from .utils import (
bucket_name_from_url, bucket_name_from_url,
clean_key_name,
metadata_from_headers, metadata_from_headers,
parse_region_from_url, parse_region_from_url,
) )
@ -1396,14 +1395,14 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
upload_id = query["uploadId"][0] upload_id = query["uploadId"][0]
part_number = int(query["partNumber"][0]) part_number = int(query["partNumber"][0])
if "x-amz-copy-source" in request.headers: if "x-amz-copy-source" in request.headers:
src = unquote(request.headers.get("x-amz-copy-source")).lstrip("/") copy_source = request.headers.get("x-amz-copy-source")
src_bucket, src_key = src.split("/", 1) if isinstance(copy_source, bytes):
copy_source = copy_source.decode("utf-8")
src_key, src_version_id = ( copy_source_parsed = urlparse(copy_source)
src_key.split("?versionId=") src_bucket, src_key = copy_source_parsed.path.lstrip("/").split("/", 1)
if "?versionId=" in src_key src_version_id = parse_qs(copy_source_parsed.query).get(
else (src_key, None) "versionId", [None]
) )[0]
src_range = request.headers.get("x-amz-copy-source-range", "").split( src_range = request.headers.get("x-amz-copy-source-range", "").split(
"bytes=" "bytes="
)[-1] )[-1]
@ -1513,14 +1512,14 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
# Copy key # Copy key
# the copy source may carry a ?versionId=abc query string, so the header # the copy source may carry a ?versionId=abc query string, so the header
# value is split into its path and query parts before the key is used # value is split into its path and query parts before the key is used
src_key = request.headers.get("x-amz-copy-source") copy_source = request.headers.get("x-amz-copy-source")
if isinstance(src_key, bytes): if isinstance(copy_source, bytes):
src_key = src_key.decode("utf-8") copy_source = copy_source.decode("utf-8")
src_key_parsed = urlparse(src_key) copy_source_parsed = urlparse(copy_source)
src_bucket, src_key = ( src_bucket, src_key = copy_source_parsed.path.lstrip("/").split("/", 1)
clean_key_name(src_key_parsed.path).lstrip("/").split("/", 1) src_version_id = parse_qs(copy_source_parsed.query).get(
) "versionId", [None]
src_version_id = parse_qs(src_key_parsed.query).get("versionId", [None])[0] )[0]
key = self.backend.get_object( key = self.backend.get_object(
src_bucket, src_key, version_id=src_version_id src_bucket, src_key, version_id=src_version_id

View File

@ -380,29 +380,47 @@ def test_multipart_upload_with_headers_boto3():
# Has boto3 equivalent # Has boto3 equivalent
@pytest.mark.parametrize(
"original_key_name",
[
"original-key",
"the-unicode-💩-key",
"key-with?question-mark",
"key-with%2Fembedded%2Furl%2Fencoding",
],
)
@mock_s3_deprecated @mock_s3_deprecated
@reduced_min_part_size @reduced_min_part_size
def test_multipart_upload_with_copy_key(): def test_multipart_upload_with_copy_key(original_key_name):
conn = boto.connect_s3("the_key", "the_secret") conn = boto.connect_s3("the_key", "the_secret")
bucket = conn.create_bucket("foobar") bucket = conn.create_bucket("foobar")
key = Key(bucket) key = Key(bucket)
key.key = "original-key" key.key = original_key_name
key.set_contents_from_string("key_value") key.set_contents_from_string("key_value")
multipart = bucket.initiate_multipart_upload("the-key") multipart = bucket.initiate_multipart_upload("the-key")
part1 = b"0" * REDUCED_PART_SIZE part1 = b"0" * REDUCED_PART_SIZE
multipart.upload_part_from_file(BytesIO(part1), 1) multipart.upload_part_from_file(BytesIO(part1), 1)
multipart.copy_part_from_key("foobar", "original-key", 2, 0, 3) multipart.copy_part_from_key("foobar", original_key_name, 2, 0, 3)
multipart.complete_upload() multipart.complete_upload()
bucket.get_key("the-key").get_contents_as_string().should.equal(part1 + b"key_") bucket.get_key("the-key").get_contents_as_string().should.equal(part1 + b"key_")
@pytest.mark.parametrize(
"original_key_name",
[
"original-key",
"the-unicode-💩-key",
"key-with?question-mark",
"key-with%2Fembedded%2Furl%2Fencoding",
],
)
@mock_s3 @mock_s3
@reduced_min_part_size @reduced_min_part_size
def test_multipart_upload_with_copy_key_boto3(): def test_multipart_upload_with_copy_key_boto3(original_key_name):
s3 = boto3.client("s3", region_name=DEFAULT_REGION_NAME) s3 = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3.create_bucket(Bucket="foobar") s3.create_bucket(Bucket="foobar")
s3.put_object(Bucket="foobar", Key="original-key", Body="key_value") s3.put_object(Bucket="foobar", Key=original_key_name, Body="key_value")
mpu = s3.create_multipart_upload(Bucket="foobar", Key="the-key") mpu = s3.create_multipart_upload(Bucket="foobar", Key="the-key")
part1 = b"0" * REDUCED_PART_SIZE part1 = b"0" * REDUCED_PART_SIZE
@ -416,7 +434,7 @@ def test_multipart_upload_with_copy_key_boto3():
up2 = s3.upload_part_copy( up2 = s3.upload_part_copy(
Bucket="foobar", Bucket="foobar",
Key="the-key", Key="the-key",
CopySource={"Bucket": "foobar", "Key": "original-key"}, CopySource={"Bucket": "foobar", "Key": original_key_name},
CopySourceRange="0-3", CopySourceRange="0-3",
PartNumber=2, PartNumber=2,
UploadId=mpu["UploadId"], UploadId=mpu["UploadId"],
@ -884,7 +902,14 @@ def test_copy_key():
# Has boto3 equivalent # Has boto3 equivalent
@pytest.mark.parametrize("key_name", ["the-unicode-💩-key", "key-with?question-mark"]) @pytest.mark.parametrize(
"key_name",
[
"the-unicode-💩-key",
"key-with?question-mark",
"key-with%2Fembedded%2Furl%2Fencoding",
],
)
@mock_s3_deprecated @mock_s3_deprecated
def test_copy_key_with_special_chars(key_name): def test_copy_key_with_special_chars(key_name):
conn = boto.connect_s3("the_key", "the_secret") conn = boto.connect_s3("the_key", "the_secret")
@ -900,7 +925,13 @@ def test_copy_key_with_special_chars(key_name):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"key_name", ["the-key", "the-unicode-💩-key", "key-with?question-mark"] "key_name",
[
"the-key",
"the-unicode-💩-key",
"key-with?question-mark",
"key-with%2Fembedded%2Furl%2Fencoding",
],
) )
@mock_s3 @mock_s3
def test_copy_key_boto3(key_name): def test_copy_key_boto3(key_name):