S3: fix unquoting of S3 object keys in DeleteObjects (#6933)

This commit is contained in:
Ben Simon Hartung 2023-10-23 12:14:09 +02:00 committed by GitHub
parent 0a18a730d3
commit 9d9673d90c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 141 deletions

View File

@ -363,6 +363,11 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
self.uri = full_url self.uri = full_url
self.path = self.parsed_url.path self.path = self.parsed_url.path
if self.is_werkzeug_request and "RAW_URI" in request.environ:
self.raw_path = urlparse(request.environ.get("RAW_URI")).path
else:
self.raw_path = self.path
self.querystring = querystring self.querystring = querystring
self.data = querystring self.data = querystring
self.method = request.method self.method = request.method

View File

@ -1,13 +1,12 @@
import datetime import datetime
import inspect import inspect
import re import re
import unicodedata
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from gzip import decompress from gzip import decompress
from typing import Any, Optional, List, Callable, Dict, Tuple from typing import Any, Optional, List, Callable, Dict, Tuple
from urllib.parse import urlparse, unquote from urllib.parse import urlparse
from .common_types import TYPE_RESPONSE from .common_types import TYPE_RESPONSE
from .versions import is_werkzeug_2_3_x, PYTHON_311 from .versions import PYTHON_311
def camelcase_to_underscores(argument: str) -> str: def camelcase_to_underscores(argument: str) -> str:
@ -339,98 +338,5 @@ def params_sort_function(item: Tuple[str, Any]) -> Tuple[str, Any]:
return item return item
def normalize_werkzeug_path(path: str) -> str:
if is_werkzeug_2_3_x():
# New versions of werkzeug expose a quoted path
# %40connections
#
# Older versions (and botocore requests) expose the original:
# @connections
#
# We're unquoting the path here manually, so it behaves the same as botocore requests and requests coming in from old werkzeug versions.
#
return _unquote_hex_characters(path)
else:
return unquote(path)
def _unquote_hex_characters(path: str) -> str:
allowed_characters = ["%2F"] # /
# Path can contain a single hex character
# my%3Fchar
#
# Path can also contain multiple hex characters in a row
# %AA%AB%AC
#
# This is how complex unicode characters, such as smileys, are encoded.
# Note that these particular characters do not translate to anything useful
# For the sake of simplicy, let's assume that it translates to a smiley: :)
#
# Just to make things interesting, they could be found right next to eachother:
# my%3F%AA%AB%ACchar
#
# Which should translate to my?:)char
# char_ranges contains all consecutie hex characters:
# [(2, 5, %3F), (0, 9, %AA%AB%AC)]
char_ranges = [
(m.start(0), m.end(0)) for m in re.finditer("(%[0-9A-F][0-9A-F])+", path)
]
# characters_found will contain the replacement characters
# [(2, 5, '?'), (0, 9, ':)')]
characters_found: List[Tuple[int, int, str]] = []
for char_range in char_ranges:
range_start, range_end = char_range
possible_combo_start = range_start
possible_combo_end = range_end
while possible_combo_start < possible_combo_end:
# For every range, create combinations of possibilities
# iter 1: %AA%AB%AC
# iter 2: %AA%AB
# iter3: %AA
possible_char = path[possible_combo_start:possible_combo_end]
if possible_char in allowed_characters:
# Werkzeug has already converted these characters for us
possible_combo_end -= 3
continue
try:
start_of_raw_repr = possible_combo_start + len(characters_found)
end_of_raw_repr = start_of_raw_repr + len(possible_char)
# Verify that the current possibility is a known unicode character
unicodedata.category(unquote(possible_char))
characters_found.append(
(start_of_raw_repr, end_of_raw_repr, unquote(possible_char))
)
if range_end == possible_combo_end:
# We've matched on the full phrase:
# %AA%AB%AC
break
else:
# we matched on %AA%AB
# reset the indexes, and try to match %AC next
possible_combo_start = possible_combo_end
possible_combo_end = range_end
except: # noqa: E722 Do not use bare except
# 'unicodedata.category' would have thrown an error, meaning:
# %AA%AB%AC does not exist
# Try the next possibility:
# %AA%AB
possible_combo_end -= 3
# Replace the hex characters with the appropriate unicode representation
char_offset = 0
for char_pos in characters_found:
combo_start, combo_end, character = char_pos
path = (
path[0 : combo_start - char_offset]
+ character
+ path[combo_end - char_offset :]
)
char_offset += (combo_end - combo_start) + len(character) - 1
return path
def gzip_decompress(body: bytes) -> bytes: def gzip_decompress(body: bytes) -> bytes:
return decompress(body) return decompress(body)

View File

@ -5,14 +5,11 @@ from typing import Any, Dict, List, Iterator, Union, Tuple, Optional, Type
import urllib.parse import urllib.parse
from moto import settings from moto import settings
from moto.core.versions import is_werkzeug_2_3_x
from moto.core.utils import ( from moto.core.utils import (
extract_region_from_aws_authorization, extract_region_from_aws_authorization,
str_to_rfc_1123_datetime, str_to_rfc_1123_datetime,
normalize_werkzeug_path,
) )
from urllib.parse import parse_qs, urlparse, unquote, urlencode, urlunparse from urllib.parse import parse_qs, urlparse, unquote, urlencode, urlunparse
from urllib.parse import ParseResult
import xmltodict import xmltodict
@ -164,14 +161,8 @@ class S3Response(BaseResponse):
# Taking the naive approach to never decompress anything from S3 for now # Taking the naive approach to never decompress anything from S3 for now
self.allow_request_decompression = False self.allow_request_decompression = False
def get_safe_path_from_url(self, url: ParseResult) -> str: def get_safe_path(self) -> str:
return self.get_safe_path(url.path) return unquote(self.raw_path)
def get_safe_path(self, part: str) -> str:
if self.is_werkzeug_request:
return normalize_werkzeug_path(part)
else:
return unquote(part)
@property @property
def is_access_point(self) -> bool: def is_access_point(self) -> bool:
@ -1153,10 +1144,6 @@ class S3Response(BaseResponse):
objects = [objects] objects = [objects]
if len(objects) == 0: if len(objects) == 0:
raise MalformedXML() raise MalformedXML()
if self.is_werkzeug_request and is_werkzeug_2_3_x():
for obj in objects:
if "Key" in obj:
obj["Key"] = self.get_safe_path(obj["Key"])
if authenticated: if authenticated:
deleted_objects = self.backend.delete_objects(bucket_name, objects) deleted_objects = self.backend.delete_objects(bucket_name, objects)
@ -1273,7 +1260,7 @@ class S3Response(BaseResponse):
self, request: Any, full_url: str, headers: Dict[str, Any] self, request: Any, full_url: str, headers: Dict[str, Any]
) -> TYPE_RESPONSE: ) -> TYPE_RESPONSE:
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)
url_path = self.get_safe_path_from_url(parsed_url) url_path = self.get_safe_path()
query = parse_qs(parsed_url.query, keep_blank_values=True) query = parse_qs(parsed_url.query, keep_blank_values=True)
method = request.method method = request.method
@ -1495,8 +1482,9 @@ class S3Response(BaseResponse):
if isinstance(copy_source, bytes): if isinstance(copy_source, bytes):
copy_source = copy_source.decode("utf-8") copy_source = copy_source.decode("utf-8")
copy_source_parsed = urlparse(copy_source) copy_source_parsed = urlparse(copy_source)
url_path = self.get_safe_path_from_url(copy_source_parsed) src_bucket, src_key = (
src_bucket, src_key = url_path.lstrip("/").split("/", 1) unquote(copy_source_parsed.path).lstrip("/").split("/", 1)
)
src_version_id = parse_qs(copy_source_parsed.query).get( src_version_id = parse_qs(copy_source_parsed.query).get(
"versionId", [None] # type: ignore "versionId", [None] # type: ignore
)[0] )[0]

