diff --git a/moto/core/responses.py b/moto/core/responses.py index 7f699d870..5a5cc2fc4 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -363,6 +363,11 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin): self.uri = full_url self.path = self.parsed_url.path + if self.is_werkzeug_request and "RAW_URI" in request.environ: + self.raw_path = urlparse(request.environ.get("RAW_URI")).path + else: + self.raw_path = self.path + self.querystring = querystring self.data = querystring self.method = request.method diff --git a/moto/core/utils.py b/moto/core/utils.py index efda9a392..e40c98fb4 100644 --- a/moto/core/utils.py +++ b/moto/core/utils.py @@ -1,13 +1,12 @@ import datetime import inspect import re -import unicodedata from botocore.exceptions import ClientError from gzip import decompress from typing import Any, Optional, List, Callable, Dict, Tuple -from urllib.parse import urlparse, unquote +from urllib.parse import urlparse from .common_types import TYPE_RESPONSE -from .versions import is_werkzeug_2_3_x, PYTHON_311 +from .versions import PYTHON_311 def camelcase_to_underscores(argument: str) -> str: @@ -339,98 +338,5 @@ def params_sort_function(item: Tuple[str, Any]) -> Tuple[str, Any]: return item -def normalize_werkzeug_path(path: str) -> str: - if is_werkzeug_2_3_x(): - # New versions of werkzeug expose a quoted path - # %40connections - # - # Older versions (and botocore requests) expose the original: - # @connections - # - # We're unquoting the path here manually, so it behaves the same as botocore requests and requests coming in from old werkzeug versions. - # - return _unquote_hex_characters(path) - else: - return unquote(path) - - -def _unquote_hex_characters(path: str) -> str: - allowed_characters = ["%2F"] # / - # Path can contain a single hex character - # my%3Fchar - # - # Path can also contain multiple hex characters in a row - # %AA%AB%AC - # - # This is how complex unicode characters, such as smileys, are encoded. - # Note that these particular characters do not translate to anything useful - # For the sake of simplicy, let's assume that it translates to a smiley: :) - # - # Just to make things interesting, they could be found right next to eachother: - # my%3F%AA%AB%ACchar - # - # Which should translate to my?:)char - - # char_ranges contains all consecutie hex characters: - # [(2, 5, %3F), (0, 9, %AA%AB%AC)] - char_ranges = [ - (m.start(0), m.end(0)) for m in re.finditer("(%[0-9A-F][0-9A-F])+", path) - ] - - # characters_found will contain the replacement characters - # [(2, 5, '?'), (0, 9, ':)')] - characters_found: List[Tuple[int, int, str]] = [] - for char_range in char_ranges: - range_start, range_end = char_range - possible_combo_start = range_start - possible_combo_end = range_end - while possible_combo_start < possible_combo_end: - # For every range, create combinations of possibilities - # iter 1: %AA%AB%AC - # iter 2: %AA%AB - # iter3: %AA - possible_char = path[possible_combo_start:possible_combo_end] - - if possible_char in allowed_characters: - # Werkzeug has already converted these characters for us - possible_combo_end -= 3 - continue - try: - start_of_raw_repr = possible_combo_start + len(characters_found) - end_of_raw_repr = start_of_raw_repr + len(possible_char) - # Verify that the current possibility is a known unicode character - unicodedata.category(unquote(possible_char)) - characters_found.append( - (start_of_raw_repr, end_of_raw_repr, unquote(possible_char)) - ) - if range_end == possible_combo_end: - # We've matched on the full phrase: - # %AA%AB%AC - break - else: - # we matched on %AA%AB - # reset the indexes, and try to match %AC next - possible_combo_start = possible_combo_end - possible_combo_end = range_end - except: # noqa: E722 Do not use bare except - # 'unicodedata.category' would have thrown an error, meaning: - # %AA%AB%AC does not exist - # Try the next possibility: - # %AA%AB - possible_combo_end -= 3 - - # Replace the hex characters with the appropriate unicode representation - char_offset = 0 - for char_pos in characters_found: - combo_start, combo_end, character = char_pos - path = ( - path[0 : combo_start - char_offset] - + character - + path[combo_end - char_offset :] - ) - char_offset += (combo_end - combo_start) + len(character) - 1 - return path - - def gzip_decompress(body: bytes) -> bytes: return decompress(body) diff --git a/moto/s3/responses.py b/moto/s3/responses.py index 907bdce54..99f73e067 100644 --- a/moto/s3/responses.py +++ b/moto/s3/responses.py @@ -5,14 +5,11 @@ from typing import Any, Dict, List, Iterator, Union, Tuple, Optional, Type import urllib.parse from moto import settings -from moto.core.versions import is_werkzeug_2_3_x from moto.core.utils import ( extract_region_from_aws_authorization, str_to_rfc_1123_datetime, - normalize_werkzeug_path, ) from urllib.parse import parse_qs, urlparse, unquote, urlencode, urlunparse -from urllib.parse import ParseResult import xmltodict @@ -164,14 +161,8 @@ class S3Response(BaseResponse): # Taking the naive approach to never decompress anything from S3 for now self.allow_request_decompression = False - def get_safe_path_from_url(self, url: ParseResult) -> str: - return self.get_safe_path(url.path) - - def get_safe_path(self, part: str) -> str: - if self.is_werkzeug_request: - return normalize_werkzeug_path(part) - else: - return unquote(part) + def get_safe_path(self) -> str: + return unquote(self.raw_path) @property def is_access_point(self) -> bool: @@ -1153,10 +1144,6 @@ class S3Response(BaseResponse): objects = [objects] if len(objects) == 0: raise MalformedXML() - if self.is_werkzeug_request and is_werkzeug_2_3_x(): - for obj in objects: - if "Key" in obj: - obj["Key"] = self.get_safe_path(obj["Key"]) if authenticated: deleted_objects = self.backend.delete_objects(bucket_name, objects) @@ -1273,7 +1260,7 @@ class S3Response(BaseResponse): self, request: Any, full_url: str, headers: Dict[str, Any] ) -> TYPE_RESPONSE: parsed_url = urlparse(full_url) - url_path = self.get_safe_path_from_url(parsed_url) + url_path = self.get_safe_path() query = parse_qs(parsed_url.query, keep_blank_values=True) method = request.method @@ -1495,8 +1482,9 @@ class S3Response(BaseResponse): if isinstance(copy_source, bytes): copy_source = copy_source.decode("utf-8") copy_source_parsed = urlparse(copy_source) - url_path = self.get_safe_path_from_url(copy_source_parsed) - src_bucket, src_key = url_path.lstrip("/").split("/", 1) + src_bucket, src_key = ( + unquote(copy_source_parsed.path).lstrip("/").split("/", 1) + ) src_version_id = parse_qs(copy_source_parsed.query).get( "versionId", [None] # type: ignore )[0] diff --git a/tests/test_core/test_utils.py b/tests/test_core/test_utils.py index 0b9e796e4..4186f2117 100644 --- a/tests/test_core/test_utils.py +++ b/tests/test_core/test_utils.py @@ -7,7 +7,6 @@ from moto.core.utils import ( unix_time, camelcase_to_pascal, pascal_to_camelcase, - _unquote_hex_characters, ) @@ -51,29 +50,3 @@ def test_camelcase_to_pascal(_input, expected): @freeze_time("2015-01-01 12:00:00") def test_unix_time(): assert unix_time() == 1420113600.0 - - -@pytest.mark.parametrize( - "original_url,result", - [ - ("some%3Fkey", "some?key"), - ("6T7\x159\x12\r\x08.txt", "6T7\x159\x12\r\x08.txt"), - ("foobar/the-unicode-%E2%98%BA-key", "foobar/the-unicode-☺-key"), - ("key-with%2Eembedded%2Eurl%2Eencoding", "key-with.embedded.url.encoding"), - # Can represent a single character - ("%E2%82%AC", "€"), - ("%2E", "."), - # Multiple chars in a row - ("%E2%82%AC%E2%82%AC", "€€"), - ("%2E%2E", ".."), - ], -) -def test_quote_characters(original_url, result): - assert _unquote_hex_characters(original_url) == result - - -@pytest.mark.parametrize("original_path", ["%2F%2F", "s%2Fs%2Fs%2F"]) -def test_quote_characters__with_slashes(original_path): - # If the string contains slashes, we ignore them - # Werkzeug already takes care of those for us - assert _unquote_hex_characters(original_path) == original_path diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py index aa45cb075..c2b5f66b1 100644 --- a/tests/test_s3/test_s3.py +++ b/tests/test_s3/test_s3.py @@ -3413,6 +3413,37 @@ def test_delete_objects_with_empty_keyname(): assert "Contents" not in client.list_objects(Bucket=bucket_name) +@mock_s3 +def test_delete_objects_percent_encoded(): + client = boto3.client("s3", region_name=DEFAULT_REGION_NAME) + bucket_name = "testbucket-encoded" + client.create_bucket(Bucket=bucket_name) + + object_key_1 = "a%2Fb" + object_key_2 = "a/%F0%9F%98%80" + client.put_object(Bucket=bucket_name, Key=object_key_1, Body="percent encoding") + client.put_object( + Bucket=bucket_name, Key=object_key_2, Body="percent encoded emoji" + ) + list_objs = client.list_objects(Bucket=bucket_name) + assert len(list_objs["Contents"]) == 2 + assert list_objs["Contents"][0]["Key"] == object_key_1 + assert list_objs["Contents"][1]["Key"] == object_key_2 + + delete_objects = client.delete_objects( + Bucket=bucket_name, + Delete={ + "Objects": [ + {"Key": object_key_1}, + {"Key": object_key_2}, + ], + }, + ) + assert delete_objects["Deleted"][0] == {"Key": object_key_1} + assert delete_objects["Deleted"][1] == {"Key": object_key_2} + assert "Contents" not in client.list_objects(Bucket=bucket_name) + + @mock_s3 def test_head_object_should_return_default_content_type(): s3_resource = boto3.resource("s3", region_name="us-east-1")