diff --git a/moto/s3/models.py b/moto/s3/models.py
index 0226909d9..97c374be2 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -135,6 +135,7 @@ class FakeKey(BaseModel, ManagedState):
             max_buffer_size if max_buffer_size else get_s3_default_key_buffer_size()
         )
         self._value_buffer = tempfile.SpooledTemporaryFile(self._max_buffer_size)
+        self.disposed = False
         self.value = value
         self.lock = threading.Lock()
 
@@ -339,6 +340,22 @@ class FakeKey(BaseModel, ManagedState):
 
         return False
 
+    def dispose(self, garbage=False):
+        if garbage and not self.disposed:
+            import warnings
+
+            warnings.warn("S3 key was not disposed of in time", ResourceWarning)
+        try:
+            self._value_buffer.close()
+            if self.multipart:
+                self.multipart.dispose()
+        except:  # noqa: E722 Do not use bare except
+            pass
+        self.disposed = True
+
+    def __del__(self):
+        self.dispose(garbage=True)
+
 
 class FakeMultipart(BaseModel):
     def __init__(
@@ -401,6 +418,9 @@ class FakeMultipart(BaseModel):
         key = FakeKey(
             part_id, value, encryption=self.sse_encryption, kms_key_id=self.kms_key_id
         )
+        if part_id in self.parts:
+            # We're overwriting the current part - dispose of it first
+            self.parts[part_id].dispose()
         self.parts[part_id] = key
         if part_id not in self.partlist:
             insort(self.partlist, part_id)
@@ -411,6 +431,10 @@ class FakeMultipart(BaseModel):
         for part_id in self.partlist[part_number_marker:max_marker]:
             yield self.parts[part_id]
 
+    def dispose(self):
+        for part in self.parts.values():
+            part.dispose()
+
 
 class FakeGrantee(BaseModel):
     def __init__(self, grantee_id="", uri="", display_name=""):
@@ -867,13 +891,20 @@ class PublicAccessBlock(BaseModel):
         }
 
 
+class MultipartDict(dict):
+    def __delitem__(self, key):
+        if key in self:
+            self[key].dispose()
+        super().__delitem__(key)
+
+
 class FakeBucket(CloudFormationModel):
     def __init__(self, name, account_id, region_name):
         self.name = name
         self.account_id = account_id
         self.region_name = region_name
         self.keys = _VersionedKeyStore()
-        self.multiparts = {}
+        self.multiparts = MultipartDict()
         self.versioning_status = None
         self.rules = []
         self.policy = None
@@ -1427,10 +1458,9 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
         for bucket in self.buckets.values():
             for key in bucket.keys.values():
                 if isinstance(key, FakeKey):
-                    key._value_buffer.close()
-                    if key.multipart is not None:
-                        for part in key.multipart.parts.values():
-                            part._value_buffer.close()
+                    key.dispose()
+            for mp in bucket.multiparts.values():
+                mp.dispose()
         super().reset()
 
     @property
@@ -1748,11 +1778,13 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
                 lock_until=lock_until,
             )
 
-        keys = [
-            key
-            for key in bucket.keys.getlist(key_name, [])
-            if key.version_id != new_key.version_id
-        ] + [new_key]
+        existing_keys = bucket.keys.getlist(key_name, [])
+        if bucket.is_versioned:
+            keys = existing_keys + [new_key]
+        else:
+            for key in existing_keys:
+                key.dispose()
+            keys = [new_key]
         bucket.keys.setlist(key_name, keys)
 
         notifications.send_event(
@@ -1932,20 +1964,6 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
             pub_block_config.get("RestrictPublicBuckets"),
         )
 
-    def complete_multipart(self, bucket_name, multipart_id, body):
-        bucket = self.get_bucket(bucket_name)
-        multipart = bucket.multiparts[multipart_id]
-        value, etag = multipart.complete(body)
-        if value is None:
-            return
-        del bucket.multiparts[multipart_id]
-
-        key = self.put_object(
-            bucket_name, multipart.key_name, value, etag=etag, multipart=multipart
-        )
-        key.set_metadata(multipart.metadata)
-        return key
-
     def abort_multipart_upload(self, bucket_name, multipart_id):
         bucket = self.get_bucket(bucket_name)
         multipart_data = bucket.multiparts.get(multipart_id, None)
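Aside (not part of the patch): the `FakeKey` changes above follow a familiar disposal pattern, close the `SpooledTemporaryFile` explicitly via `dispose()` on every path that drops a key, and keep `__del__` only as a safety net that emits a `ResourceWarning` when that did not happen. A minimal standalone sketch of that pattern, with illustrative names (`DisposableBuffer` is not a moto class):

```python
import tempfile
import warnings


class DisposableBuffer:
    """Illustrative stand-in for an object that owns a spooled temporary file."""

    def __init__(self, value, max_size=1024):
        self._buffer = tempfile.SpooledTemporaryFile(max_size)
        self._buffer.write(value)
        self.disposed = False

    def dispose(self, garbage=False):
        # Warn only when the GC is cleaning us up without an explicit dispose() call
        if garbage and not self.disposed:
            warnings.warn("buffer was not disposed of in time", ResourceWarning)
        if not self.disposed:
            self._buffer.close()
        self.disposed = True

    def __del__(self):
        # Safety net: the owning container is expected to call dispose() itself
        self.dispose(garbage=True)
```

Containers that hold such objects then call `dispose()` whenever an entry is overwritten, popped, or reset, which is what the `MultipartDict.__delitem__`, `_VersionedKeyStore.pop`, and `S3Backend.reset` changes in this patch do.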
diff --git a/moto/s3/utils.py b/moto/s3/utils.py
index 8a175158f..79cf48ca5 100644
--- a/moto/s3/utils.py
+++ b/moto/s3/utils.py
@@ -108,6 +108,11 @@ class _VersionedKeyStore(dict):
     def __sgetitem__(self, key):
         return super().__getitem__(key)
 
+    def pop(self, key):
+        for version in self.getlist(key, []):
+            version.dispose()
+        super().pop(key)
+
     def __getitem__(self, key):
         return self.__sgetitem__(key)[-1]
 
diff --git a/tests/test_s3/test_s3_file_handles.py b/tests/test_s3/test_s3_file_handles.py
new file mode 100644
index 000000000..e033cc9b5
--- /dev/null
+++ b/tests/test_s3/test_s3_file_handles.py
@@ -0,0 +1,180 @@
+import gc
+import warnings
+import unittest
+from functools import wraps
+from moto.s3 import models as s3model
+from moto.s3.responses import S3ResponseInstance
+
+
+def verify_zero_warnings(f):
+    @wraps(f)
+    def wrapped(*args, **kwargs):
+        with warnings.catch_warnings(record=True) as warning_list:
+            warnings.simplefilter("always", ResourceWarning)  # add filter
+            resp = f(*args, **kwargs)
+            # Get the TestClass reference, and reset the S3Backend that we've created as part of that class
+            (class_ref,) = args
+            class_ref.__dict__["s3"].reset()
+            # Now collect garbage, which will throw any warnings if there are unclosed FileHandles
+            gc.collect()
+        warning_types = [type(warn.message) for warn in warning_list]
+        warning_types.shouldnt.contain(ResourceWarning)
+        return resp
+
+    return wrapped
+
+
+class TestS3FileHandleClosures(unittest.TestCase):
+    """
+    Large Uploads are written to disk for performance reasons
+    These tests verify that the file handles are properly closed after specific actions
+    """
+
+    def setUp(self) -> None:
+        self.s3 = s3model.S3Backend("us-west-1", "1234")
+        self.s3.create_bucket("my-bucket", "us-west-1")
+        self.s3.create_bucket("versioned-bucket", "us-west-1")
+        self.s3.put_object("my-bucket", "my-key", "x" * 10_000_000)
+
+    def tearDown(self) -> None:
+        self.s3.reset()
+
+    @verify_zero_warnings
+    def test_upload_large_file(self):
+        # Verify that we can create an object, as done in the setUp-method
+        # Do not create any other objects, as it will taint the state that we're testing
+        pass
+
+    @verify_zero_warnings
+    def test_delete_large_file(self):
+        self.s3.delete_object(bucket_name="my-bucket", key_name="my-key")
+
+    @verify_zero_warnings
+    def test_overwriting_file(self):
+        self.s3.put_object("my-bucket", "my-key", "b" * 10_000_000)
+
+    @verify_zero_warnings
+    def test_copy_object(self):
+        key = self.s3.get_object("my-bucket", "my-key")
+        self.s3.copy_object(
+            src_key=key, dest_bucket_name="my-bucket", dest_key_name="key-2"
+        )
+
+    @verify_zero_warnings
+    def test_part_upload(self):
+        multipart_id = self.s3.create_multipart_upload(
+            bucket_name="my-bucket",
+            key_name="mp-key",
+            metadata={},
+            storage_type="STANDARD",
+            tags=[],
+            acl=None,
+            sse_encryption=None,
+            kms_key_id=None,
+        )
+        self.s3.upload_part(
+            bucket_name="my-bucket",
+            multipart_id=multipart_id,
+            part_id=1,
+            value="b" * 10_000_000,
+        )
+
+    @verify_zero_warnings
+    def test_overwriting_part_upload(self):
+        multipart_id = self.s3.create_multipart_upload(
+            bucket_name="my-bucket",
+            key_name="mp-key",
+            metadata={},
+            storage_type="STANDARD",
+            tags=[],
+            acl=None,
+            sse_encryption=None,
+            kms_key_id=None,
+        )
+        self.s3.upload_part(
+            bucket_name="my-bucket",
+            multipart_id=multipart_id,
+            part_id=1,
+            value="b" * 10_000_000,
+        )
+        self.s3.upload_part(
+            bucket_name="my-bucket",
+            multipart_id=multipart_id,
+            part_id=1,
+            value="c" * 10_000_000,
+        )
+
+    @verify_zero_warnings
+    def test_aborting_part_upload(self):
+        multipart_id = self.s3.create_multipart_upload(
+            bucket_name="my-bucket",
+            key_name="mp-key",
+            metadata={},
+            storage_type="STANDARD",
+            tags=[],
+            acl=None,
+            sse_encryption=None,
+            kms_key_id=None,
+        )
+        self.s3.upload_part(
+            bucket_name="my-bucket",
+            multipart_id=multipart_id,
+            part_id=1,
+            value="b" * 10_000_000,
+        )
+        self.s3.abort_multipart_upload(
+            bucket_name="my-bucket", multipart_id=multipart_id
+        )
+
+    @verify_zero_warnings
+    def test_completing_part_upload(self):
+        multipart_id = self.s3.create_multipart_upload(
+            bucket_name="my-bucket",
+            key_name="mp-key",
+            metadata={},
+            storage_type="STANDARD",
+            tags=[],
+            acl=None,
+            sse_encryption=None,
+            kms_key_id=None,
+        )
+        etag = self.s3.upload_part(
+            bucket_name="my-bucket",
+            multipart_id=multipart_id,
+            part_id=1,
+            value="b" * 10_000_000,
+        ).etag
+
+        mp_body = f"""<CompleteMultipartUpload><Part><ETag>{etag}</ETag><PartNumber>1</PartNumber></Part></CompleteMultipartUpload>"""
+        body = S3ResponseInstance._complete_multipart_body(mp_body)
+        self.s3.complete_multipart_upload(
+            bucket_name="my-bucket", multipart_id=multipart_id, body=body
+        )
+
+    @verify_zero_warnings
+    def test_single_versioned_upload(self):
+        self.s3.put_object("versioned-bucket", "my-key", "x" * 10_000_000)
+
+    @verify_zero_warnings
+    def test_overwrite_versioned_upload(self):
+        self.s3.put_object("versioned-bucket", "my-key", "x" * 10_000_000)
+
+    @verify_zero_warnings
+    def test_multiple_versions_upload(self):
+        self.s3.put_object("versioned-bucket", "my-key", "x" * 10_000_000)
+        self.s3.put_object("versioned-bucket", "my-key", "y" * 10_000_000)
+        self.s3.put_object("versioned-bucket", "my-key", "z" * 10_000_000)
+
+    @verify_zero_warnings
+    def test_delete_versioned_upload(self):
+        self.s3.put_object("versioned-bucket", "my-key", "x" * 10_000_000)
+        self.s3.put_object("versioned-bucket", "my-key", "x" * 10_000_000)
+        self.s3.delete_object(bucket_name="my-bucket", key_name="my-key")
+
+    @verify_zero_warnings
+    def test_delete_specific_version(self):
+        self.s3.put_object("versioned-bucket", "my-key", "x" * 10_000_000)
+        key = self.s3.put_object("versioned-bucket", "my-key", "y" * 10_000_000)
+        self.s3.delete_object(
+            bucket_name="my-bucket", key_name="my-key", version_id=key._version_id
+        )
diff --git a/tests/test_s3/test_s3_multipart.py b/tests/test_s3/test_s3_multipart.py
index 2226464ca..b152c88ba 100644
--- a/tests/test_s3/test_s3_multipart.py
+++ b/tests/test_s3/test_s3_multipart.py
@@ -1,10 +1,8 @@
 import boto3
 import os
-import gc
 import pytest
 import sure  # noqa # pylint: disable=unused-import
 import requests
-import warnings
 
 from botocore.client import ClientError
 from functools import wraps
@@ -1011,39 +1009,3 @@ def test_head_object_returns_part_count():
     # Header is not returned when we do not pass PartNumber
     resp = client.head_object(Bucket=bucket, Key=key)
     resp.shouldnt.have.key("PartsCount")
-
-
-def test_reset_after_multipart_upload_closes_file_handles():
-    """
-    Large Uploads are written to disk for performance reasons
-    This test verifies that the filehandles are properly closed when resetting Moto
-    """
-    s3 = s3model.S3Backend("us-west-1", "1234")
-    s3.create_bucket("my-bucket", "us-west-1")
-    s3.put_object("my-bucket", "my-key", "x" * 10_000_000)
-
-    with warnings.catch_warnings(record=True) as warning_list:
-        warnings.simplefilter("always", ResourceWarning)  # add filter
-        s3.reset()
-        gc.collect()
-        warning_types = [type(warn.message) for warn in warning_list]
-        warning_types.shouldnt.contain(ResourceWarning)
-
-
-def test_deleting_large_upload_closes_file_handles():
-    """
-    Large Uploads are written to disk for performance reasons
-    This test verifies that the filehandles are properly closed on deletion
-    """
-    s3 = s3model.S3Backend("us-east-1", "1234")
-    s3.create_bucket("my-bucket", "us-west-1")
-    s3.put_object("my-bucket", "my-key", "x" * 10_000_000)
-
-    with warnings.catch_warnings(record=True) as warning_list:
-        warnings.simplefilter("always", ResourceWarning)  # add filter
-        s3.delete_object(bucket_name="my-bucket", key_name="my-key")
-        gc.collect()
-        warning_types = [type(warn.message) for warn in warning_list]
-        warning_types.shouldnt.contain(ResourceWarning)
-
-    s3.reset()
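The new `verify_zero_warnings` decorator generalizes the two tests removed from `test_s3_multipart.py` above: run the scenario, reset the backend, force a garbage-collection pass, and assert that no `ResourceWarning` was recorded. A self-contained sketch of that detection technique outside of moto (`LeakyHolder` and `leaked_resource_warnings` are illustrative names, not part of the patch):

```python
import gc
import tempfile
import warnings


class LeakyHolder:
    """Illustrative object that forgets to close its temporary file."""

    def __init__(self):
        self._buffer = tempfile.SpooledTemporaryFile(max_size=1024)
        self._buffer.write(b"x" * 10_000)  # exceeds max_size, so a real file handle is opened

    def __del__(self):
        if not self._buffer.closed:
            warnings.warn("buffer was never closed", ResourceWarning)
            self._buffer.close()


def leaked_resource_warnings(action):
    """Run `action`, force a GC pass, and return the ResourceWarnings that were raised."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always", ResourceWarning)  # record every occurrence
        action()      # exercise the code under test, then drop all references to it
        gc.collect()  # collect leftovers now, so __del__ (and its warning) fires here
    return [w for w in caught if issubclass(w.category, ResourceWarning)]


if __name__ == "__main__":
    assert leaked_resource_warnings(lambda: LeakyHolder()) != []  # leak is detected
    assert leaked_resource_warnings(lambda: None) == []           # nothing leaked
```

Calling `gc.collect()` inside the `catch_warnings` block matters: the warnings are emitted from `__del__` during collection, so they would be silently dropped (ResourceWarning is ignored by default) if collection ran after the context manager exits.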