Techdebt: Reuse parsed URL across services (#6635)

This commit is contained in:
Bert Blommers 2023-08-12 07:00:16 +00:00 committed by GitHub
parent 303b1b92cb
commit 3682cc633b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 46 additions and 78 deletions

View File

@ -7,15 +7,7 @@ from urllib.parse import urlparse
from werkzeug.wrappers import Request from werkzeug.wrappers import Request
from .responses import TYPE_RESPONSE from .responses import TYPE_RESPONSE
from moto.utilities.distutils_version import LooseVersion from moto.core.versions import is_responses_0_17_x
try:
from importlib.metadata import version
except ImportError:
from importlib_metadata import version
RESPONSES_VERSION = version("responses")
class CallbackResponse(responses.CallbackResponse): class CallbackResponse(responses.CallbackResponse):
@ -155,7 +147,7 @@ def get_response_mock() -> responses.RequestsMock:
""" """
responses_mock = None responses_mock = None
if LooseVersion(RESPONSES_VERSION) >= LooseVersion("0.17.0"): if is_responses_0_17_x():
from .responses_custom_registry import CustomRegistry from .responses_custom_registry import CustomRegistry
responses_mock = responses.RequestsMock( responses_mock = responses.RequestsMock(
@ -170,7 +162,7 @@ def get_response_mock() -> responses.RequestsMock:
def reset_responses_mock(responses_mock: responses.RequestsMock) -> None: def reset_responses_mock(responses_mock: responses.RequestsMock) -> None:
if LooseVersion(RESPONSES_VERSION) >= LooseVersion("0.17.0"): if is_responses_0_17_x():
from .responses_custom_registry import CustomRegistry from .responses_custom_registry import CustomRegistry
responses_mock.reset() responses_mock.reset()

View File

@ -237,6 +237,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
use_raw_body: Use incoming bytes if True, encode to string otherwise use_raw_body: Use incoming bytes if True, encode to string otherwise
""" """
self.is_werkzeug_request = "werkzeug" in str(type(request)) self.is_werkzeug_request = "werkzeug" in str(type(request))
self.parsed_url = urlparse(full_url)
querystring: Dict[str, Any] = OrderedDict() querystring: Dict[str, Any] = OrderedDict()
if hasattr(request, "body"): if hasattr(request, "body"):
# Boto # Boto
@ -258,9 +259,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
self.body = self.body.decode("utf-8") self.body = self.body.decode("utf-8")
if not querystring: if not querystring:
querystring.update( querystring.update(parse_qs(self.parsed_url.query, keep_blank_values=True))
parse_qs(urlparse(full_url).query, keep_blank_values=True)
)
if not querystring: if not querystring:
if ( if (
"json" in request.headers.get("content-type", []) "json" in request.headers.get("content-type", [])
@ -298,7 +297,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
self.uri = full_url self.uri = full_url
self.path = urlparse(full_url).path self.path = self.parsed_url.path
self.querystring = querystring self.querystring = querystring
self.data = querystring self.data = querystring
self.method = request.method self.method = request.method
@ -307,7 +306,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
self.headers = request.headers self.headers = request.headers
if "host" not in self.headers: if "host" not in self.headers:
self.headers["host"] = urlparse(full_url).netloc self.headers["host"] = self.parsed_url.netloc
self.response_headers = { self.response_headers = {
"server": "amazon.com", "server": "amazon.com",
"date": datetime.datetime.now().strftime("%a, %d %b %Y %H:%M:%S GMT"), "date": datetime.datetime.now().strftime("%a, %d %b %Y %H:%M:%S GMT"),

View File

@ -6,8 +6,13 @@ except ImportError:
from importlib_metadata import version from importlib_metadata import version
RESPONSES_VERSION = version("responses")
WERKZEUG_VERSION = version("werkzeug") WERKZEUG_VERSION = version("werkzeug")
def is_responses_0_17_x() -> bool:
return LooseVersion(RESPONSES_VERSION) >= LooseVersion("0.17.0")
def is_werkzeug_2_3_x() -> bool: def is_werkzeug_2_3_x() -> bool:
return LooseVersion(WERKZEUG_VERSION) >= LooseVersion("2.3.0") return LooseVersion(WERKZEUG_VERSION) >= LooseVersion("2.3.0")

View File

@ -1,6 +1,5 @@
import json import json
from typing import Any, Dict, Union from typing import Any, Dict, Union
from urllib.parse import urlparse
from moto.core.common_types import TYPE_RESPONSE from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
@ -40,8 +39,7 @@ class DataBrewResponse(BaseResponse):
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
# https://docs.aws.amazon.com/databrew/latest/dg/API_DeleteRecipeVersion.html # https://docs.aws.amazon.com/databrew/latest/dg/API_DeleteRecipeVersion.html
if request.method == "DELETE": if request.method == "DELETE":
parsed_url = urlparse(full_url) split_path = self.parsed_url.path.strip("/").split("/")
split_path = parsed_url.path.strip("/").split("/")
recipe_name = split_path[1] recipe_name = split_path[1]
recipe_version = split_path[3] recipe_version = split_path[3]
self.databrew_backend.delete_recipe_version(recipe_name, recipe_version) self.databrew_backend.delete_recipe_version(recipe_name, recipe_version)
@ -100,8 +98,7 @@ class DataBrewResponse(BaseResponse):
def publish_recipe(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def publish_recipe(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
if request.method == "POST": if request.method == "POST":
parsed_url = urlparse(full_url) recipe_name = self.parsed_url.path.strip("/").split("/", 2)[1]
recipe_name = parsed_url.path.strip("/").split("/", 2)[1]
recipe_description = self.parameters.get("Description") recipe_description = self.parameters.get("Description")
self.databrew_backend.publish_recipe(recipe_name, recipe_description) self.databrew_backend.publish_recipe(recipe_name, recipe_description)
return 200, {}, json.dumps({"Name": recipe_name}) return 200, {}, json.dumps({"Name": recipe_name})
@ -128,9 +125,8 @@ class DataBrewResponse(BaseResponse):
@amzn_request_id @amzn_request_id
def recipe_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def recipe_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
recipe_name = parsed_url.path.rstrip("/").rsplit("/", 1)[1] recipe_name = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1]
if request.method == "PUT": if request.method == "PUT":
return self.put_recipe_response(recipe_name) return self.put_recipe_response(recipe_name)
@ -180,9 +176,8 @@ class DataBrewResponse(BaseResponse):
@amzn_request_id @amzn_request_id
def ruleset_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def ruleset_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
ruleset_name = parsed_url.path.split("/")[-1] ruleset_name = self.parsed_url.path.split("/")[-1]
if request.method == "PUT": if request.method == "PUT":
response = self.put_ruleset_response(ruleset_name) response = self.put_ruleset_response(ruleset_name)
@ -285,9 +280,8 @@ class DataBrewResponse(BaseResponse):
@amzn_request_id @amzn_request_id
def dataset_response(self, request: Any, full_url: str, headers: Any) -> Union[str, TYPE_RESPONSE]: # type: ignore[return] def dataset_response(self, request: Any, full_url: str, headers: Any) -> Union[str, TYPE_RESPONSE]: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
dataset_name = parsed_url.path.split("/")[-1] dataset_name = self.parsed_url.path.split("/")[-1]
if request.method == "POST": if request.method == "POST":
return self.create_dataset() return self.create_dataset()
@ -339,9 +333,8 @@ class DataBrewResponse(BaseResponse):
@amzn_request_id @amzn_request_id
def job_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def job_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
job_name = parsed_url.path.rstrip("/").rsplit("/", 1)[1] job_name = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1]
if request.method == "GET": if request.method == "GET":
return self.get_job_response(job_name) return self.get_job_response(job_name)
@ -430,9 +423,8 @@ class DataBrewResponse(BaseResponse):
@amzn_request_id @amzn_request_id
def profile_job_response(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[return] def profile_job_response(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
job_name = parsed_url.path.rstrip("/").rsplit("/", 1)[1] job_name = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1]
if request.method == "PUT": if request.method == "PUT":
return self.update_profile_job_response(job_name) return self.update_profile_job_response(job_name)
@ -440,9 +432,8 @@ class DataBrewResponse(BaseResponse):
@amzn_request_id @amzn_request_id
def recipe_job_response(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[return] def recipe_job_response(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
job_name = parsed_url.path.rstrip("/").rsplit("/", 1)[1] job_name = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1]
if request.method == "PUT": if request.method == "PUT":
return self.update_recipe_job_response(job_name) return self.update_recipe_job_response(job_name)

View File

@ -4,7 +4,6 @@ from datetime import datetime, timezone
from functools import wraps from functools import wraps
from typing import Any, Callable, Dict, List, Pattern from typing import Any, Callable, Dict, List, Pattern
from urllib.parse import urlparse
from moto.core.responses import AWSServiceSpec from moto.core.responses import AWSServiceSpec
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.core.responses import xml_to_json_response from moto.core.responses import xml_to_json_response
@ -65,9 +64,8 @@ class ElasticMapReduceResponse(BaseResponse):
super().__init__(service_name="emr") super().__init__(service_name="emr")
def get_region_from_url(self, request: Any, full_url: str) -> str: def get_region_from_url(self, request: Any, full_url: str) -> str:
parsed = urlparse(full_url)
for regex in ElasticMapReduceResponse.emr_region_regex: for regex in ElasticMapReduceResponse.emr_region_regex:
match = regex.search(parsed.netloc) match = regex.search(self.parsed_url.netloc)
if match: if match:
return match.group(1) return match.group(1)
return self.default_region return self.default_region

View File

@ -1,6 +1,6 @@
import json import json
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from urllib.parse import urlparse, parse_qs from urllib.parse import parse_qs
from moto.core.common_types import TYPE_RESPONSE from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
@ -291,8 +291,7 @@ class ManagedBlockchainResponse(BaseResponse):
def _node_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def _node_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
method = request.method method = request.method
parsed_url = urlparse(full_url) querystring = parse_qs(self.parsed_url.query, keep_blank_values=True)
querystring = parse_qs(parsed_url.query, keep_blank_values=True)
network_id = networkid_from_managedblockchain_url(full_url) network_id = networkid_from_managedblockchain_url(full_url)
member_id = memberid_from_managedblockchain_request(full_url, self.body) member_id = memberid_from_managedblockchain_request(full_url, self.body)
if method == "GET": if method == "GET":

View File

@ -1,7 +1,7 @@
"""Handles Route53 API requests, invokes method and returns response.""" """Handles Route53 API requests, invokes method and returns response."""
import datetime import datetime
import re import re
from urllib.parse import parse_qs, urlparse from urllib.parse import parse_qs
from jinja2 import Template from jinja2 import Template
from typing import Any from typing import Any
@ -95,8 +95,7 @@ class Route53(BaseResponse):
self, request: Any, full_url: str, headers: Any self, request: Any, full_url: str, headers: Any
) -> TYPE_RESPONSE: ) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) query_params = parse_qs(self.parsed_url.query)
query_params = parse_qs(parsed_url.query)
dnsnames = query_params.get("dnsname") dnsnames = query_params.get("dnsname")
dnsname, zones = self.backend.list_hosted_zones_by_name(dnsnames) dnsname, zones = self.backend.list_hosted_zones_by_name(dnsnames)
@ -108,8 +107,7 @@ class Route53(BaseResponse):
self, request: Any, full_url: str, headers: Any self, request: Any, full_url: str, headers: Any
) -> TYPE_RESPONSE: ) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) query_params = parse_qs(self.parsed_url.query)
query_params = parse_qs(parsed_url.query)
vpc_id = query_params.get("vpcid")[0] # type: ignore vpc_id = query_params.get("vpcid")[0] # type: ignore
zones = self.backend.list_hosted_zones_by_vpc(vpc_id) zones = self.backend.list_hosted_zones_by_vpc(vpc_id)
template = Template(LIST_HOSTED_ZONES_BY_VPC_RESPONSE) template = Template(LIST_HOSTED_ZONES_BY_VPC_RESPONSE)
@ -125,8 +123,7 @@ class Route53(BaseResponse):
def get_or_delete_hostzone_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def get_or_delete_hostzone_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1]
zoneid = parsed_url.path.rstrip("/").rsplit("/", 1)[1]
if request.method == "GET": if request.method == "GET":
the_zone = self.backend.get_hosted_zone(zoneid) the_zone = self.backend.get_hosted_zone(zoneid)
@ -149,10 +146,9 @@ class Route53(BaseResponse):
# TODO: implement enable/disable dnssec apis # TODO: implement enable/disable dnssec apis
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
method = request.method method = request.method
zoneid = parsed_url.path.rstrip("/").rsplit("/", 2)[1] zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1]
if method == "GET": if method == "GET":
self.backend.get_dnssec(zoneid) self.backend.get_dnssec(zoneid)
@ -163,8 +159,7 @@ class Route53(BaseResponse):
) -> TYPE_RESPONSE: ) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1]
zoneid = parsed_url.path.rstrip("/").rsplit("/", 2)[1]
elements = xmltodict.parse(self.body) elements = xmltodict.parse(self.body)
comment = vpc = elements.get("AssociateVPCWithHostedZoneRequest", {}).get( comment = vpc = elements.get("AssociateVPCWithHostedZoneRequest", {}).get(
@ -184,8 +179,7 @@ class Route53(BaseResponse):
) -> TYPE_RESPONSE: ) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1]
zoneid = parsed_url.path.rstrip("/").rsplit("/", 2)[1]
elements = xmltodict.parse(self.body) elements = xmltodict.parse(self.body)
comment = vpc = elements.get("DisassociateVPCFromHostedZoneRequest", {}).get( comment = vpc = elements.get("DisassociateVPCFromHostedZoneRequest", {}).get(
@ -202,10 +196,9 @@ class Route53(BaseResponse):
def rrset_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def rrset_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
method = request.method method = request.method
zoneid = parsed_url.path.rstrip("/").rsplit("/", 2)[1] zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1]
if method == "POST": if method == "POST":
elements = xmltodict.parse(self.body) elements = xmltodict.parse(self.body)
@ -242,7 +235,7 @@ class Route53(BaseResponse):
return 200, headers, CHANGE_RRSET_RESPONSE return 200, headers, CHANGE_RRSET_RESPONSE
elif method == "GET": elif method == "GET":
querystring = parse_qs(parsed_url.query) querystring = parse_qs(self.parsed_url.query)
template = Template(LIST_RRSET_RESPONSE) template = Template(LIST_RRSET_RESPONSE)
start_type = querystring.get("type", [None])[0] # type: ignore start_type = querystring.get("type", [None])[0] # type: ignore
start_name = querystring.get("name", [None])[0] # type: ignore start_name = querystring.get("name", [None])[0] # type: ignore
@ -314,9 +307,8 @@ class Route53(BaseResponse):
def health_check_response2(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def health_check_response2(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
method = request.method method = request.method
health_check_id = parsed_url.path.split("/")[-1] health_check_id = self.parsed_url.path.split("/")[-1]
if method == "GET": if method == "GET":
health_check = self.backend.get_health_check(health_check_id) health_check = self.backend.get_health_check(health_check_id)
@ -351,14 +343,12 @@ class Route53(BaseResponse):
def health_check_status_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def health_check_status_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url)
method = request.method method = request.method
health_check_id = re.search( health_check_match = re.search(
r"healthcheck/(?P<health_check_id>[^/]+)/status$", parsed_url.path r"healthcheck/(?P<health_check_id>[^/]+)/status$", self.parsed_url.path
).group( # type: ignore[union-attr]
"health_check_id"
) )
health_check_id = health_check_match.group("health_check_id") # type: ignore[union-attr]
if method == "GET": if method == "GET":
self.backend.get_health_check(health_check_id) self.backend.get_health_check(health_check_id)
@ -390,9 +380,8 @@ class Route53(BaseResponse):
def list_or_change_tags_for_resource_request(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def list_or_change_tags_for_resource_request(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) id_ = self.parsed_url.path.split("/")[-1]
id_ = parsed_url.path.split("/")[-1] type_ = self.parsed_url.path.split("/")[-2]
type_ = parsed_url.path.split("/")[-2]
if request.method == "GET": if request.method == "GET":
tags = self.backend.list_tags_for_resource(id_) tags = self.backend.list_tags_for_resource(id_)
@ -419,8 +408,7 @@ class Route53(BaseResponse):
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
if request.method == "GET": if request.method == "GET":
parsed_url = urlparse(full_url) change_id = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1]
change_id = parsed_url.path.rstrip("/").rsplit("/", 1)[1]
template = Template(GET_CHANGE_RESPONSE) template = Template(GET_CHANGE_RESPONSE)
return 200, headers, template.render(change_id=change_id, xmlns=XMLNS) return 200, headers, template.render(change_id=change_id, xmlns=XMLNS)
@ -470,8 +458,7 @@ class Route53(BaseResponse):
def get_or_delete_query_logging_config_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def get_or_delete_query_logging_config_response(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) query_logging_config_id = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1]
query_logging_config_id = parsed_url.path.rstrip("/").rsplit("/", 1)[1]
if request.method == "GET": if request.method == "GET":
query_logging_config = self.backend.get_query_logging_config( query_logging_config = self.backend.get_query_logging_config(
@ -520,8 +507,7 @@ class Route53(BaseResponse):
def reusable_delegation_set(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def reusable_delegation_set(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
parsed_url = urlparse(full_url) ds_id = self.parsed_url.path.rstrip("/").rsplit("/")[-1]
ds_id = parsed_url.path.rstrip("/").rsplit("/")[-1]
if request.method == "GET": if request.method == "GET":
delegation_set = self.backend.get_reusable_delegation_set( delegation_set = self.backend.get_reusable_delegation_set(
delegation_set_id=ds_id delegation_set_id=ds_id

View File

@ -292,7 +292,7 @@ class S3Response(BaseResponse):
def _bucket_response( def _bucket_response(
self, request: Any, full_url: str self, request: Any, full_url: str
) -> Union[str, TYPE_RESPONSE]: ) -> Union[str, TYPE_RESPONSE]:
querystring = self._get_querystring(request, full_url) querystring = self._get_querystring(request)
method = request.method method = request.method
region_name = parse_region_from_url(full_url, use_default_region=False) region_name = parse_region_from_url(full_url, use_default_region=False)
if region_name is None: if region_name is None:
@ -327,7 +327,7 @@ class S3Response(BaseResponse):
f"Method {method} has not been implemented in the S3 backend yet" f"Method {method} has not been implemented in the S3 backend yet"
) )
def _get_querystring(self, request: Any, full_url: str) -> Dict[str, Any]: # type: ignore[misc] def _get_querystring(self, request: Any) -> Dict[str, Any]: # type: ignore[misc]
# Flask's Request has the querystring already parsed # Flask's Request has the querystring already parsed
# In ServerMode, we can use this, instead of manually parsing this # In ServerMode, we can use this, instead of manually parsing this
if hasattr(request, "args"): if hasattr(request, "args"):
@ -338,7 +338,6 @@ class S3Response(BaseResponse):
query_dict[key] = val if isinstance(val, list) else [val] query_dict[key] = val if isinstance(val, list) else [val]
return query_dict return query_dict
parsed_url = urlparse(full_url)
# full_url can be one of two formats, depending on the version of werkzeug used: # full_url can be one of two formats, depending on the version of werkzeug used:
# http://foobaz.localhost:5000/?prefix=bar%2Bbaz # http://foobaz.localhost:5000/?prefix=bar%2Bbaz
# http://foobaz.localhost:5000/?prefix=bar+baz # http://foobaz.localhost:5000/?prefix=bar+baz
@ -347,7 +346,7 @@ class S3Response(BaseResponse):
# #
# Workaround - manually reverse the encoding. # Workaround - manually reverse the encoding.
# Keep the + encoded, ensuring that parse_qsl doesn't replace it, and parse_qsl will unquote it afterwards # Keep the + encoded, ensuring that parse_qsl doesn't replace it, and parse_qsl will unquote it afterwards
qs = (parsed_url.query or "").replace("+", "%2B") qs = (self.parsed_url.query or "").replace("+", "%2B")
return parse_qs(qs, keep_blank_values=True) return parse_qs(qs, keep_blank_values=True)
def _bucket_response_head( def _bucket_response_head(
@ -1249,9 +1248,8 @@ class S3Response(BaseResponse):
def _key_response( def _key_response(
self, request: Any, full_url: str, headers: Dict[str, Any] self, request: Any, full_url: str, headers: Dict[str, Any]
) -> TYPE_RESPONSE: ) -> TYPE_RESPONSE:
parsed_url = urlparse(full_url) url_path = self.get_safe_path_from_url(self.parsed_url)
url_path = self.get_safe_path_from_url(parsed_url) query = parse_qs(self.parsed_url.query, keep_blank_values=True)
query = parse_qs(parsed_url.query, keep_blank_values=True)
method = request.method method = request.method
key_name = self.parse_key_name(request, url_path) key_name = self.parse_key_name(request, url_path)