Merge pull request #2816 from bblommers/bugfix/2789
S3 - Bugfix - Remove possibilty for concurrent access to file content
This commit is contained in:
commit
c13aadf75b
@ -12,6 +12,7 @@ import codecs
|
||||
import random
|
||||
import string
|
||||
import tempfile
|
||||
import threading
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
@ -110,6 +111,7 @@ class FakeKey(BaseModel):
|
||||
self._value_buffer = tempfile.SpooledTemporaryFile(max_size=max_buffer_size)
|
||||
self._max_buffer_size = max_buffer_size
|
||||
self.value = value
|
||||
self.lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def version_id(self):
|
||||
@ -117,8 +119,12 @@ class FakeKey(BaseModel):
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
self.lock.acquire()
|
||||
self._value_buffer.seek(0)
|
||||
return self._value_buffer.read()
|
||||
r = self._value_buffer.read()
|
||||
r = copy.copy(r)
|
||||
self.lock.release()
|
||||
return r
|
||||
|
||||
@value.setter
|
||||
def value(self, new_value):
|
||||
@ -130,6 +136,7 @@ class FakeKey(BaseModel):
|
||||
if isinstance(new_value, six.text_type):
|
||||
new_value = new_value.encode(DEFAULT_TEXT_ENCODING)
|
||||
self._value_buffer.write(new_value)
|
||||
self.contentsize = len(new_value)
|
||||
|
||||
def copy(self, new_name=None, new_is_versioned=None):
|
||||
r = copy.deepcopy(self)
|
||||
@ -157,6 +164,7 @@ class FakeKey(BaseModel):
|
||||
self.acl = acl
|
||||
|
||||
def append_to_value(self, value):
|
||||
self.contentsize += len(value)
|
||||
self._value_buffer.seek(0, os.SEEK_END)
|
||||
self._value_buffer.write(value)
|
||||
|
||||
@ -229,8 +237,7 @@ class FakeKey(BaseModel):
|
||||
|
||||
@property
|
||||
def size(self):
|
||||
self._value_buffer.seek(0, os.SEEK_END)
|
||||
return self._value_buffer.tell()
|
||||
return self.contentsize
|
||||
|
||||
@property
|
||||
def storage_class(self):
|
||||
@ -249,6 +256,7 @@ class FakeKey(BaseModel):
|
||||
state = self.__dict__.copy()
|
||||
state["value"] = self.value
|
||||
del state["_value_buffer"]
|
||||
del state["lock"]
|
||||
return state
|
||||
|
||||
def __setstate__(self, state):
|
||||
@ -258,6 +266,7 @@ class FakeKey(BaseModel):
|
||||
max_size=self._max_buffer_size
|
||||
)
|
||||
self.value = state["value"]
|
||||
self.lock = threading.Lock()
|
||||
|
||||
|
||||
class FakeMultipart(BaseModel):
|
||||
@ -284,7 +293,7 @@ class FakeMultipart(BaseModel):
|
||||
etag = etag.replace('"', "")
|
||||
if part is None or part_etag != etag:
|
||||
raise InvalidPart()
|
||||
if last is not None and len(last.value) < UPLOAD_PART_MIN_SIZE:
|
||||
if last is not None and last.contentsize < UPLOAD_PART_MIN_SIZE:
|
||||
raise EntityTooSmall()
|
||||
md5s.extend(decode_hex(part_etag)[0])
|
||||
total.extend(part.value)
|
||||
|
Loading…
Reference in New Issue
Block a user