switch from calling clean in loop to undoing clean in delete_keys

This commit is contained in:
Jesse Vogt 2019-09-24 17:07:58 -05:00
parent 2df0309db5
commit 3b4cd1c27b
3 changed files with 28 additions and 20 deletions

View File

@ -20,7 +20,7 @@ from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, Missi
MalformedACLError, InvalidNotificationARN, InvalidNotificationEvent, ObjectNotInActiveTierError MalformedACLError, InvalidNotificationARN, InvalidNotificationEvent, ObjectNotInActiveTierError
from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, \ from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, \
FakeTag FakeTag
from .utils import bucket_name_from_url, clean_key_name, metadata_from_headers, parse_region_from_url from .utils import bucket_name_from_url, clean_key_name, undo_clean_key_name, metadata_from_headers, parse_region_from_url
from xml.dom import minidom from xml.dom import minidom
@ -711,7 +711,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
for k in keys: for k in keys:
key_name = k.firstChild.nodeValue key_name = k.firstChild.nodeValue
success = self.backend.delete_key(bucket_name, key_name) success = self.backend.delete_key(bucket_name, undo_clean_key_name(key_name))
if success: if success:
deleted_names.append(key_name) deleted_names.append(key_name)
else: else:

View File

@ -5,7 +5,7 @@ import os
from boto.s3.key import Key from boto.s3.key import Key
import re import re
import six import six
from six.moves.urllib.parse import urlparse, unquote from six.moves.urllib.parse import urlparse, unquote, quote
import sys import sys
@ -68,22 +68,16 @@ def metadata_from_headers(headers):
return metadata return metadata
def clean_key_name(key_name, attempts=4): def clean_key_name(key_name):
if six.PY2: if six.PY2:
def uq(k): return unquote(key_name.encode('utf-8')).decode('utf-8')
return unquote(k.encode('utf-8')).decode('utf-8') return unquote(key_name)
else:
uq = unquote
original = cleaned = key_name
last_attempt = attempts - 1 def undo_clean_key_name(key_name):
for attempt in range(attempts): if six.PY2:
cleaned = uq(key_name) return quote(key_name.encode('utf-8')).decode('utf-8')
if cleaned == key_name: return quote(key_name)
return cleaned
if attempt != last_attempt:
key_name = cleaned
raise Exception('unable to fully clean name: original %s, last clean %s prior clean %s' % (original, cleaned, key_name))
class _VersionedKeyStore(dict): class _VersionedKeyStore(dict):

View File

@ -1,7 +1,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import os import os
from sure import expect from sure import expect
from moto.s3.utils import bucket_name_from_url, _VersionedKeyStore, parse_region_from_url, clean_key_name from moto.s3.utils import bucket_name_from_url, _VersionedKeyStore, parse_region_from_url, clean_key_name, undo_clean_key_name
from parameterized import parameterized from parameterized import parameterized
@ -87,7 +87,21 @@ def test_parse_region_from_url():
('foo', ('foo',
'foo'), 'foo'),
('foo/run_dt%3D2019-01-01%252012%253A30%253A00', ('foo/run_dt%3D2019-01-01%252012%253A30%253A00',
'foo/run_dt=2019-01-01 12:30:00'), 'foo/run_dt=2019-01-01%2012%3A30%3A00'),
]) ])
def test_clean_key_name(key, expected): def test_clean_key_name(key, expected):
clean_key_name(key).should.equal(expected) clean_key_name(key).should.equal(expected)
@parameterized([
('foo/bar/baz',
'foo/bar/baz'),
('foo',
'foo'),
('foo/run_dt%3D2019-01-01%252012%253A30%253A00',
'foo/run_dt%253D2019-01-01%25252012%25253A30%25253A00'),
])
def test_undo_clean_key_name(key, expected):
undo_clean_key_name(key).should.equal(expected)