Merge pull request #2430 from jessevogt/handle-url-encoded-keys

Fix for #2431 delete objects fails with url encoded key
This commit is contained in:
Steve Pulec 2019-09-25 20:55:42 -05:00 committed by GitHub
commit 23c1696b72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 5 deletions

View File

@ -20,7 +20,7 @@ from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, Missi
MalformedACLError, InvalidNotificationARN, InvalidNotificationEvent, ObjectNotInActiveTierError MalformedACLError, InvalidNotificationARN, InvalidNotificationEvent, ObjectNotInActiveTierError
from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, \ from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, \
FakeTag FakeTag
from .utils import bucket_name_from_url, clean_key_name, metadata_from_headers, parse_region_from_url from .utils import bucket_name_from_url, clean_key_name, undo_clean_key_name, metadata_from_headers, parse_region_from_url
from xml.dom import minidom from xml.dom import minidom
@ -711,7 +711,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
for k in keys: for k in keys:
key_name = k.firstChild.nodeValue key_name = k.firstChild.nodeValue
success = self.backend.delete_key(bucket_name, key_name) success = self.backend.delete_key(bucket_name, undo_clean_key_name(key_name))
if success: if success:
deleted_names.append(key_name) deleted_names.append(key_name)
else: else:

View File

@ -5,7 +5,7 @@ import os
from boto.s3.key import Key from boto.s3.key import Key
import re import re
import six import six
from six.moves.urllib.parse import urlparse, unquote from six.moves.urllib.parse import urlparse, unquote, quote
import sys import sys
@ -71,10 +71,15 @@ def metadata_from_headers(headers):
def clean_key_name(key_name): def clean_key_name(key_name):
if six.PY2: if six.PY2:
return unquote(key_name.encode('utf-8')).decode('utf-8') return unquote(key_name.encode('utf-8')).decode('utf-8')
return unquote(key_name) return unquote(key_name)
def undo_clean_key_name(key_name):
if six.PY2:
return quote(key_name.encode('utf-8')).decode('utf-8')
return quote(key_name)
class _VersionedKeyStore(dict): class _VersionedKeyStore(dict):
""" A simplified/modified version of Django's `MultiValueDict` taken from: """ A simplified/modified version of Django's `MultiValueDict` taken from:

View File

@ -21,6 +21,7 @@ from botocore.handlers import disable_signing
from boto.s3.connection import S3Connection from boto.s3.connection import S3Connection
from boto.s3.key import Key from boto.s3.key import Key
from freezegun import freeze_time from freezegun import freeze_time
from parameterized import parameterized
import six import six
import requests import requests
import tests.backport_assert_raises # noqa import tests.backport_assert_raises # noqa
@ -3046,3 +3047,39 @@ def test_root_dir_with_empty_name_works():
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true': if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
raise SkipTest('Does not work in server mode due to error in Workzeug') raise SkipTest('Does not work in server mode due to error in Workzeug')
store_and_read_back_a_key('/') store_and_read_back_a_key('/')
@parameterized([
('foo/bar/baz',),
('foo',),
('foo/run_dt%3D2019-01-01%252012%253A30%253A00',),
])
@mock_s3
def test_delete_objects_with_url_encoded_key(key):
s3 = boto3.client('s3', region_name='us-east-1')
bucket_name = 'mybucket'
body = b'Some body'
s3.create_bucket(Bucket=bucket_name)
def put_object():
s3.put_object(
Bucket=bucket_name,
Key=key,
Body=body
)
def assert_deleted():
with assert_raises(ClientError) as e:
s3.get_object(Bucket=bucket_name, Key=key)
e.exception.response['Error']['Code'].should.equal('NoSuchKey')
put_object()
s3.delete_object(Bucket=bucket_name, Key=key)
assert_deleted()
put_object()
s3.delete_objects(Bucket=bucket_name, Delete={'Objects': [{'Key': key}]})
assert_deleted()

View File

@ -1,7 +1,8 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import os import os
from sure import expect from sure import expect
from moto.s3.utils import bucket_name_from_url, _VersionedKeyStore, parse_region_from_url from moto.s3.utils import bucket_name_from_url, _VersionedKeyStore, parse_region_from_url, clean_key_name, undo_clean_key_name
from parameterized import parameterized
def test_base_url(): def test_base_url():
@ -78,3 +79,29 @@ def test_parse_region_from_url():
'https://s3.amazonaws.com/bucket', 'https://s3.amazonaws.com/bucket',
'https://bucket.s3.amazonaws.com']: 'https://bucket.s3.amazonaws.com']:
parse_region_from_url(url).should.equal(expected) parse_region_from_url(url).should.equal(expected)
@parameterized([
('foo/bar/baz',
'foo/bar/baz'),
('foo',
'foo'),
('foo/run_dt%3D2019-01-01%252012%253A30%253A00',
'foo/run_dt=2019-01-01%2012%3A30%3A00'),
])
def test_clean_key_name(key, expected):
clean_key_name(key).should.equal(expected)
@parameterized([
('foo/bar/baz',
'foo/bar/baz'),
('foo',
'foo'),
('foo/run_dt%3D2019-01-01%252012%253A30%253A00',
'foo/run_dt%253D2019-01-01%25252012%25253A30%25253A00'),
])
def test_undo_clean_key_name(key, expected):
undo_clean_key_name(key).should.equal(expected)