Core: Reset data when mock stops (#5958)
This commit is contained in:
parent
eb3ed9106b
commit
7816b191b8
@ -64,10 +64,12 @@ class BaseMockAWS:
|
||||
if self.__class__.nested_count == 0:
|
||||
self.reset() # type: ignore[attr-defined]
|
||||
|
||||
def __call__(self, func: Callable[..., Any], reset: bool = True) -> Any:
|
||||
def __call__(
|
||||
self, func: Callable[..., Any], reset: bool = True, remove_data: bool = True
|
||||
) -> Any:
|
||||
if inspect.isclass(func):
|
||||
return self.decorate_class(func)
|
||||
return self.decorate_callable(func, reset)
|
||||
return self.decorate_callable(func, reset, remove_data)
|
||||
|
||||
def __enter__(self) -> "BaseMockAWS":
|
||||
self.start()
|
||||
@ -89,7 +91,7 @@ class BaseMockAWS:
|
||||
|
||||
self.enable_patching(reset) # type: ignore[attr-defined]
|
||||
|
||||
def stop(self) -> None:
|
||||
def stop(self, remove_data: bool = True) -> None:
|
||||
self.__class__.nested_count -= 1
|
||||
|
||||
if self.__class__.nested_count < 0:
|
||||
@ -105,18 +107,23 @@ class BaseMockAWS:
|
||||
pass
|
||||
self.unmock_env_variables()
|
||||
self.__class__.mocks_active = False
|
||||
if remove_data:
|
||||
# Reset the data across all backends
|
||||
for backend in self.backends.values():
|
||||
backend.reset()
|
||||
# Remove references to all model instances that were created
|
||||
reset_model_data()
|
||||
self.disable_patching() # type: ignore[attr-defined]
|
||||
|
||||
def decorate_callable(
|
||||
self, func: Callable[..., CALLABLE_RETURN], reset: bool
|
||||
self, func: Callable[..., CALLABLE_RETURN], reset: bool, remove_data: bool
|
||||
) -> Callable[..., CALLABLE_RETURN]:
|
||||
def wrapper(*args: Any, **kwargs: Any) -> CALLABLE_RETURN:
|
||||
self.start(reset=reset)
|
||||
try:
|
||||
result = func(*args, **kwargs)
|
||||
finally:
|
||||
self.stop()
|
||||
self.stop(remove_data=remove_data)
|
||||
return result
|
||||
|
||||
functools.update_wrapper(wrapper, func)
|
||||
@ -173,15 +180,18 @@ class BaseMockAWS:
|
||||
# Special case for UnitTests-class
|
||||
is_test_method = attr.startswith(unittest.TestLoader.testMethodPrefix)
|
||||
should_reset = False
|
||||
should_remove_data = False
|
||||
if attr in ["setUp", "setup_method"]:
|
||||
should_reset = True
|
||||
elif not has_setup_method and is_test_method:
|
||||
should_reset = True
|
||||
should_remove_data = True
|
||||
else:
|
||||
# Method is unrelated to the test setup
|
||||
# Method is a test, but was already reset while executing the setUp-method
|
||||
pass
|
||||
setattr(klass, attr, self(attr_value, reset=should_reset))
|
||||
kwargs = {"reset": should_reset, "remove_data": should_remove_data}
|
||||
setattr(klass, attr, self(attr_value, **kwargs))
|
||||
except TypeError:
|
||||
# Sometimes we can't set this for built-in types
|
||||
continue
|
||||
|
@ -5,8 +5,9 @@ import boto3
|
||||
import json
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
from moto import mock_autoscaling, mock_sqs, settings
|
||||
from unittest import SkipTest
|
||||
from moto import mock_autoscaling, mock_s3, mock_sqs, settings
|
||||
from moto.core.model_instances import model_data, reset_model_data
|
||||
from unittest import SkipTest, TestCase
|
||||
|
||||
base_url = (
|
||||
"http://localhost:5000"
|
||||
@ -73,3 +74,61 @@ def test_creation_error__data_api_still_returns_thing():
|
||||
|
||||
names = [obj["name"] for obj in as_objects["FakeAutoScalingGroup"]]
|
||||
names.should.contain("test_asg")
|
||||
|
||||
|
||||
def test_model_data_is_emptied_as_necessary():
|
||||
if settings.TEST_SERVER_MODE:
|
||||
raise SkipTest("We're only interested in the decorator performance here")
|
||||
|
||||
# Reset any residual data
|
||||
reset_model_data()
|
||||
|
||||
# No instances exist, because we have just reset it
|
||||
for classes_per_service in model_data.values():
|
||||
for _class in classes_per_service.values():
|
||||
_class.instances.should.equal([])
|
||||
|
||||
with mock_sqs():
|
||||
# When just starting a mock, it is empty
|
||||
for classes_per_service in model_data.values():
|
||||
for _class in classes_per_service.values():
|
||||
_class.instances.should.equal([])
|
||||
|
||||
# After creating a queue, some data will be present
|
||||
conn = boto3.client("sqs", region_name="us-west-1")
|
||||
conn.create_queue(QueueName="queue1")
|
||||
|
||||
model_data["sqs"]["Queue"].instances.should.have.length_of(1)
|
||||
|
||||
# But after the mock ends, it is empty again
|
||||
for classes_per_service in model_data.values():
|
||||
for _class in classes_per_service.values():
|
||||
_class.instances.should.equal([])
|
||||
|
||||
# When we have multiple/nested mocks, the data should still be present after the first mock ends
|
||||
with mock_sqs():
|
||||
conn = boto3.client("sqs", region_name="us-west-1")
|
||||
conn.create_queue(QueueName="queue1")
|
||||
with mock_s3():
|
||||
# The data should still be here - instances should not reset if another mock is still active
|
||||
model_data["sqs"]["Queue"].instances.should.have.length_of(1)
|
||||
# The data should still be here - the inner mock has exited, but the outer mock is still active
|
||||
model_data["sqs"]["Queue"].instances.should.have.length_of(1)
|
||||
|
||||
|
||||
@mock_sqs
|
||||
class TestModelDataResetForClassDecorator(TestCase):
|
||||
def setUp(self):
|
||||
if settings.TEST_SERVER_MODE:
|
||||
raise SkipTest("We're only interested in the decorator performance here")
|
||||
|
||||
# No data is present at the beginning
|
||||
for classes_per_service in model_data.values():
|
||||
for _class in classes_per_service.values():
|
||||
_class.instances.should.equal([])
|
||||
|
||||
conn = boto3.client("sqs", region_name="us-west-1")
|
||||
conn.create_queue(QueueName="queue1")
|
||||
|
||||
def test_should_find_bucket(self):
|
||||
model_data["sqs"]["Queue"].instances.should.have.length_of(1)
|
||||
|
@ -168,6 +168,7 @@ class TestS3FileHandleClosures(TestCase):
|
||||
@verify_zero_warnings
|
||||
def test_overwrite_versioned_upload(self):
|
||||
self.s3.put_object("versioned-bucket", "my-key", "x" * 10_000_000)
|
||||
self.s3.put_object("versioned-bucket", "my-key", "x" * 10_000_000)
|
||||
|
||||
@verify_zero_warnings
|
||||
def test_multiple_versions_upload(self):
|
||||
@ -262,6 +263,21 @@ class TestS3FileHandleClosuresUsingMocks(TestCase):
|
||||
version = self.s3.put_object(Bucket="foo", Key="b", Body="s")["VersionId"]
|
||||
self.s3.delete_object(Bucket="foo", Key="b", VersionId=version)
|
||||
|
||||
@verify_zero_warnings
|
||||
def test_update_versioned_object__while_looping(self):
|
||||
for _ in (1, 2):
|
||||
with mock_s3():
|
||||
self.s3.create_bucket(Bucket="foo")
|
||||
self.s3.put_bucket_versioning(
|
||||
Bucket="foo",
|
||||
VersioningConfiguration={
|
||||
"Status": "Enabled",
|
||||
"MFADelete": "Disabled",
|
||||
},
|
||||
)
|
||||
self.s3.put_object(Bucket="foo", Key="bar", Body="stuff")
|
||||
self.s3.put_object(Bucket="foo", Key="bar", Body="stuff2")
|
||||
|
||||
|
||||
def test_verify_key_can_be_copied_after_disposing():
|
||||
# https://github.com/getmoto/moto/issues/5588
|
||||
|
Loading…
Reference in New Issue
Block a user