process multipart form
This commit is contained in:
parent
b3f6e5ab2f
commit
49b056563a
@ -7,7 +7,7 @@ import six
|
||||
from botocore.awsrequest import AWSPreparedRequest
|
||||
|
||||
from moto.core.utils import str_to_rfc_1123_datetime, py2_strip_unicode_keys
|
||||
from six.moves.urllib.parse import parse_qs, urlparse, unquote
|
||||
from six.moves.urllib.parse import parse_qs, urlparse, unquote, parse_qsl
|
||||
|
||||
import xmltodict
|
||||
|
||||
@ -143,6 +143,31 @@ def is_delete_keys(request, path, bucket_name):
|
||||
)
|
||||
|
||||
|
||||
def _process_multipart_formdata(request):
|
||||
"""
|
||||
When not using the live server, the request does not pass through flask, so it is not processed.
|
||||
This will only be used in places where we end up with a requests PreparedRequest.
|
||||
"""
|
||||
form = {}
|
||||
boundkey = request.headers['Content-Type'][len('multipart/form-data; boundary='):]
|
||||
boundary = f'--{boundkey}'
|
||||
data = request.body.decode().split(boundary)
|
||||
fields = [field.split('\r\n\r\n') for field in data][1:-1]
|
||||
for key, value in fields:
|
||||
key, value = key.replace('\r\n', ''), value.replace('\r\n', '')
|
||||
key = key.split('; ')
|
||||
if len(key) == 2:
|
||||
disposition, name = key
|
||||
filename = None
|
||||
else:
|
||||
disposition, name, filename = key
|
||||
name = name[len('name='):].strip('"')
|
||||
if disposition.endswith('form-data'):
|
||||
form[name] = value
|
||||
import code; code.interact(local=locals())
|
||||
return form
|
||||
|
||||
|
||||
class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
|
||||
def __init__(self, backend):
|
||||
super(ResponseObject, self).__init__()
|
||||
@ -776,9 +801,9 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
|
||||
template = self.response_template(S3_DELETE_BUCKET_WITH_ITEMS_ERROR)
|
||||
return 409, {}, template.render(bucket=removed_bucket)
|
||||
|
||||
def _bucket_response_post(self, request, body, bucket_name, headers):
|
||||
def _bucket_response_post(self, request, body, bucket_name):
|
||||
response_headers = {}
|
||||
if not request.headers.get('Content-Length'):
|
||||
if not request.headers.get("Content-Length"):
|
||||
return 411, {}, "Content-Length required"
|
||||
|
||||
path = self._get_path(request)
|
||||
@ -796,14 +821,12 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
|
||||
if hasattr(request, "form"):
|
||||
# Not HTTPretty
|
||||
form = request.form
|
||||
elif request.headers.get('Content-Type').startswith('multipart/form-data'):
|
||||
form = _process_multipart_formdata(request)
|
||||
else:
|
||||
# HTTPretty, build new form object
|
||||
body = body.decode()
|
||||
|
||||
form = {}
|
||||
for kv in body.split("&"):
|
||||
k, v = kv.split("=")
|
||||
form[k] = v
|
||||
form = dict(parse_qsl(body))
|
||||
|
||||
key = form["key"]
|
||||
if "file" in form:
|
||||
@ -811,12 +834,12 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
|
||||
else:
|
||||
f = request.files["file"].stream.read()
|
||||
|
||||
if 'success_action_redirect' in form:
|
||||
response_headers['Location'] = form['success_action_redirect']
|
||||
if "success_action_redirect" in form:
|
||||
response_headers["Location"] = form["success_action_redirect"]
|
||||
|
||||
if 'success_action_status' in form:
|
||||
status_code = form['success_action_status']
|
||||
elif 'success_action_redirect' in form:
|
||||
if "success_action_status" in form:
|
||||
status_code = form["success_action_status"]
|
||||
elif "success_action_redirect" in form:
|
||||
status_code = 303
|
||||
else:
|
||||
status_code = 204
|
||||
|
@ -4457,6 +4457,6 @@ def test_creating_presigned_post():
|
||||
ExpiresIn=1000,
|
||||
)
|
||||
resp = requests.post(data['url'], data=data['fields'], files={'file': fdata}, allow_redirects=False)
|
||||
assert resp.headers['Location'] == url
|
||||
assert resp.headers['Location'] == success_url
|
||||
assert resp.status_code == 303
|
||||
assert s3.get_object(Bucket=bucket, Key='{file_uuid}.txt'.format(file_uid=file_uid))['Body'].read() == fdata
|
||||
assert s3.get_object(Bucket=bucket, Key='{file_uid}.txt'.format(file_uid=file_uid))['Body'].read() == fdata
|
||||
|
Loading…
Reference in New Issue
Block a user