View File

@ -7,7 +7,6 @@ from moto.core.utils import (
unix_time, unix_time,
camelcase_to_pascal, camelcase_to_pascal,
pascal_to_camelcase, pascal_to_camelcase,
_unquote_hex_characters,
) )
@ -51,29 +50,3 @@ def test_camelcase_to_pascal(_input, expected):
@freeze_time("2015-01-01 12:00:00") @freeze_time("2015-01-01 12:00:00")
def test_unix_time(): def test_unix_time():
assert unix_time() == 1420113600.0 assert unix_time() == 1420113600.0
@pytest.mark.parametrize(
"original_url,result",
[
("some%3Fkey", "some?key"),
("6T7\x159\x12\r\x08.txt", "6T7\x159\x12\r\x08.txt"),
("foobar/the-unicode-%E2%98%BA-key", "foobar/the-unicode-☺-key"),
("key-with%2Eembedded%2Eurl%2Eencoding", "key-with.embedded.url.encoding"),
# Can represent a single character
("%E2%82%AC", ""),
("%2E", "."),
# Multiple chars in a row
("%E2%82%AC%E2%82%AC", "€€"),
("%2E%2E", ".."),
],
)
def test_quote_characters(original_url, result):
assert _unquote_hex_characters(original_url) == result
@pytest.mark.parametrize("original_path", ["%2F%2F", "s%2Fs%2Fs%2F"])
def test_quote_characters__with_slashes(original_path):
# If the string contains slashes, we ignore them
# Werkzeug already takes care of those for us
assert _unquote_hex_characters(original_path) == original_path

View File

@ -3413,6 +3413,37 @@ def test_delete_objects_with_empty_keyname():
assert "Contents" not in client.list_objects(Bucket=bucket_name) assert "Contents" not in client.list_objects(Bucket=bucket_name)
@mock_s3
def test_delete_objects_percent_encoded():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "testbucket-encoded"
client.create_bucket(Bucket=bucket_name)
object_key_1 = "a%2Fb"
object_key_2 = "a/%F0%9F%98%80"
client.put_object(Bucket=bucket_name, Key=object_key_1, Body="percent encoding")
client.put_object(
Bucket=bucket_name, Key=object_key_2, Body="percent encoded emoji"
)
list_objs = client.list_objects(Bucket=bucket_name)
assert len(list_objs["Contents"]) == 2
assert list_objs["Contents"][0]["Key"] == object_key_1
assert list_objs["Contents"][1]["Key"] == object_key_2
delete_objects = client.delete_objects(
Bucket=bucket_name,
Delete={
"Objects": [
{"Key": object_key_1},
{"Key": object_key_2},
],
},
)
assert delete_objects["Deleted"][0] == {"Key": object_key_1}
assert delete_objects["Deleted"][1] == {"Key": object_key_2}
assert "Contents" not in client.list_objects(Bucket=bucket_name)
@mock_s3 @mock_s3
def test_head_object_should_return_default_content_type(): def test_head_object_should_return_default_content_type():
s3_resource = boto3.resource("s3", region_name="us-east-1") s3_resource = boto3.resource("s3", region_name="us-east-1")